Skip to content

Commit 672be92

Browse files
mpsc: Add Sender::try_reserve function (#3418)
* mpsc: Add `Sender::try_reserve` function * Update tokio/src/sync/mpsc/bounded.rs Co-authored-by: Alice Ryhl <[email protected]> * Fix doc links Co-authored-by: Alice Ryhl <[email protected]>
1 parent 766a89b commit 672be92

File tree

2 files changed

+75
-0
lines changed

2 files changed

+75
-0
lines changed

tokio/src/sync/mpsc/bounded.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,58 @@ impl<T> Sender<T> {
603603

604604
Ok(Permit { chan: &self.chan })
605605
}
606+
607+
/// Try to acquire a slot in the channel without waiting for the slot to become
608+
/// available.
609+
///
610+
/// If the channel is full this function will return [`TrySendError`], otherwise
611+
/// if there is a slot available it will return a [`Permit`] that will then allow you
612+
/// to [`send`] on the channel with a guaranteed slot. This function is similar to
613+
/// [`reserve`] execpt it does not await for the slot to become available.
614+
///
615+
/// Dropping [`Permit`] without sending a message releases the capacity back
616+
/// to the channel.
617+
///
618+
/// [`Permit`]: Permit
619+
/// [`send`]: Permit::send
620+
/// [`reserve`]: Sender::reserve
621+
///
622+
/// # Examples
623+
///
624+
/// ```
625+
/// use tokio::sync::mpsc;
626+
///
627+
/// #[tokio::main]
628+
/// async fn main() {
629+
/// let (tx, mut rx) = mpsc::channel(1);
630+
///
631+
/// // Reserve capacity
632+
/// let permit = tx.try_reserve().unwrap();
633+
///
634+
/// // Trying to send directly on the `tx` will fail due to no
635+
/// // available capacity.
636+
/// assert!(tx.try_send(123).is_err());
637+
///
638+
/// // Trying to reserve an additional slot on the `tx` will
639+
/// // fail because there is no capacity.
640+
/// assert!(tx.try_reserve().is_err());
641+
///
642+
/// // Sending on the permit succeeds
643+
/// permit.send(456);
644+
///
645+
/// // The value sent on the permit is received
646+
/// assert_eq!(rx.recv().await.unwrap(), 456);
647+
///
648+
/// }
649+
/// ```
650+
pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
651+
match self.chan.semaphore().0.try_acquire(1) {
652+
Ok(_) => {}
653+
Err(_) => return Err(TrySendError::Full(())),
654+
}
655+
656+
Ok(Permit { chan: &self.chan })
657+
}
606658
}
607659

608660
impl<T> Clone for Sender<T> {

tokio/tests/sync_mpsc.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,29 @@ async fn try_send_fail() {
327327
assert!(rx.recv().await.is_none());
328328
}
329329

330+
#[tokio::test]
331+
async fn try_reserve_fails() {
332+
let (tx, mut rx) = mpsc::channel(1);
333+
334+
let permit = tx.try_reserve().unwrap();
335+
336+
// This should fail
337+
match assert_err!(tx.try_reserve()) {
338+
TrySendError::Full(()) => {}
339+
_ => panic!(),
340+
}
341+
342+
permit.send("foo");
343+
344+
assert_eq!(rx.recv().await, Some("foo"));
345+
346+
// Dropping permit releases the slot.
347+
let permit = tx.try_reserve().unwrap();
348+
drop(permit);
349+
350+
let _permit = tx.try_reserve().unwrap();
351+
}
352+
330353
#[tokio::test]
331354
async fn drop_permit_releases_permit() {
332355
// poll_ready reserves capacity, ensure that the capacity is released if tx

0 commit comments

Comments
 (0)