Skip to content

Commit fd5ee73

Browse files
authored
feat(s2n-quic-dc): reduce socket addr calls even more (#2406)
1 parent f8f2361 commit fd5ee73

File tree

3 files changed

+52
-13
lines changed

3 files changed

+52
-13
lines changed

dc/s2n-quic-dc/src/stream/client/tokio.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,18 @@ where
6565
// Make sure TCP_NODELAY is set
6666
let _ = socket.set_nodelay(true);
6767

68-
let stream = endpoint::open_stream(env, peer, env::TcpRegistered(socket), subscriber, None)?;
68+
let local_port = socket.local_addr()?.port();
69+
let stream = endpoint::open_stream(
70+
env,
71+
peer,
72+
env::TcpRegistered {
73+
socket,
74+
peer_addr: acceptor_addr.into(),
75+
local_port,
76+
},
77+
subscriber,
78+
None,
79+
)?;
6980

7081
// build the stream inside the application context
7182
let mut stream = stream.connect()?;
@@ -85,14 +96,26 @@ where
8596
#[inline]
8697
pub async fn connect_tcp_with<Sub>(
8798
peer: secret::map::Peer,
88-
stream: TcpStream,
99+
socket: TcpStream,
89100
env: &Environment<Sub>,
90101
subscriber: Sub,
91102
) -> io::Result<Stream<Sub>>
92103
where
93104
Sub: event::Subscriber,
94105
{
95-
let stream = endpoint::open_stream(env, peer, env::TcpRegistered(stream), subscriber, None)?;
106+
let local_port = socket.local_addr()?.port();
107+
let peer_addr = socket.peer_addr()?.into();
108+
let stream = endpoint::open_stream(
109+
env,
110+
peer,
111+
env::TcpRegistered {
112+
socket,
113+
peer_addr,
114+
local_port,
115+
},
116+
subscriber,
117+
None,
118+
)?;
96119

97120
// build the stream inside the application context
98121
let mut stream = stream.connect()?;

dc/s2n-quic-dc/src/stream/environment/tokio.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,11 @@ where
255255
}
256256

257257
/// A socket that is already registered with the application runtime
258-
pub struct TcpRegistered(pub TcpStream);
258+
pub struct TcpRegistered {
259+
pub socket: TcpStream,
260+
pub peer_addr: SocketAddress,
261+
pub local_port: u16,
262+
}
259263

260264
impl<Sub> super::Peer<Environment<Sub>> for TcpRegistered
261265
where
@@ -274,9 +278,9 @@ where
274278

275279
#[inline]
276280
fn setup(self, _env: &Environment<Sub>) -> super::Result<super::SocketSet<Self::WorkerSocket>> {
277-
let remote_addr = self.0.peer_addr()?.into();
278-
let source_control_port = self.0.local_addr()?.port();
279-
let application = Box::new(self.0);
281+
let remote_addr = self.peer_addr;
282+
let source_control_port = self.local_port;
283+
let application = Box::new(self.socket);
280284
Ok(super::SocketSet {
281285
application,
282286
read_worker: None,
@@ -289,7 +293,11 @@ where
289293
}
290294

291295
/// A socket that should be reregistered with the application runtime
292-
pub struct TcpReregistered(pub TcpStream, pub SocketAddress);
296+
pub struct TcpReregistered {
297+
pub socket: TcpStream,
298+
pub peer_addr: SocketAddress,
299+
pub local_port: u16,
300+
}
293301

294302
impl<Sub> super::Peer<Environment<Sub>> for TcpReregistered
295303
where
@@ -308,9 +316,9 @@ where
308316

309317
#[inline]
310318
fn setup(self, _env: &Environment<Sub>) -> super::Result<super::SocketSet<Self::WorkerSocket>> {
311-
let remote_addr = self.1;
312-
let source_control_port = self.0.local_addr()?.port();
313-
let application = Box::new(self.0.into_std()?);
319+
let source_control_port = self.local_port;
320+
let remote_addr = self.peer_addr;
321+
let application = Box::new(self.socket.into_std()?);
314322
Ok(super::SocketSet {
315323
application,
316324
read_worker: None,

dc/s2n-quic-dc/src/stream/server/tokio/tcp.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ impl FreshQueue {
225225
remaining -= 1;
226226

227227
if remaining == 0 {
228+
// if we're yielding then we need to wake ourselves up again
229+
cx.waker().wake_by_ref();
228230
break;
229231
}
230232
}
@@ -444,6 +446,7 @@ where
444446
secrets: secret::Map,
445447
accept_flavor: accept::Flavor,
446448
subscriber: Sub,
449+
local_port: u16,
447450
}
448451

449452
impl<Sub> WorkerContext<Sub>
@@ -458,6 +461,7 @@ where
458461
secrets: acceptor.secrets.clone(),
459462
accept_flavor: acceptor.accept_flavor,
460463
subscriber: acceptor.subscriber.clone(),
464+
local_port: acceptor.socket.local_addr().unwrap().port(),
461465
}
462466
}
463467
}
@@ -691,7 +695,11 @@ impl WorkerState {
691695
let stream_builder = match endpoint::accept_stream(
692696
now,
693697
&context.env,
694-
env::TcpReregistered(socket, remote_address),
698+
env::TcpReregistered {
699+
socket,
700+
peer_addr: remote_address,
701+
local_port: context.local_port,
702+
},
695703
&initial_packet,
696704
None,
697705
Some(recv_buffer),
@@ -702,7 +710,7 @@ impl WorkerState {
702710
) {
703711
Ok(stream) => stream,
704712
Err(error) => {
705-
if let Some(env::TcpReregistered(socket, remote_address)) = error.peer {
713+
if let Some(env::TcpReregistered { socket, .. }) = error.peer {
706714
if !error.secret_control.is_empty() {
707715
// if we need to send an error then update the state and loop back
708716
// around

0 commit comments

Comments
 (0)