Skip to content

Commit 70e8de8

Browse files
authored
feat: add stream recv buffer trait and impls (#2505)
1 parent 4d60027 commit 70e8de8

File tree

13 files changed

+627
-378
lines changed

13 files changed

+627
-378
lines changed

dc/s2n-quic-dc/src/stream/client/tokio.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use crate::{
5-
event,
5+
event, msg,
66
path::secret,
77
stream::{
88
application::Stream,
99
endpoint,
1010
environment::tokio::{self as env, Environment},
11+
recv,
1112
socket::Protocol,
1213
},
1314
};
@@ -33,6 +34,7 @@ where
3334
env,
3435
peer,
3536
env::UdpUnbound(acceptor_addr.into()),
37+
recv_buffer(),
3638
subscriber,
3739
None,
3840
)?;
@@ -87,6 +89,7 @@ where
8789
peer_addr,
8890
local_port,
8991
},
92+
recv_buffer(),
9093
subscriber,
9194
None,
9295
)?;
@@ -126,6 +129,7 @@ where
126129
peer_addr,
127130
local_port,
128131
},
132+
recv_buffer(),
129133
subscriber,
130134
None,
131135
)?;
@@ -153,3 +157,9 @@ where
153157
.await
154158
.map(|_| ())
155159
}
160+
161+
#[inline]
162+
fn recv_buffer() -> recv::shared::RecvBuffer {
163+
// TODO replace this with a parameter once everything is in place
164+
recv::buffer::Local::new(msg::recv::Message::new(9000), None)
165+
}

dc/s2n-quic-dc/src/stream/endpoint.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use crate::{
55
event::{self, api::Subscriber as _, IntoEvent as _},
6-
msg, packet,
6+
packet,
77
path::secret::{self, map, Map},
88
random::Random,
99
stream::{
@@ -37,6 +37,7 @@ pub fn open_stream<Env, P>(
3737
env: &Env,
3838
entry: map::Peer,
3939
peer: P,
40+
recv_buffer: recv::shared::RecvBuffer,
4041
subscriber: Env::Subscriber,
4142
parameter_override: Option<&dyn Fn(dc::ApplicationParams) -> dc::ApplicationParams>,
4243
) -> Result<application::Builder<Env::Subscriber>>
@@ -76,8 +77,7 @@ where
7677
crypto,
7778
entry.map(),
7879
parameters,
79-
None,
80-
None,
80+
recv_buffer,
8181
endpoint::Type::Client,
8282
subscriber,
8383
subscriber_ctx,
@@ -90,8 +90,7 @@ pub fn accept_stream<Env, P>(
9090
env: &Env,
9191
mut peer: P,
9292
packet: &server::InitialPacket,
93-
handshake: Option<server::handshake::Receiver>,
94-
buffer: Option<&mut msg::recv::Message>,
93+
recv_buffer: recv::shared::RecvBuffer,
9594
map: &Map,
9695
subscriber: Env::Subscriber,
9796
subscriber_ctx: <Env::Subscriber as event::Subscriber>::ConnectionContext,
@@ -134,8 +133,7 @@ where
134133
crypto,
135134
map,
136135
parameters,
137-
handshake,
138-
buffer,
136+
recv_buffer,
139137
endpoint::Type::Server,
140138
subscriber,
141139
subscriber_ctx,
@@ -164,8 +162,7 @@ fn build_stream<Env, P>(
164162
crypto: secret::map::Bidirectional,
165163
map: &Map,
166164
parameters: dc::ApplicationParams,
167-
handshake: Option<server::handshake::Receiver>,
168-
recv_buffer: Option<&mut msg::recv::Message>,
165+
recv_buffer: recv::shared::RecvBuffer,
169166
endpoint_type: endpoint::Type,
170167
subscriber: Env::Subscriber,
171168
subscriber_ctx: <Env::Subscriber as event::Subscriber>::ConnectionContext,
@@ -179,7 +176,7 @@ where
179176
let sockets = peer.setup(env)?;
180177

181178
// construct shared reader state
182-
let reader = recv::shared::State::new(stream_id, &parameters, handshake, features, recv_buffer);
179+
let reader = recv::shared::State::new(stream_id, &parameters, features, recv_buffer);
183180

184181
let writer = {
185182
let worker = sockets

dc/s2n-quic-dc/src/stream/recv.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
mod ack;
55
pub mod application;
6+
pub(crate) mod buffer;
67
mod error;
78
mod packet;
89
mod probes;

dc/s2n-quic-dc/src/stream/recv/application.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ where
203203

204204
let shared = &self.shared;
205205
let sockets = &self.sockets;
206-
let transport_features = sockets.read_application().features();
206+
let transport_features = sockets.features();
207207

208208
let mut reader = shared.receiver.application_guard(
209209
self.ack_mode,
@@ -263,28 +263,25 @@ where
263263
_ => {}
264264
}
265265

266-
let before_len = reader.recv_buffer.payload_len();
267-
268266
let recv = reader.poll_fill_recv_buffer(
269267
cx,
270268
self.sockets.read_application(),
271269
&self.shared.clock,
272270
&self.shared.subscriber,
273271
);
274272

275-
match Self::handle_socket_result(cx, &mut reader.receiver, &mut self.timer, recv) {
276-
Poll::Ready(res) => res?,
277-
// if we've written at least one byte then return that amount
278-
Poll::Pending if out_buf.written_len() > 0 => break,
279-
Poll::Pending => return Poll::Pending,
280-
}
273+
let recv_len =
274+
match Self::handle_socket_result(cx, &mut reader.receiver, &mut self.timer, recv) {
275+
Poll::Ready(res) => res?,
276+
// if we've written at least one byte then return that amount
277+
Poll::Pending if out_buf.written_len() > 0 => break,
278+
Poll::Pending => return Poll::Pending,
279+
};
281280

282281
// clear the forced receive after performing it once
283282
force_recv = false;
284283

285-
let after_len = reader.recv_buffer.payload_len();
286-
287-
if before_len == after_len {
284+
if recv_len == 0 {
288285
if transport_features.is_stream() {
289286
// if we got a 0-length read then the stream was closed - notify the receiver
290287
reader.receiver.on_transport_close();
@@ -303,8 +300,8 @@ where
303300
cx: &mut Context,
304301
receiver: &mut recv::state::State,
305302
timer: &mut Option<Timer>,
306-
res: Poll<io::Result<()>>,
307-
) -> Poll<io::Result<()>> {
303+
res: Poll<io::Result<usize>>,
304+
) -> Poll<io::Result<usize>> {
308305
if let Poll::Ready(res) = res {
309306
return res.into();
310307
}
@@ -320,7 +317,8 @@ where
320317
ready!(timer.poll_ready(cx));
321318

322319
// if the timer expired then keep going, even if the recv buffer is empty
323-
Ok(()).into()
320+
// we return `1` to make the caller think that something was written to the buffer
321+
Ok(1).into()
324322
} else {
325323
timer.cancel();
326324
Poll::Pending

dc/s2n-quic-dc/src/stream/recv/application/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ where
3636

3737
let remote_addr = shared.read_remote_addr();
3838
// we only need a timer for unreliable transports
39-
let is_reliable = sockets.read_application().features().is_reliable();
39+
let is_reliable = sockets.features().is_reliable();
4040
let timer = if is_reliable {
4141
None
4242
} else {
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use crate::{
5+
event,
6+
stream::{recv, socket::Socket, TransportFeatures},
7+
};
8+
use core::task::{Context, Poll};
9+
use std::io;
10+
11+
mod dispatch;
12+
mod local;
13+
14+
pub use dispatch::Dispatch;
15+
pub use local::Local;
16+
17+
pub trait Buffer {
18+
fn is_empty(&self) -> bool;
19+
20+
fn poll_fill<S, Pub>(
21+
&mut self,
22+
cx: &mut Context,
23+
socket: &S,
24+
publisher: &mut Pub,
25+
) -> Poll<io::Result<usize>>
26+
where
27+
S: ?Sized + Socket,
28+
Pub: event::ConnectionPublisher;
29+
30+
fn process<R>(
31+
&mut self,
32+
features: TransportFeatures,
33+
router: &mut R,
34+
) -> Result<(), recv::Error>
35+
where
36+
R: Dispatch;
37+
}
38+
39+
#[allow(dead_code)] // TODO remove this once we start using the channel buffer
40+
pub enum Either<A, B> {
41+
A(A),
42+
B(B),
43+
}
44+
45+
impl<A, B> Buffer for Either<A, B>
46+
where
47+
A: Buffer,
48+
B: Buffer,
49+
{
50+
#[inline]
51+
fn is_empty(&self) -> bool {
52+
match self {
53+
Self::A(a) => a.is_empty(),
54+
Self::B(b) => b.is_empty(),
55+
}
56+
}
57+
58+
#[inline]
59+
fn poll_fill<S, Pub>(
60+
&mut self,
61+
cx: &mut Context,
62+
socket: &S,
63+
publisher: &mut Pub,
64+
) -> Poll<io::Result<usize>>
65+
where
66+
S: ?Sized + Socket,
67+
Pub: event::ConnectionPublisher,
68+
{
69+
match self {
70+
Self::A(a) => a.poll_fill(cx, socket, publisher),
71+
Self::B(b) => b.poll_fill(cx, socket, publisher),
72+
}
73+
}
74+
75+
#[inline]
76+
fn process<R>(&mut self, features: TransportFeatures, router: &mut R) -> Result<(), recv::Error>
77+
where
78+
R: Dispatch,
79+
{
80+
match self {
81+
Self::A(a) => a.process(features, router),
82+
Self::B(b) => b.process(features, router),
83+
}
84+
}
85+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use crate::{packet, stream::recv};
5+
use s2n_codec::DecoderBufferMut;
6+
use s2n_quic_core::inet::{ExplicitCongestionNotification, SocketAddress};
7+
8+
pub trait Dispatch {
9+
#[inline(always)]
10+
fn tag_len(&self) -> usize {
11+
16
12+
}
13+
14+
fn on_packet(
15+
&mut self,
16+
remote_addr: &SocketAddress,
17+
ecn: ExplicitCongestionNotification,
18+
packet: packet::Packet,
19+
) -> Result<(), recv::Error>;
20+
21+
#[inline]
22+
fn on_datagram_segment(
23+
&mut self,
24+
remote_addr: &SocketAddress,
25+
ecn: ExplicitCongestionNotification,
26+
segment: &mut [u8],
27+
) -> Result<(), recv::Error> {
28+
let tag_len = self.tag_len();
29+
let segment_len = segment.len();
30+
let mut decoder = DecoderBufferMut::new(segment);
31+
32+
while !decoder.is_empty() {
33+
let packet = match decoder.decode_parameterized(tag_len) {
34+
Ok((packet, remaining)) => {
35+
decoder = remaining;
36+
packet
37+
}
38+
Err(decoder_error) => {
39+
// the packet was likely corrupted so log it and move on to the
40+
// next segment
41+
tracing::warn!(
42+
%decoder_error,
43+
segment_len
44+
);
45+
46+
break;
47+
}
48+
};
49+
50+
self.on_packet(remote_addr, ecn, packet)?;
51+
}
52+
53+
Ok(())
54+
}
55+
}

0 commit comments

Comments
 (0)