6 changes: 3 additions & 3 deletions quinn-proto/src/connection/datagrams.rs
@@ -1,6 +1,6 @@
use std::collections::VecDeque;

-use bytes::Bytes;
+use bytes::{BufMut, Bytes};
use thiserror::Error;
use tracing::{debug, trace};

@@ -163,13 +163,13 @@ impl DatagramState {
///
/// Returns whether a frame was written. At most `max_size` bytes will be written, including
/// framing.
-pub(super) fn write(&mut self, buf: &mut Vec<u8>, max_size: usize) -> bool {
+pub(super) fn write(&mut self, buf: &mut impl BufMut) -> bool {
let datagram = match self.outgoing.pop_front() {
Some(x) => x,
None => return false,
};

-if buf.len() + datagram.size(true) > max_size {
+if datagram.size(true) > buf.remaining_mut() {
// Future work: we could be more clever about cramming small datagrams into
// mostly-full packets when a larger one is queued first
self.outgoing.push_front(datagram);
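Reviewer note: the explicit `max_size` argument disappears because the cap now comes from the buffer itself; `BufMut::remaining_mut()` on a size-limited writer plays the role the parameter used to. A minimal sketch of the caller-side pattern, not code from this PR (`write_frame` and its framing-free size check are hypothetical stand-ins for `DatagramState::write`):

use bytes::{BufMut, Bytes};

// Hypothetical stand-in for `DatagramState::write`: give up (and requeue, in the
// real code) when the queued datagram no longer fits in what the buffer accepts.
fn write_frame(buf: &mut impl BufMut, payload: &Bytes) -> bool {
    // Framing overhead (frame type + length varint) is ignored here for brevity.
    if payload.len() > buf.remaining_mut() {
        return false;
    }
    buf.put_slice(payload);
    true
}

fn main() {
    let payload = Bytes::from_static(b"hello");
    let mut out = Vec::with_capacity(1200);

    // The old `max_size` argument becomes a `limit()` adapter applied by the caller.
    assert!(write_frame(&mut (&mut out).limit(8), &payload));
    assert!(!write_frame(&mut (&mut out).limit(3), &payload));
    assert_eq!(out.len(), 5);
}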
311 changes: 156 additions & 155 deletions quinn-proto/src/connection/mod.rs

Large diffs are not rendered by default.

55 changes: 27 additions & 28 deletions quinn-proto/src/connection/packet_builder.rs
@@ -1,8 +1,8 @@
-use bytes::Bytes;
+use bytes::{BufMut, Bytes};
use rand::Rng;
use tracing::{trace, trace_span};

-use super::{Connection, SentFrames, spaces::SentPacket};
+use super::{Connection, DatagramBuffer, SentFrames, spaces::SentPacket};
use crate::{
ConnectionId, Instant, TransportError, TransportErrorCode,
connection::ConnectionSide,
@@ -11,17 +11,17 @@ use crate::{
};

pub(super) struct PacketBuilder {
-pub(super) datagram_start: usize,
pub(super) space: SpaceId,
pub(super) partial_encode: PartialEncode,
pub(super) ack_eliciting: bool,
pub(super) exact_number: u64,
pub(super) short_header: bool,
-/// Smallest absolute position in the associated buffer that must be occupied by this packet's
-/// frames
+/// The smallest datagram offset that must be occupied by this packet's frames
+///
+/// This is the smallest offset into the datagram this packet is being written into,
+/// that must contain frames for this packet.
pub(super) min_size: usize,
-/// Largest absolute position in the associated buffer that may be occupied by this packet's
-/// frames
+/// The largest datagram offset that may be occupied by this packet's frames
pub(super) max_size: usize,
pub(super) tag_len: usize,
pub(super) _span: tracing::span::EnteredSpan,
@@ -36,9 +36,7 @@ impl PacketBuilder {
now: Instant,
space_id: SpaceId,
dst_cid: ConnectionId,
-buffer: &mut Vec<u8>,
-buffer_capacity: usize,
-datagram_start: usize,
+datagram: &mut DatagramBuffer<'_>,
ack_eliciting: bool,
conn: &mut Connection,
) -> Option<Self> {
@@ -122,9 +120,9 @@
version,
}),
};
-let partial_encode = header.encode(buffer);
+let partial_encode = header.encode(datagram);
if conn.peer_params.grease_quic_bit && conn.rng.random() {
-buffer[partial_encode.start] ^= FIXED_BIT;
+datagram[partial_encode.start] ^= FIXED_BIT;
}

let (sample_size, tag_len) = if let Some(ref crypto) = space.crypto {
@@ -148,14 +146,13 @@
// pn_len + payload_len + tag_len >= sample_size + 4
// payload_len >= sample_size + 4 - pn_len - tag_len
let min_size = Ord::max(
-buffer.len() + (sample_size + 4).saturating_sub(number.len() + tag_len),
+datagram.len() + (sample_size + 4).saturating_sub(number.len() + tag_len),
partial_encode.start + dst_cid.len() + 6,
);
-let max_size = buffer_capacity - tag_len;
+let max_size = datagram.capacity() - tag_len;
debug_assert!(max_size >= min_size);

Some(Self {
-datagram_start,
space: space_id,
partial_encode,
exact_number,
@@ -174,23 +171,20 @@
// The datagram might already have a larger minimum size than the caller is requesting, if
// e.g. we're coalescing packets and have populated more than `min_size` bytes with packets
// already.
-self.min_size = Ord::max(
-self.min_size,
-self.datagram_start + (min_size as usize) - self.tag_len,
-);
+self.min_size = Ord::max(self.min_size, (min_size as usize) - self.tag_len);
}

pub(super) fn finish_and_track(
self,
now: Instant,
conn: &mut Connection,
sent: Option<SentFrames>,
-buffer: &mut Vec<u8>,
+datagram: &mut DatagramBuffer<'_>,
) {
let ack_eliciting = self.ack_eliciting;
let exact_number = self.exact_number;
let space_id = self.space;
-let (size, padded) = self.finish(conn, buffer);
+let (size, padded) = self.finish(conn, datagram);
let sent = match sent {
Some(sent) => sent,
None => return,
@@ -228,11 +222,16 @@
}

/// Encrypt packet, returning the length of the packet and whether padding was added
-pub(super) fn finish(self, conn: &mut Connection, buffer: &mut Vec<u8>) -> (usize, bool) {
-let pad = buffer.len() < self.min_size;
+pub(super) fn finish(
+self,
+conn: &mut Connection,
+datagram: &mut DatagramBuffer<'_>,
+) -> (usize, bool) {
+let pad = self.min_size > datagram.len();
if pad {
trace!("PADDING * {}", self.min_size - buffer.len());
buffer.resize(self.min_size, 0);
let padding_bytes = self.min_size - datagram.len();
trace!("PADDING * {padding_bytes}");
datagram.put_bytes(0, padding_bytes);
}

let space = &conn.spaces[self.space];
@@ -251,15 +250,15 @@
"Mismatching crypto tag len"
);

-buffer.resize(buffer.len() + packet_crypto.tag_len(), 0);
+datagram.put_bytes(0, packet_crypto.tag_len());
let encode_start = self.partial_encode.start;
-let packet_buf = &mut buffer[encode_start..];
+let packet_buf = &mut datagram[encode_start..];
self.partial_encode.finish(
packet_buf,
header_crypto,
Some((self.exact_number, packet_crypto)),
);

-(buffer.len() - encode_start, pad)
+(datagram.len() - encode_start, pad)
}
}
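Reviewer note: `DatagramBuffer` itself is defined in `connection/mod.rs`, whose diff is not rendered above. The sketch below is only a guess at its shape, inferred from the operations used in this file (`len`, `capacity`, `put_bytes`, and datagram-relative indexing): a view over the outgoing transmit buffer that starts at the current datagram and is capped at that datagram's capacity, which is what lets `PacketBuilder` drop `datagram_start` and `buffer_capacity`. The real type presumably also implements `bytes::BufMut` so headers and frames can be written through it; that part is omitted here.

use std::ops::{Index, IndexMut};
use std::slice::SliceIndex;

/// Hypothetical reconstruction of `DatagramBuffer` (the actual definition lives
/// in the unrendered `connection/mod.rs` diff).
pub struct DatagramBuffer<'a> {
    /// Whole outgoing transmit buffer; earlier datagrams occupy `..start`.
    buf: &'a mut Vec<u8>,
    /// Absolute offset in `buf` where this datagram begins.
    start: usize,
    /// Maximum number of bytes this datagram may occupy.
    capacity: usize,
}

impl DatagramBuffer<'_> {
    /// Bytes written into this datagram so far.
    pub fn len(&self) -> usize {
        self.buf.len() - self.start
    }

    /// Maximum size of this datagram, e.g. the current segment size.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Append `count` copies of `val`, used above for PADDING and for reserving
    /// space for the AEAD tag.
    pub fn put_bytes(&mut self, val: u8, count: usize) {
        debug_assert!(self.len() + count <= self.capacity);
        self.buf.resize(self.buf.len() + count, val);
    }
}

/// Indexing is relative to the start of the datagram, so expressions like
/// `datagram[partial_encode.start] ^= FIXED_BIT` and `&mut datagram[encode_start..]`
/// no longer need the old `datagram_start` bookkeeping.
impl<I: SliceIndex<[u8]>> Index<I> for DatagramBuffer<'_> {
    type Output = I::Output;
    fn index(&self, index: I) -> &Self::Output {
        &self.buf[self.start..][index]
    }
}

impl<I: SliceIndex<[u8]>> IndexMut<I> for DatagramBuffer<'_> {
    fn index_mut(&mut self, index: I) -> &mut Self::Output {
        &mut self.buf[self.start..][index]
    }
}

fn main() {
    // Tiny usage demo of the sketch above.
    let mut transmit = b"prior datagrams...".to_vec();
    let start = transmit.len();
    let mut datagram = DatagramBuffer { buf: &mut transmit, start, capacity: 1200 };
    datagram.put_bytes(0, 4);
    datagram[0] ^= 0x40;
    assert_eq!(datagram.len(), 4);
    assert_eq!(datagram.capacity(), 1200);
}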
56 changes: 26 additions & 30 deletions quinn-proto/src/connection/streams/state.rs
@@ -411,14 +411,13 @@ impl StreamsState {

pub(in crate::connection) fn write_control_frames(
&mut self,
-buf: &mut Vec<u8>,
+buf: &mut impl BufMut,
pending: &mut Retransmits,
retransmits: &mut ThinRetransmits,
stats: &mut FrameStats,
-max_size: usize,
) {
// RESET_STREAM
-while buf.len() + frame::ResetStream::SIZE_BOUND < max_size {
+while frame::ResetStream::SIZE_BOUND < buf.remaining_mut() {
let (id, error_code) = match pending.reset_stream.pop() {
Some(x) => x,
None => break,
@@ -442,7 +441,7 @@
}

// STOP_SENDING
-while buf.len() + frame::StopSending::SIZE_BOUND < max_size {
+while frame::StopSending::SIZE_BOUND < buf.remaining_mut() {
let frame = match pending.stop_sending.pop() {
Some(x) => x,
None => break,
@@ -461,7 +460,7 @@
}

// MAX_DATA
-if pending.max_data && buf.len() + 9 < max_size {
+if pending.max_data && 9 < buf.remaining_mut() {
pending.max_data = false;

// `local_max_data` can grow bigger than `VarInt`.
@@ -484,7 +483,7 @@
}

// MAX_STREAM_DATA
-while buf.len() + 17 < max_size {
+while 17 < buf.remaining_mut() {
let id = match pending.max_stream_data.iter().next() {
Some(x) => *x,
None => break,
@@ -516,7 +515,7 @@

// MAX_STREAMS
for dir in Dir::iter() {
-if !pending.max_stream_id[dir as usize] || buf.len() + 9 >= max_size {
+if !pending.max_stream_id[dir as usize] || 9 >= buf.remaining_mut() {
continue;
}

@@ -541,19 +540,11 @@

pub(crate) fn write_stream_frames(
&mut self,
-buf: &mut Vec<u8>,
-max_buf_size: usize,
+buf: &mut impl BufMut,
fair: bool,
) -> StreamMetaVec {
let mut stream_frames = StreamMetaVec::new();
-while buf.len() + frame::Stream::SIZE_BOUND < max_buf_size {
-if max_buf_size
-.checked_sub(buf.len() + frame::Stream::SIZE_BOUND)
-.is_none()
-{
-break;
-}
-
+while frame::Stream::SIZE_BOUND < buf.remaining_mut() {
// Pop the stream of the highest priority that currently has pending data
// If the stream still has some pending data left after writing, it will be reinserted, otherwise not
let Some(stream) = self.pending.pop() else {
@@ -577,7 +568,7 @@

// Now that we know the `StreamId`, we can better account for how many bytes
// are required to encode it.
-let max_buf_size = max_buf_size - buf.len() - 1 - VarInt::size(id.into());
+let max_buf_size = buf.remaining_mut() - 1 - VarInt::size(id.into());
let (offsets, encode_length) = stream.pending.poll_transmit(max_buf_size);
let fin = offsets.end == stream.pending.offset()
&& matches!(stream.state, SendState::DataSent { .. });
@@ -1379,8 +1370,8 @@
high.set_priority(1).unwrap();
high.write(b"high").unwrap();

-let mut buf = Vec::with_capacity(40);
-let meta = server.write_stream_frames(&mut buf, 40, true);
+let buf = Vec::with_capacity(40);
+let meta = server.write_stream_frames(&mut buf.limit(40), true);
assert_eq!(meta[0].id, id_high);
assert_eq!(meta[1].id, id_mid);
assert_eq!(meta[2].id, id_low);
@@ -1438,16 +1429,18 @@
};
high.set_priority(-1).unwrap();

-let mut buf = Vec::with_capacity(1000);
-let meta = server.write_stream_frames(&mut buf, 40, true);
+let buf = Vec::with_capacity(1000);
+let mut buf = buf.limit(40);
+let meta = server.write_stream_frames(&mut buf, true);
+let buf = buf.into_inner();
assert_eq!(meta.len(), 1);
assert_eq!(meta[0].id, id_high);

// After requeuing we should end up with 2 priorities - not 3
assert_eq!(server.pending.len(), 2);

// Send the remaining data. The initial mid priority one should go first now
-let meta = server.write_stream_frames(&mut buf, 1000, true);
+let meta = server.write_stream_frames(&mut buf.limit(1000), true);
assert_eq!(meta.len(), 2);
assert_eq!(meta[0].id, id_mid);
assert_eq!(meta[1].id, id_high);
@@ -1507,8 +1500,9 @@

// loop until all the streams are written
loop {
-let buf_len = buf.len();
-let meta = server.write_stream_frames(&mut buf, buf_len + 40, fair);
+let mut lbuf = buf.limit(40);
+let meta = server.write_stream_frames(&mut lbuf, fair);
+buf = lbuf.into_inner();
if meta.is_empty() {
break;
}
@@ -1575,11 +1569,12 @@
stream_b.write(&[b'b'; 100]).unwrap();

let mut metas = vec![];
-let mut buf = Vec::with_capacity(1024);
+let buf = Vec::with_capacity(1024);

// Write the first chunk of stream_a
-let buf_len = buf.len();
-let meta = server.write_stream_frames(&mut buf, buf_len + 40, false);
+let mut buf = buf.limit(40);
+let meta = server.write_stream_frames(&mut buf, false);
+let mut buf = buf.into_inner();
assert!(!meta.is_empty());
metas.extend(meta);

@@ -1595,8 +1590,9 @@

// loop until all the streams are written
loop {
-let buf_len = buf.len();
-let meta = server.write_stream_frames(&mut buf, buf_len + 40, false);
+let mut lbuf = buf.limit(40);
+let meta = server.write_stream_frames(&mut lbuf, false);
+buf = lbuf.into_inner();
if meta.is_empty() {
break;
}
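Reviewer note on the test changes: `limit(n)` bounds the additional bytes a writer may accept, relative to the current write position, which is why the old `buf.len() + 40` arithmetic disappears, and `into_inner()` hands the underlying `Vec` back so it can be reused between calls. A small self-contained illustration of those two properties, using only the plain `bytes` API (no quinn types):

use bytes::BufMut;

fn main() {
    let mut buf: Vec<u8> = Vec::with_capacity(1024);
    buf.extend_from_slice(&[0u8; 100]);

    // The limit is relative to the current length, so no `buf.len() + 40` is needed.
    let mut lbuf = (&mut buf).limit(40);
    assert_eq!(lbuf.remaining_mut(), 40);
    lbuf.put_bytes(0, 40);
    assert_eq!(lbuf.remaining_mut(), 0);

    // `into_inner()` returns the wrapped writer so the caller can keep using it,
    // mirroring the `lbuf.into_inner()` calls in the updated tests above.
    let buf = lbuf.into_inner();
    assert_eq!(buf.len(), 140);
}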