Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 53 additions & 22 deletions tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
//! The [`Receiver`] half provides an asynchronous [`changed`] method. This
//! method is ready when a new, *unseen* value is sent via the [`Sender`] half.
//!
//! * [`Receiver::changed()`] returns `Ok(())` on receiving a new value, or
//! `Err(`[`error::RecvError`]`)` if the [`Sender`] has been dropped.
//! * [`Receiver::changed()`] returns:
//! * `Ok(())` on receiving a new value.
//! * `Err(`[`RecvError`](error::RecvError)`)` if the
//! channel has been closed __AND__ the current value is *seen*.
//! * If the current value is *unseen* when calling [`changed`], then
//! [`changed`] will return immediately. If the current value is *seen*, then
//! it will sleep until either a new message is sent via the [`Sender`] half,
Expand Down Expand Up @@ -198,7 +200,7 @@ impl<'a, T> Ref<'a, T> {
/// Indicates if the borrowed value is considered as _changed_ since the last
/// time it has been marked as seen.
///
/// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
/// Unlike [`Receiver::has_changed()`], this method is not fallible.
///
/// When borrowed from the [`Sender`] this function will always return `false`.
///
Expand All @@ -218,10 +220,10 @@ impl<'a, T> Ref<'a, T> {
/// // Drop the sender immediately, just for testing purposes.
/// drop(tx);
///
/// // Even if the sender has already been dropped...
/// assert!(rx.has_changed().is_err());
/// // ...the modified value is still readable and detected as changed.
/// // The modified value is still readable and detected as changed
/// // even if the sender has already been dropped.
/// assert_eq!(*rx.borrow(), "goodbye");
/// assert!(rx.has_changed().unwrap());
/// assert!(rx.borrow().has_changed());
///
/// // Read the changed value and mark it as seen.
Expand Down Expand Up @@ -637,15 +639,20 @@ impl<T> Receiver<T> {
}

/// Checks if this channel contains a message that this receiver has not yet
/// seen. The new value is not marked as seen.
/// seen. The current value will not be marked as seen.
///
/// Although this method is called `has_changed`, it does not check
/// messages for equality, so this call will return true even if the current
/// message is equal to the previous message.
///
/// Although this method is called `has_changed`, it does not check new
/// messages for equality, so this call will return true even if the new
/// message is equal to the old message.
/// # Errors
///
/// Returns a [`RecvError`](error::RecvError) if the channel has been closed __AND__
/// the current value is seen.
///
/// Returns an error if the channel has been closed.
/// # Examples
///
/// ## Basic usage
/// ```
/// use tokio::sync::watch;
///
Expand All @@ -660,22 +667,42 @@ impl<T> Receiver<T> {
///
/// // The value has been marked as seen
/// assert!(!rx.has_changed().unwrap());
/// }
/// ```
///
/// ## Closed channel example
/// ```
/// use tokio::sync::watch;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = watch::channel("hello");
/// tx.send("goodbye").unwrap();
/// drop(tx);
/// // The `tx` handle has been dropped
///
/// // `has_changed` returns Ok(true) as the current value is not seen.
/// assert!(rx.has_changed().unwrap());
///
/// // Marks the current value as seen.
/// assert_eq!(*rx.borrow_and_update(), "goodbye");
///
/// // The `tx` handle has been dropped __AND__ the current value is seen.
/// assert!(rx.has_changed().is_err());
/// }
/// ```
pub fn has_changed(&self) -> Result<bool, error::RecvError> {
// Load the version from the state
let state = self.shared.state.load();
if state.is_closed() {
// The sender has dropped.
return Err(error::RecvError(()));
}
let new_version = state.version();
let current_version = state.version();

let current_value_is_seen = self.version == current_version;
let sender_has_dropped = state.is_closed();

Ok(self.version != new_version)
if sender_has_dropped && current_value_is_seen {
Err(error::RecvError(()))
} else {
Ok(!current_value_is_seen)
}
}

/// Marks the state as changed.
Expand All @@ -701,19 +728,23 @@ impl<T> Receiver<T> {
self.version = current_version;
}

/// Waits for a change notification, then marks the newest value as seen.
/// Waits for a change notification, then marks the current value as seen.
///
/// If the newest value in the channel has not yet been marked seen when
/// If the current value in the channel has not yet been marked seen when
/// this method is called, the method marks that value seen and returns
/// immediately. If the newest value has already been marked seen, then the
/// immediately. If the current value has already been marked seen, then the
/// method sleeps until a new message is sent by the [`Sender`] connected to
/// this `Receiver`, or until the [`Sender`] is dropped.
///
/// This method returns an error if and only if the [`Sender`] is dropped.
///
/// For more information, see
/// [*Change notifications*](self#change-notifications) in the module-level documentation.
///
/// # Errors
///
/// Returns a [`RecvError`](error::RecvError) if the channel has been closed __AND__
/// the current value is seen.
///
/// # Cancel safety
///
/// This method is cancel safe. If you use it as the event in a
Expand Down
56 changes: 56 additions & 0 deletions tokio/tests/sync_watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,3 +450,59 @@ async fn sender_closed_is_cooperative() {
_ = tokio::task::yield_now() => {},
}
}

#[tokio::test]
async fn changed_does_not_error_on_closed_channel_with_unseen_value() {
let (tx, mut rx) = watch::channel("A");
tx.send("B").unwrap();

drop(tx);

rx.changed()
.await
.expect("`changed` call does not return an error if the last value is not seen.");
}

#[tokio::test]
async fn has_changed_does_not_error_on_closed_channel_with_unseen_value() {
let (tx, rx) = watch::channel("A");
tx.send("B").unwrap();

drop(tx);

let has_changed = rx
.has_changed()
.expect("`has_changed` call does not return an error if the last value is not seen.");

assert!(has_changed, "Latest value is not seen");
}

#[tokio::test]
async fn changed_errors_on_closed_channel_with_seen_value() {
let (tx, mut rx) = watch::channel("A");
drop(tx);

rx.changed()
.await
.expect_err("`changed` call returns an error if the last value is seen.");
}

#[tokio::test]
async fn has_changed_errors_on_closed_channel_with_seen_value() {
let (tx, rx) = watch::channel("A");
drop(tx);

rx.has_changed()
.expect_err("`has_changed` call returns an error if the last value is seen.");
}

#[tokio::test]
async fn wait_for_errors_on_closed_channel_true_predicate() {
let (tx, mut rx) = watch::channel("A");
tx.send("B").unwrap();
drop(tx);

rx.wait_for(|_| true).await.expect(
"`wait_for` call does not return error even if channel is closed when predicate is true for last value.",
);
}
Loading