Skip to content

Commit b9b5324

Browse files
authored
sync: clarify the behavior of tokio::sync::watch::Receiver (#7584)
1 parent 1b98d5a commit b9b5324

File tree

2 files changed

+138
-13
lines changed

2 files changed

+138
-13
lines changed

tokio/src/sync/watch.rs

Lines changed: 85 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929
//! The [`Receiver`] half provides an asynchronous [`changed`] method. This
3030
//! method is ready when a new, *unseen* value is sent via the [`Sender`] half.
3131
//!
32-
//! * [`Receiver::changed()`] returns `Ok(())` on receiving a new value, or
33-
//! `Err(`[`error::RecvError`]`)` if all [`Sender`]s have been dropped.
32+
//! * [`Receiver::changed()`] returns:
33+
//! * `Ok(())` on receiving a new value.
34+
//! * `Err(`[`RecvError`](error::RecvError)`)` if the
35+
//! channel has been closed __AND__ the current value is *seen*.
3436
//! * If the current value is *unseen* when calling [`changed`], then
3537
//! [`changed`] will return immediately. If the current value is *seen*, then
3638
//! it will sleep until either a new message is sent via the [`Sender`] half,
@@ -42,7 +44,28 @@
4244
//! The current value at the time the [`Receiver`] is created is considered
4345
//! *seen*.
4446
//!
45-
//! ## `borrow_and_update` versus `borrow`
47+
//! ## [`changed`] versus [`has_changed`]
48+
//!
49+
//! The [`Receiver`] half provides two methods for checking for changes
50+
//! in the channel, [`has_changed`] and [`changed`].
51+
//!
52+
//! * [`has_changed`] is a *synchronous* method that checks whether the current
53+
//! value is seen or not and returns a boolean. This method does __not__ mark the
54+
//! value as seen.
55+
//!
56+
//! * [`changed`] is an *asynchronous* method that will return once an unseen
57+
//! value is in the channel. This method does mark the value as seen.
58+
//!
59+
//! Note there are two behavioral differences on when these two methods return
60+
//! an error.
61+
//!
62+
//! - [`has_changed`] errors if and only if the channel is closed.
63+
//! - [`changed`] errors if the channel has been closed __AND__
64+
//! the current value is seen.
65+
//!
66+
//! See the example below that shows how these methods have different fallibility.
67+
//!
68+
//! ## [`borrow_and_update`] versus [`borrow`]
4669
//!
4770
//! If the receiver intends to await notifications from [`changed`] in a loop,
4871
//! [`Receiver::borrow_and_update()`] should be preferred over
@@ -84,6 +107,31 @@
84107
//! # }
85108
//! ```
86109
//!
110+
//! Difference on fallibility of [`changed`] versus [`has_changed`].
111+
//! ```
112+
//! use tokio::sync::watch;
113+
//!
114+
//! #[tokio::main]
115+
//! async fn main() {
116+
//! let (tx, mut rx) = watch::channel("hello");
117+
//! tx.send("goodbye").unwrap();
118+
//! drop(tx);
119+
//!
120+
//! // `has_changed` does not mark the value as seen and errors
121+
//! // since the channel is closed.
122+
//! assert!(rx.has_changed().is_err());
123+
//!
124+
//! // `changed` returns Ok since the value is not already marked as seen
125+
//! // even if the channel is closed.
126+
//! assert!(rx.changed().await.is_ok());
127+
//!
128+
//! // The `changed` call above marks the value as seen.
129+
//! // The next `changed` call now returns an error as the channel is closed
130+
//! // AND the current value is seen.
131+
//! assert!(rx.changed().await.is_err());
132+
//! }
133+
//! ```
134+
//!
87135
//! # Closing
88136
//!
89137
//! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
@@ -102,6 +150,9 @@
102150
//! [`Sender`]: crate::sync::watch::Sender
103151
//! [`Receiver`]: crate::sync::watch::Receiver
104152
//! [`changed`]: crate::sync::watch::Receiver::changed
153+
//! [`has_changed`]: crate::sync::watch::Receiver::has_changed
154+
//! [`borrow`]: crate::sync::watch::Receiver::borrow
155+
//! [`borrow_and_update`]: crate::sync::watch::Receiver::borrow_and_update
105156
//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
106157
//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
107158
//! [`Receiver::borrow_and_update()`]:
@@ -637,15 +688,20 @@ impl<T> Receiver<T> {
637688
}
638689

639690
/// Checks if this channel contains a message that this receiver has not yet
640-
/// seen. The new value is not marked as seen.
691+
/// seen. The current value will not be marked as seen.
692+
///
693+
/// Although this method is called `has_changed`, it does not check
694+
/// messages for equality, so this call will return true even if the current
695+
/// message is equal to the previous message.
696+
///
697+
/// # Errors
641698
///
642-
/// Although this method is called `has_changed`, it does not check new
643-
/// messages for equality, so this call will return true even if the new
644-
/// message is equal to the old message.
699+
/// Returns a [`RecvError`](error::RecvError) if and only if the channel has been closed.
645700
///
646-
/// Returns an error if the channel has been closed.
647701
/// # Examples
648702
///
703+
/// ## Basic usage
704+
///
649705
/// ```
650706
/// use tokio::sync::watch;
651707
///
@@ -660,9 +716,22 @@ impl<T> Receiver<T> {
660716
///
661717
/// // The value has been marked as seen
662718
/// assert!(!rx.has_changed().unwrap());
719+
/// }
720+
/// ```
721+
///
722+
/// ## Closed channel example
723+
///
724+
/// ```
725+
/// use tokio::sync::watch;
726+
///
727+
/// #[tokio::main]
728+
/// async fn main() {
729+
/// let (tx, rx) = watch::channel("hello");
730+
/// tx.send("goodbye").unwrap();
663731
///
664732
/// drop(tx);
665-
/// // The `tx` handle has been dropped
733+
///
734+
/// // The channel is closed
666735
/// assert!(rx.has_changed().is_err());
667736
/// }
668737
/// ```
@@ -701,19 +770,22 @@ impl<T> Receiver<T> {
701770
self.version = current_version;
702771
}
703772

704-
/// Waits for a change notification, then marks the newest value as seen.
773+
/// Waits for a change notification, then marks the current value as seen.
705774
///
706-
/// If the newest value in the channel has not yet been marked seen when
775+
/// If the current value in the channel has not yet been marked seen when
707776
/// this method is called, the method marks that value seen and returns
708777
/// immediately. If the newest value has already been marked seen, then the
709778
/// method sleeps until a new message is sent by a [`Sender`] connected to
710779
/// this `Receiver`, or until all [`Sender`]s are dropped.
711780
///
712-
/// This method returns an error if and only if all [`Sender`]s are dropped.
713-
///
714781
/// For more information, see
715782
/// [*Change notifications*](self#change-notifications) in the module-level documentation.
716783
///
784+
/// # Errors
785+
///
786+
/// Returns a [`RecvError`](error::RecvError) if the channel has been closed __AND__
787+
/// the current value is seen.
788+
///
717789
/// # Cancel safety
718790
///
719791
/// This method is cancel safe. If you use it as the event in a

tokio/tests/sync_watch.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,3 +450,56 @@ async fn sender_closed_is_cooperative() {
450450
_ = tokio::task::yield_now() => {},
451451
}
452452
}
453+
454+
#[tokio::test]
455+
async fn changed_succeeds_on_closed_channel_with_unseen_value() {
456+
let (tx, mut rx) = watch::channel("A");
457+
tx.send("B").unwrap();
458+
459+
drop(tx);
460+
461+
rx.changed()
462+
.await
463+
.expect("should not return error as long as the current value is not seen");
464+
}
465+
466+
#[tokio::test]
467+
async fn changed_errors_on_closed_channel_with_seen_value() {
468+
let (tx, mut rx) = watch::channel("A");
469+
drop(tx);
470+
471+
rx.changed()
472+
.await
473+
.expect_err("should return error if the tx is closed and the current value is seen");
474+
}
475+
476+
#[test]
477+
fn has_changed_errors_on_closed_channel_with_unseen_value() {
478+
let (tx, rx) = watch::channel("A");
479+
tx.send("B").unwrap();
480+
481+
drop(tx);
482+
483+
rx.has_changed()
484+
.expect_err("`has_changed` returns an error if and only if channel is closed. Even if the current value is not seen.");
485+
}
486+
487+
#[test]
488+
fn has_changed_errors_on_closed_channel_with_seen_value() {
489+
let (tx, rx) = watch::channel("A");
490+
drop(tx);
491+
492+
rx.has_changed()
493+
.expect_err("`has_changed` returns an error if and only if channel is closed.");
494+
}
495+
496+
#[tokio::test]
497+
async fn wait_for_errors_on_closed_channel_true_predicate() {
498+
let (tx, mut rx) = watch::channel("A");
499+
tx.send("B").unwrap();
500+
drop(tx);
501+
502+
rx.wait_for(|_| true).await.expect(
503+
"`wait_for` call does not return error even if channel is closed when predicate is true for last value.",
504+
);
505+
}

0 commit comments

Comments
 (0)