
Conversation

gretchenfrage
Collaborator

This PR competes with, and is mutually exclusive with, my previous #2242 and #2230. I think this is the best iteration so far.

It successfully removes WriteSource without using a callback (as in #2230) or introducing lifetimes (as in #2242), and it splits up the write_source method's logic more deeply than either of those two.

It introduces a dependency on the indexmap crate. This seems fine to me; it's an extremely common crate and likely to be useful for other things too.

Notes on possible follow-up work

As I've expressed on the other PRs, I like the idea of introducing some sort of public API capable of doing what WriteSource can do. This PR is actually more helpful to that goal than #2242. In #2242, introducing some sort of quinn version of WriteGuard<'_> would mean we'd want to create a struct containing both a mutex guard and a proto::WriteGuard<'_> which borrows from that mutex guard. That would probably require us to either pull in the ouroboros crate, write equivalent unsafe code manually, or complicate the API in some ugly way. Conversely, with the approach implemented in this PR, we could conceivably just make write_limit and write_unchecked public and give quinn a WriteGuard struct which contains a mutex guard and a usize write budget. Or something like that.
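For illustration, a minimal sketch of that hypothetical quinn-layer WriteGuard, assuming the budget-as-usize approach. ConnState, the buffering, and all names here are stand-ins for illustration, not quinn's real types:

```rust
use std::sync::{Mutex, MutexGuard};

// Stand-in for quinn's internal connection state behind the mutex.
struct ConnState {
    buffered: Vec<u8>,
}

// Hypothetical public guard: a lock guard plus a plain usize write budget,
// avoiding any proto::WriteGuard<'_> that borrows from the mutex guard.
struct WriteGuard<'a> {
    state: MutexGuard<'a, ConnState>,
    budget: usize, // result of a hypothetical public write_limit()
}

impl<'a> WriteGuard<'a> {
    // Write at most `budget` bytes, mirroring write_limit + write_unchecked.
    fn write(&mut self, data: &[u8]) -> usize {
        let n = self.budget.min(data.len());
        self.state.buffered.extend_from_slice(&data[..n]);
        self.budget -= n;
        n
    }
}

fn main() {
    let conn = Mutex::new(ConnState { buffered: Vec::new() });
    let mut guard = WriteGuard {
        state: conn.lock().unwrap(),
        budget: 5,
    };
    let written = guard.write(b"hello world");
    assert_eq!(written, 5); // capped by the budget, not the data length
    assert_eq!(guard.state.buffered, b"hello");
}
```

The point of the sketch is that the guard holds no borrow into proto state, so no self-referential struct (and no ouroboros) is needed.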

Also, a different bit of follow-up work that could be done after that would be to modify RecvStream to do a similar sort of preemptive hashmap lookup, if we wanted.

@gretchenfrage gretchenfrage requested review from djc and Ralith as code owners June 4, 2025 00:19
@gretchenfrage gretchenfrage force-pushed the indexmap-write-source-refactor branch from ec4c939 to b660edd Compare June 4, 2025 00:22
@@ -233,7 +228,9 @@ impl<'a> SendStream<'a> {
///
/// Returns the number of bytes successfully written.
pub fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
Ok(self.write_source(&mut ByteSlice::from_slice(data))?.bytes)
let prefix_len = self.write_limit()?.min(data.len());
self.write_unchecked(data[..prefix_len].to_vec().into());
Collaborator Author

I'll change this to Ralith's suggestion from the previous PR before merging.

Makes it so that SendStream is always constructed with SendStream::new.
This is in preparation for a subsequent commit which will make the
constructor non-trivial.

Previously, `new` was `#[cfg(fuzzing)] pub`. Now, the `#[cfg(fuzzing)]`
is removed and `pub` is changed to `pub(super)`. Fuzz code is modified
to call a new `#[cfg(fuzzing)] pub fn new_for_fuzzing` delegate method.
Adds dependency on indexmap crate, and changes StreamState.send from a
HashMap to an IndexMap. Updates some usages to be idiomatic /
performant. This is in preparation for a subsequent commit which will
exploit useful properties of IndexMap.
Previously, all 5 primitive methods on SendStream (write_source, stopped,
finish, set_priority, and priority) performed a self.state.send[self.id]
hashmap lookup. Now that self.state.send is an IndexMap, this commit
replaces these individual lookups with a single key-to-index lookup upon
construction, and saves the Option<usize> result to a new field.

This removes certain tradeoffs between performance and borrowing
complexity, and thereby facilitates subsequent refactor commits.
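The key-to-index caching this commit describes can be sketched with a std-only stand-in. Here a hypothetical SendMap with a linear-scan get_index_of stands in for indexmap::IndexMap, whose get_index_of is a hash lookup in the real crate; all names are illustrative:

```rust
// Std-only sketch of the cached-index pattern. An insertion-ordered
// Vec<(key, value)> stands in for indexmap::IndexMap.
struct SendMap {
    entries: Vec<(u64, String)>, // (StreamId stand-in, stream state stand-in)
}

impl SendMap {
    // Mirrors IndexMap::get_index_of (which hashes rather than scans).
    fn get_index_of(&self, id: u64) -> Option<usize> {
        self.entries.iter().position(|(k, _)| *k == id)
    }
}

struct SendStream<'a> {
    map: &'a mut SendMap,
    index: Option<usize>, // resolved once at construction
}

impl<'a> SendStream<'a> {
    fn new(map: &'a mut SendMap, id: u64) -> Self {
        let index = map.get_index_of(id); // the single key-to-index lookup
        Self { map, index }
    }

    // Every method can now do O(1) indexed access instead of a hash lookup.
    fn state(&mut self) -> Option<&mut String> {
        let i = self.index?;
        Some(&mut self.map.entries[i].1)
    }
}

fn main() {
    let mut map = SendMap {
        entries: vec![(3, "open".into())],
    };
    let mut stream = SendStream::new(&mut map, 3);
    assert_eq!(stream.state().map(|s| s.clone()), Some("open".to_string()));
}
```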
Prior to this commit, SendStream had several different write methods
each of which had to:

1. Perform some shared logic to check how much can be written.
2. Do source-specific logic to iterate over a series of `Bytes` chunks
   to write.
3. For each one of those, perform some shared logic to write the bytes.

Prior to this commit, this was achieved with the WriteSource trait.
Previous attempts to simplify this and remove the WriteSource trait were
complicated by the fact that it is desirable to cache a single StreamId
hashmap lookup over the course of this whole process. However, that is
no longer an obstacle due to previous commits.

Thus, this commit basically splits the SendStream::write_source method
into two methods (both of which are still private):

1. A `write_limit` method, which checks how many bytes can be written.
2. A `write_unchecked` method, which writes bytes under the assumption
   that the write limit is being respected, and does not itself check
   limits or error conditions.

As such, the `write` and `write_chunks` methods are rewritten to call
these methods directly, and the `WriteSource` trait system is removed
entirely, achieving significant simplification.
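A hedged sketch of how write_chunks might be layered over the two private primitives, using stand-in types (Vec<u8> for Bytes, a simplified WriteError and stream state, not the crate's real definitions) and simplified to write only whole chunks, whereas the real method also splits a partial chunk:

```rust
#[derive(Debug, PartialEq)]
enum WriteError {
    Blocked,
}

struct SendStream {
    limit: usize,
    sent: Vec<Vec<u8>>,
}

impl SendStream {
    // Shared logic: how many bytes can be written right now.
    fn write_limit(&mut self) -> Result<usize, WriteError> {
        if self.limit == 0 {
            return Err(WriteError::Blocked);
        }
        Ok(self.limit)
    }

    // Shared logic: write one chunk, assuming the limit was respected.
    fn write_unchecked(&mut self, chunk: Vec<u8>) {
        self.limit -= chunk.len();
        self.sent.push(chunk);
    }

    // Source-specific logic: drain whole chunks while they fit the budget.
    fn write_chunks(&mut self, chunks: &mut Vec<Vec<u8>>) -> Result<usize, WriteError> {
        let mut budget = self.write_limit()?; // single check up front
        let mut written = 0;
        while let Some(chunk) = chunks.first() {
            if chunk.len() > budget {
                break;
            }
            let chunk = chunks.remove(0);
            budget -= chunk.len();
            written += chunk.len();
            self.write_unchecked(chunk);
        }
        Ok(written)
    }
}

fn main() {
    let mut s = SendStream { limit: 5, sent: Vec::new() };
    let mut chunks = vec![b"abc".to_vec(), b"de".to_vec(), b"fgh".to_vec()];
    assert_eq!(s.write_chunks(&mut chunks), Ok(5)); // "abc" + "de" fit
    assert_eq!(chunks, vec![b"fgh".to_vec()]); // "fgh" exceeds the budget
}
```

Note how the budget is threaded as a plain usize between the two primitives, which is what removes the borrowing complexity that WriteSource worked around.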
@gretchenfrage gretchenfrage force-pushed the indexmap-write-source-refactor branch from b660edd to df184d1 Compare June 4, 2025 00:27
Collaborator

@Ralith Ralith left a comment

Love the procedural simplicity this gives rise to! Good call finding a way to make this compose gracefully with a mutex at the quinn layer, too.

Previous attempts to simplify this and remove the WriteSource trait were
complicated by the fact that it is desirable to cache a single StreamId
hashmap lookup over the course of this whole process.

Was that really the issue? This feels like it might be a premature optimization. FxHashMap on StreamId should be comically fast already. IndexMap's implementation looks great, but it is more deps for unclear benefit, and something we could pursue in future without breakage if a need is shown. Unless you've done benchmarking already?

@@ -203,7 +203,16 @@ pub struct SendStream<'a> {
#[allow(clippy::needless_lifetimes)] // Needed for cfg(fuzzing)
impl<'a> SendStream<'a> {
#[cfg(fuzzing)]
pub fn new(
pub fn new_for_fuzzing(
Collaborator

bikeshed: I'd prefer that this always be named new. We could achieve that by having two definitions, selected by #[cfg(fuzzing)] state and differing only in visibility, both of which delegate to a private new_inner or similar.

@@ -195,6 +195,7 @@ impl RecvStream<'_> {
/// Access to streams
pub struct SendStream<'a> {
pub(super) id: StreamId,
pub(super) index: Option<usize>,
Collaborator

What happens when the swap_remove_entry call introduced in the previous commit invalidates this?
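For illustration, the invalidation hazard Ralith is pointing at can be reproduced with std's Vec::swap_remove, whose index-shuffling behavior IndexMap's swap_remove_entry shares:

```rust
fn main() {
    let mut v = vec!["a", "b", "c", "d"];
    let cached = 3; // a cached index pointing at "d"

    // swap_remove moves the last element into the removed slot...
    v.swap_remove(1); // removes "b"; "d" moves to index 1
    assert_eq!(v, ["a", "d", "c"]);

    // ...so the cached index is now stale: out of bounds here,
    // or silently pointing at the wrong entry in other cases.
    assert!(cached >= v.len());
}
```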

.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(WriteError::ClosedStream)?;
let index = self.index.ok_or(WriteError::ClosedStream)?;
Collaborator

Should we raise this condition in new?

}

fn write_source<B: BytesSource>(&mut self, source: &mut B) -> Result<Written, WriteError> {
/// Get how many bytes could be written immediately, or mark as blocked if zero
fn write_limit(&mut self) -> Result<usize, WriteError> {
Collaborator

bikeshed: this name sounds like it's writing a limit somewhere. Maybe max_write_len? That's still weird given that this can have side effects and is fallible, though...

Member

Or just write_limited()?

Collaborator

@Ralith Ralith Jun 6, 2025

That also seems confusing. This is serving the dual purpose of declaring intent to write and getting the current write limit, but is not actually writing anything. I wonder if we could split those two roles up without significant cost?

}

/// Write bytes under the assumption that `chunk.len() <= write_limit().unwrap()`
fn write_unchecked(&mut self, chunk: Bytes) {
Collaborator

Does this actually need to be unchecked? Seems like it might be fairly cheap to detect incorrect use. Maybe a debug_assert, at least...
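A minimal sketch of the debug_assert idea, with hypothetical stand-in state (Vec<u8> for Bytes, a bare limit field standing in for the real flow-control accounting):

```rust
struct SendStream {
    limit: usize,      // stand-in for what write_limit() would return
    buffered: Vec<u8>, // stand-in for the stream's send buffer
}

impl SendStream {
    fn write_unchecked(&mut self, chunk: Vec<u8>) {
        // Cheap misuse detection in debug builds, compiled out in release.
        debug_assert!(
            chunk.len() <= self.limit,
            "write_unchecked called with chunk larger than write limit"
        );
        // Saturate so release builds can't underflow on misuse.
        self.limit -= chunk.len().min(self.limit);
        self.buffered.extend_from_slice(&chunk);
    }
}

fn main() {
    let mut s = SendStream { limit: 8, buffered: Vec::new() };
    s.write_unchecked(b"hello".to_vec());
    assert_eq!(s.buffered, b"hello");
    assert_eq!(s.limit, 3);
}
```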

@gretchenfrage
Collaborator Author

Previous attempts to simplify this and remove the WriteSource trait were
complicated by the fact that it is desirable to cache a single StreamId
hashmap lookup over the course of this whole process.

Was that really the issue? This feels like it might be a premature optimization. FxHashMap on StreamId should be comically fast already. IndexMap's implementation looks great, but it is more deps for unclear benefit, and something we could pursue in future without breakage if a need is shown. Unless you've done benchmarking already?

I made a comment to this effect in the message of #2242:

I did some looking and it looks like the write_source API was introduced in #1013. The main aim of that PR seemed to be simply to introduce the ability to directly use Bytes in general. This made me wonder why this wasn't done by adding a method that just wrote one Bytes at a time. From looking at the code, it looks like the main penalty to do this would be duplicated hash table look-ups for the stream. Caching the &mut, as this PR does, avoids that.

@Ralith
Collaborator

Ralith commented Jun 7, 2025

The reason behind our batched high level APIs isn't to avoid multiple hash-table lookups -- it's to avoid churning the connection Mutex, which might be contended, at the quinn layer. Maybe we pushed that concern down further than necessary before, but we shouldn't hold ourselves to that now.

@gretchenfrage
Collaborator Author

Based on your comments that (1) hash map lookups and (2) write_limit checks may each be cheap enough to do many times, I prototyped this commit here: 866e9c2, which does some deeper refactors that take advantage of treating those operations as really cheap.

Basically it leaves us with the following primitive methods:

    /// Get how many bytes could be written immediately
    ///
    /// Always returns `Ok(0)` if blocked, never returns `WriteError::Blocked`.
    pub fn write_limit(&self) -> Result<usize, WriteError> { .. }

    /// Ensure that a [`StreamEvent::Writable`][1] event is emitted once [`write_limit`][2]
    /// transitions from `Ok(0)` to a non-zero value
    ///
    /// Panics if `self.write_limit() != Ok(0)`.
    ///
    /// [1]: crate::StreamEvent::Writable
    /// [2]: Self::write_limit
    pub fn mark_blocked(&mut self) { .. }

    /// Immediately write the entirety of `chunk` on the given stream, or panic if unable
    ///
    /// Requires that `self.write_limit().unwrap() >= chunk.len()`, panics otherwise.
    pub fn write_immediate(&mut self, chunk: Bytes) { .. }

And also 3 non-primitive methods (none of which implement anything a user couldn't implement themselves):

  • write
  • write_chunks
  • write_limit_or_mark_blocked

I think that design has some nice qualities of it being easy to reason about error handling. And it doesn't pull in indexmap. @Ralith I'm wondering what you think?
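For illustration, under that prototype's contract the non-primitive write could presumably be layered on the three primitives roughly like this (stubbed-out state and a simplified WriteError, not the crate's real definitions):

```rust
#[derive(Debug, PartialEq)]
enum WriteError {
    Blocked,
    ClosedStream,
}

struct SendStream {
    limit: usize,
    sent: Vec<u8>,
    open: bool,
}

impl SendStream {
    // Primitive: how many bytes could be written now; Ok(0) if blocked.
    fn write_limit(&self) -> Result<usize, WriteError> {
        if self.open { Ok(self.limit) } else { Err(WriteError::ClosedStream) }
    }

    // Primitive: register interest in a Writable event; panics unless limit is 0.
    fn mark_blocked(&mut self) {
        assert_eq!(self.write_limit(), Ok(0));
        // (would arm a StreamEvent::Writable here)
    }

    // Primitive: write the whole chunk; caller guarantees it fits.
    fn write_immediate(&mut self, chunk: &[u8]) {
        assert!(self.write_limit().unwrap() >= chunk.len());
        self.limit -= chunk.len();
        self.sent.extend_from_slice(chunk);
    }

    // Non-primitive: nothing a user couldn't build from the above.
    fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
        let limit = self.write_limit()?;
        if limit == 0 {
            self.mark_blocked();
            return Err(WriteError::Blocked);
        }
        let n = limit.min(data.len());
        self.write_immediate(&data[..n]);
        Ok(n)
    }
}

fn main() {
    let mut s = SendStream { limit: 4, sent: Vec::new(), open: true };
    assert_eq!(s.write(b"abcdef"), Ok(4)); // capped at the limit
    assert_eq!(s.write(b"gh"), Err(WriteError::Blocked)); // limit now 0
    assert_eq!(s.sent, b"abcd");
}
```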

@Ralith
Collaborator

Ralith commented Jun 7, 2025

Looks generally attractive. I wonder if we could move the errors from write_limit to mark_blocked, and rename the latter to something like fn begin_write(&mut self) -> Result<(), WriteError> to simplify things for callers a little?

@gretchenfrage
Collaborator Author

I wonder if we could move the errors from write_limit to mark_blocked

I don't think that would be the right choice. Basically, it's a natural answer to the question asked. "How many bytes could I write on this stream right now?" "You can never write bytes on this stream again, it's errored."

Renaming the other thing sounds good.

@Ralith
Collaborator

Ralith commented Jun 13, 2025

My thinking is that caller logic is simplified if the branch on "is the limit 0" gets pushed down into shared logic, so you can write stream.begin_write()? instead of if limit == 0 { stream.mark_blocked(); return Err(WriteError::Blocked.into()); }. Once begin_write is returning one type of error, it seems like it might be simpler to centralize the rest there.

Both are, unfortunately, easy to forget...
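Sketched with hypothetical stubs, the caller-side simplification being described would look like:

```rust
#[derive(Debug, PartialEq)]
enum WriteError {
    Blocked,
}

struct SendStream {
    limit: usize,
}

impl SendStream {
    // Folds the "is the limit 0" branch into shared logic, so callers
    // write `stream.begin_write()?` instead of branching manually.
    fn begin_write(&mut self) -> Result<(), WriteError> {
        if self.limit == 0 {
            // (would mark the stream blocked here)
            return Err(WriteError::Blocked);
        }
        Ok(())
    }
}

fn try_send(stream: &mut SendStream) -> Result<usize, WriteError> {
    stream.begin_write()?; // one line replaces the manual branch at every call site
    Ok(stream.limit)
}

fn main() {
    assert_eq!(try_send(&mut SendStream { limit: 0 }), Err(WriteError::Blocked));
    assert_eq!(try_send(&mut SendStream { limit: 7 }), Ok(7));
}
```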

@gretchenfrage
Collaborator Author

gretchenfrage commented Jun 13, 2025

On the other hand, suppose someone writes logic which calls write_limit without immediately following a 0 result with a call to begin_write. If we return a limit of 0 in cases where the stream is errored, I worry that they would never observe and handle the fact that the stream is errored.

(Imagine, I don't know, a program which streams in frames from a video camera, and whenever one arrives, sends it down the quic stream iff the stream can currently fit the whole video frame)

@Ralith
Collaborator

Ralith commented Jun 14, 2025

Makes sense. I don't feel that strongly, either way is a big improvement.
