Commit d0aff82

fix(s2n-quic-dc): convert stream segments to probes when available (#2575)

1 parent 6a81117 commit d0aff82

4 files changed: +151 -54 lines

dc/s2n-quic-dc/src/recovery.rs

Lines changed: 7 additions & 1 deletion
@@ -4,5 +4,11 @@
 pub use s2n_quic_core::recovery::RttEstimator;

 pub fn rtt_estimator() -> RttEstimator {
-    RttEstimator::new(core::time::Duration::from_millis(10))
+    // Set the initial RTT to 2ms so we send a probe after ~6ms
+    //
+    // TODO longer term, it might be a good idea to have the handshake map
+    // entry maintain a recent RTT, or at least use the value from the original
+    // handshake. This default value is going to be difficult to get right
+    // for every environment.
+    RttEstimator::new(core::time::Duration::from_millis(2))
 }
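For context on the ~6ms figure above: a back-of-the-envelope sketch, assuming the standard RFC 9002 probe timeout of smoothed_rtt + 4 * rttvar, with rttvar initialized to half the initial RTT and no max_ack_delay. This is illustrative arithmetic, not the crate's actual recovery code:

    // hypothetical helper, not part of s2n-quic-dc
    fn initial_pto_ms(initial_rtt_ms: u64) -> u64 {
        let rttvar_ms = initial_rtt_ms / 2;
        initial_rtt_ms + 4 * rttvar_ms
    }

    fn main() {
        assert_eq!(initial_pto_ms(2), 6); // new 2ms default: first probe after ~6ms
        assert_eq!(initial_pto_ms(10), 30); // old 10ms default: first probe after ~30ms
    }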

dc/s2n-quic-dc/src/stream/send/state.rs

Lines changed: 81 additions & 29 deletions
@@ -368,15 +368,9 @@ impl State {

         // if there was no error and we transmitted everything then just shut the
         // stream down
-        if close.error_code == VarInt::ZERO
-            && close.frame_type.is_some()
-            && self.state.on_recv_all_acks().is_ok()
-        {
-            self.clean_up();
-            // transmit one more PTO packet so we can ACK the peer's
-            // CONNECTION_CLOSE frame and they can shutdown quickly. Otherwise,
-            // they'll need to hang around to respond to potential loss.
-            self.pto.force_transmit();
+        if close.error_code == VarInt::ZERO && close.frame_type.is_some() {
+            self.unacked_ranges.clear();
+            self.try_finish();
             return Ok(None);
         }

@@ -605,27 +599,25 @@ impl State {
             #[allow(clippy::redundant_closure_call)]
             ($on_packet)(&packet);

-            if let Some(segment) = packet.info.retransmission {
+            if let Some((segment, retransmission)) = packet.info.retransmit_copy() {
+                // the stream segment was cleaned up so no need to retransmit
+                if !self.stream_packet_buffers.contains_key(segment) {
+                    continue;
+                }
+
                 // update our local packet number to be at least 1 more than the largest lost
                 // packet number
                 let min_recovery_packet_number = num.as_u64() + 1;
                 self.recovery_packet_number =
                     self.recovery_packet_number.max(min_recovery_packet_number);

-                let retransmission = retransmission::Segment {
-                    segment,
-                    stream_offset: packet.info.stream_offset,
-                    payload_len: packet.info.payload_len,
-                    ty: TransmissionType::Stream,
-                    included_fin: packet.info.included_fin,
-                };
                 self.retransmissions.push(retransmission);
             } else {
                 // we can only recover reliable streams
                 is_unrecoverable |= packet.info.payload_len > 0 && !self.is_reliable;
             }
-        }}
-    }
+        }
+        }}
     }

     match packet_space {
@@ -649,6 +641,48 @@ impl State {
         Ok(())
     }

+    /// Takes the oldest stream packets and tries to make them into PTO packets
+    ///
+    /// This ensures that we're not wasting resources by sending empty payloads, especially
+    /// when there's outstanding data waiting to be ACK'd.
+    fn make_stream_packets_as_pto_probes(&mut self) {
+        // check to see if we have in-flight stream segments
+        ensure!(!self.sent_stream_packets.is_empty());
+        // only reliable streams store segments
+        ensure!(self.is_reliable);
+
+        let pto = self.pto.transmissions() as usize;
+
+        // check if we already have enough retransmissions scheduled
+        let Some(mut remaining) = pto.checked_sub(self.retransmissions.len()) else {
+            return;
+        };
+
+        // iterate until `remaining` is 0.
+        //
+        // This nested loop is a bit weird but it's intentional - if we have `remaining == 2`
+        // but only have a single in-flight stream segment then we want to transmit that
+        // segment `remaining` times.
+        while remaining > 0 {
+            for (num, packet) in self.sent_stream_packets.iter().take(remaining) {
+                let Some((_segment, retransmission)) = packet.info.retransmit_copy() else {
+                    // unreliable streams don't store segments so bail early - this was checked above
+                    return;
+                };
+
+                // update our local packet number to be at least 1 more than the largest lost
+                // packet number
+                let min_recovery_packet_number = num.as_u64() + 1;
+                self.recovery_packet_number =
+                    self.recovery_packet_number.max(min_recovery_packet_number);
+
+                self.retransmissions.push(retransmission);
+
+                // count this as a PTO transmission
+                remaining -= 1;
+            }
+        }
+    }
+
     #[inline]
     fn on_peer_activity(&mut self, newly_acked_packets: bool) {
         if let Some(prev) = self.peer_activity.as_mut() {
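To make the nested loop concrete: a standalone sketch of the probe-filling logic, with plain integers standing in for segments and hypothetical names throughout (the packet-number bookkeeping is elided):

    fn fill_probes(sent_segments: &[u64], pto_transmissions: usize, already_scheduled: usize) -> Vec<u64> {
        let mut scheduled = Vec::new();
        // subtract what's already queued; on underflow there's nothing left to fill
        let Some(mut remaining) = pto_transmissions.checked_sub(already_scheduled) else {
            return scheduled;
        };
        // the real code ensures the segment list is non-empty before it gets here
        while remaining > 0 && !sent_segments.is_empty() {
            for segment in sent_segments.iter().take(remaining) {
                scheduled.push(*segment);
                remaining -= 1;
            }
        }
        scheduled
    }

    fn main() {
        // two probes owed but only one in-flight segment: send that segment twice
        assert_eq!(fill_probes(&[7], 2, 0), vec![7, 7]);
        // one retransmission already queued, so only one probe slot remains
        assert_eq!(fill_probes(&[1, 2, 3], 2, 1), vec![1]);
    }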
@@ -894,6 +928,10 @@ impl State {
             // try to transition to start sending
             let _ = self.state.on_send_stream();
             if info.included_fin {
+                // clear out the unacked ranges that we're no longer tracking
+                let final_offset = info.end_offset();
+                let _ = self.unacked_ranges.remove(final_offset..);
+
                 // if the transmission included the final offset, then transition to that state
                 let _ = self.state.on_send_fin();
             }
@@ -960,6 +998,9 @@ impl State {
         // skip a packet number if we're probing
         if self.pto.transmissions() > 0 {
             self.recovery_packet_number += 1;
+
+            // try making some existing stream packets into probes instead of transmitting empty ones
+            self.make_stream_packets_as_pto_probes();
         }

         self.try_transmit_retransmissions(control_key, credentials, clock)?;
@@ -983,17 +1024,28 @@ impl State {
         ensure!(self.is_reliable, Ok(()));

         while let Some(retransmission) = self.retransmissions.peek() {
-            // make sure we fit in the current congestion window
-            let remaining_cca_window = self
-                .cca
-                .congestion_window()
-                .saturating_sub(self.cca.bytes_in_flight());
-            ensure!(
-                retransmission.payload_len as u32 <= remaining_cca_window,
-                break
-            );
+            // If the CCA is requesting fast retransmission we can bypass the CWND check
+            if !self.cca.requires_fast_retransmission() {
+                // make sure we fit in the current congestion window
+                let remaining_cca_window = self
+                    .cca
+                    .congestion_window()
+                    .saturating_sub(self.cca.bytes_in_flight());
+                ensure!(
+                    retransmission.payload_len as u32 <= remaining_cca_window,
+                    break
+                );
+            }

-            let buffer = self.stream_packet_buffers[retransmission.segment].make_mut();
+            let Some(buffer) = self.stream_packet_buffers.get_mut(retransmission.segment) else {
+                // the segment was acknowledged by another packet so remove it
+                let _ = self
+                    .retransmissions
+                    .pop()
+                    .expect("retransmission should be available");
+                continue;
+            };
+            let buffer = buffer.make_mut();

             debug_assert!(!buffer.is_empty(), "empty retransmission buffer submitted");
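The second half of this hunk also swaps the panicking indexed lookup (`self.stream_packet_buffers[retransmission.segment]`) for a get-or-drop pattern. A reduced model of that pattern, with std collections standing in for the crate's segment map and retransmission queue (all names here are hypothetical):

    use std::collections::{HashMap, VecDeque};

    fn transmit_pending(
        buffers: &HashMap<u32, Vec<u8>>,
        retransmissions: &mut VecDeque<u32>,
        out: &mut Vec<Vec<u8>>,
    ) {
        while let Some(&segment) = retransmissions.front() {
            let Some(buffer) = buffers.get(&segment) else {
                // the segment was already acknowledged, so drop the stale entry
                retransmissions.pop_front();
                continue;
            };
            out.push(buffer.clone());
            retransmissions.pop_front();
        }
    }

    fn main() {
        let buffers = HashMap::from([(1u32, vec![0xaa]), (3, vec![0xbb])]);
        // segment 2 was ACKed by another packet and its buffer reclaimed
        let mut queue = VecDeque::from([1u32, 2, 3]);
        let mut out = Vec::new();
        transmit_pending(&buffers, &mut queue, &mut out);
        assert_eq!(out, vec![vec![0xaa], vec![0xbb]]);
    }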

dc/s2n-quic-dc/src/stream/send/state/transmission.rs

Lines changed: 23 additions & 0 deletions
@@ -65,8 +65,31 @@ impl<Retransmission> Info<Retransmission> {
         (start, end)
     }

+    /// Non-inclusive offset
     #[inline]
     pub fn end_offset(&self) -> VarInt {
         self.stream_offset + VarInt::from_u16(self.payload_len)
     }
 }
+
+impl<Retransmission: Copy> Info<Retransmission> {
+    #[inline]
+    pub fn retransmit_copy(
+        &self,
+    ) -> Option<(
+        Retransmission,
+        super::retransmission::Segment<Retransmission>,
+    )> {
+        let segment = self.retransmission?;
+
+        let retransmission = super::retransmission::Segment {
+            segment,
+            stream_offset: self.stream_offset,
+            payload_len: self.payload_len,
+            ty: super::TransmissionType::Stream,
+            included_fin: self.included_fin,
+        };
+
+        Some((segment, retransmission))
+    }
+}
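A small self-contained model of the two behaviors this file adds, using plain integers in place of VarInt and a tuple in place of retransmission::Segment (illustrative only, not the crate's API):

    #[derive(Clone, Copy)]
    struct Info {
        stream_offset: u64,
        payload_len: u16,
        retransmission: Option<u32>, // segment handle; only reliable streams store one
    }

    impl Info {
        fn end_offset(&self) -> u64 {
            // non-inclusive: the segment covers stream_offset..end_offset()
            self.stream_offset + self.payload_len as u64
        }

        fn retransmit_copy(&self) -> Option<(u32, (u32, u64, u16))> {
            // bail for unreliable streams, which keep no segment handle
            let segment = self.retransmission?;
            // the real helper builds a retransmission::Segment; a tuple stands in here
            Some((segment, (segment, self.stream_offset, self.payload_len)))
        }
    }

    fn main() {
        let reliable = Info { stream_offset: 100, payload_len: 50, retransmission: Some(7) };
        assert_eq!(reliable.end_offset(), 150); // covers bytes 100..150
        assert!(reliable.retransmit_copy().is_some());

        let unreliable = Info { retransmission: None, ..reliable };
        assert!(unreliable.retransmit_copy().is_none());
    }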

dc/s2n-quic-dc/src/stream/tests/rpc.rs

Lines changed: 40 additions & 24 deletions
@@ -66,51 +66,67 @@ fn no_loss() {
     sim(|| {
         hello_goodbye();

-        {
-            ::bach::net::monitor::on_socket_write(move |write| {
-                let count = COUNT.fetch_add(1, Ordering::Relaxed) + 1;
-                assert!(count <= 4, "flow should only consume 4 packets\n{write:#?}");
-                tracing::info!(?write, "on_socket_write");
-                Ok(())
-            });
-        }
+        ::bach::net::monitor::on_packet_sent(move |packet| {
+            let count = COUNT.fetch_add(1, Ordering::Relaxed) + 1;
+            assert!(
+                count <= 4,
+                "flow should only consume 4 packets\n{packet:#?}"
+            );
+            tracing::info!(?packet, "on_packet_sent");
+            Default::default()
+        });
     });

     assert_eq!(COUNT.load(Ordering::Relaxed), 4);
 }

 // TODO use this with bach >= 0.0.13
-// Also - this test is broken... need to fix the impl
 #[cfg(todo)]
 #[test]
 fn packet_loss() {
+    use core::sync::atomic::{AtomicUsize, Ordering};
+
     check!()
         .exhaustive()
         .with_generator(0usize..=4)
         .cloned()
         .for_each(|loss_idx| {
+            let max_count = match loss_idx {
+                // the first two are Stream packets
+                0..=1 => 6,
+                // the next ones are Control packets, which cause 1 extra packet, since the
+                // sender also needs to transmit the Stream packet again.
+                2..=3 => 7,
+                // otherwise, it should only take 4
+                _ => 4,
+            };
+
+            static COUNT: AtomicUsize = AtomicUsize::new(0);
+
+            // reset the count back to 0
+            COUNT.store(0, Ordering::Relaxed);
+
             sim(|| {
                 hello_goodbye();

-                {
-                    let mut count = 0;
-                    ::bach::net::monitor::on_packet_sent(move |packet| {
-                        let idx = count;
-                        count += 1;
+                ::bach::net::monitor::on_packet_sent(move |packet| {
+                    let idx = COUNT.fetch_add(1, Ordering::Relaxed);
+                    let count = idx + 1;

-                        assert!(
-                            count <= 5,
-                            "flow should only consume 5 packets\n{packet:#?}"
-                        );
+                    assert!(
+                        count <= max_count,
+                        "flow should only consume {max_count} packets\n{packet:#?}"
+                    );

-                        if loss_idx == idx {
-                            return ::bach::net::monitor::Command::Drop;
-                        }
+                    if loss_idx == idx {
+                        return ::bach::net::monitor::Command::Drop;
+                    }

-                        Default::default()
-                    });
-                }
+                    Default::default()
+                });
             });
+
+            assert_eq!(COUNT.swap(0, Ordering::Relaxed), max_count);
         });
 }
