Skip to content

Commit 8e999e3

Browse files
authored
macros: add biased mode to join! and try_join! (#7307)
1 parent 1d98014 commit 8e999e3

File tree

5 files changed

+297
-41
lines changed

5 files changed

+297
-41
lines changed

tokio/src/macros/join.rs

Lines changed: 95 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ macro_rules! doc {
2121
/// The supplied futures are stored inline and do not require allocating a
2222
/// `Vec`.
2323
///
24-
/// ### Runtime characteristics
24+
/// ## Runtime characteristics
2525
///
2626
/// By running all async expressions on the current task, the expressions are
2727
/// able to run **concurrently** but not in **parallel**. This means all
@@ -32,6 +32,25 @@ macro_rules! doc {
3232
///
3333
/// [`tokio::spawn`]: crate::spawn
3434
///
35+
/// ## Fairness
36+
///
37+
/// By default, `join!`'s generated future rotates which contained
38+
/// future is polled first whenever it is woken.
39+
///
40+
/// This behavior can be overridden by adding `biased;` to the beginning of the
41+
/// macro usage. See the examples for details. This will cause `join` to poll
42+
/// the futures in the order they appear from top to bottom.
43+
///
44+
/// You may want this if your futures may interact in a way where known polling order is significant.
45+
///
46+
/// But there is an important caveat to this mode. It becomes your responsibility
47+
/// to ensure that the polling order of your futures is fair. If for example you
48+
/// are joining a stream and a shutdown future, and the stream has a
49+
/// huge volume of messages that takes a long time to finish processing per poll, you should
50+
/// place the shutdown future earlier in the `join!` list to ensure that it is
51+
/// always polled, and will not be delayed due to the stream future taking a long time to return
52+
/// `Poll::Pending`.
53+
///
3554
/// # Examples
3655
///
3756
/// Basic join with two branches
@@ -54,6 +73,30 @@ macro_rules! doc {
5473
/// // do something with the values
5574
/// }
5675
/// ```
76+
///
77+
/// Using the `biased;` mode to control polling order.
78+
///
79+
/// ```
80+
/// async fn do_stuff_async() {
81+
/// // async work
82+
/// }
83+
///
84+
/// async fn more_async_work() {
85+
/// // more here
86+
/// }
87+
///
88+
/// #[tokio::main]
89+
/// async fn main() {
90+
/// let (first, second) = tokio::join!(
91+
/// biased;
92+
/// do_stuff_async(),
93+
/// more_async_work()
94+
/// );
95+
///
96+
/// // do something with the values
97+
/// }
98+
/// ```
99+
57100
#[macro_export]
58101
#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
59102
$join
@@ -62,12 +105,16 @@ macro_rules! doc {
62105

63106
#[cfg(doc)]
64107
doc! {macro_rules! join {
65-
($($future:expr),*) => { unimplemented!() }
108+
($(biased;)? $($future:expr),*) => { unimplemented!() }
66109
}}
67110

68111
#[cfg(not(doc))]
69112
doc! {macro_rules! join {
70113
(@ {
114+
// Type of rotator that controls which inner future to start with
115+
// when polling our output future.
116+
rotator=$rotator:ty;
117+
71118
// One `_` for each branch in the `join!` macro. This is not used once
72119
// normalization is complete.
73120
( $($count:tt)* )
@@ -96,25 +143,19 @@ doc! {macro_rules! join {
96143
// <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484>
97144
let mut futures = &mut futures;
98145

99-
// Each time the future created by poll_fn is polled, a different future will be polled first
100-
// to ensure every future passed to join! gets a chance to make progress even if
101-
// one of the futures consumes the whole budget.
102-
//
103-
// This is number of futures that will be skipped in the first loop
104-
// iteration the next time.
105-
let mut skip_next_time: u32 = 0;
146+
const COUNT: u32 = $($total)*;
106147

107-
poll_fn(move |cx| {
108-
const COUNT: u32 = $($total)*;
148+
// Each time the future created by poll_fn is polled, if not using biased mode,
149+
// a different future is polled first to ensure every future passed to join!
150+
// can make progress even if one of the futures consumes the whole budget.
151+
let mut rotator = <$rotator>::default();
109152

153+
poll_fn(move |cx| {
110154
let mut is_pending = false;
111-
112155
let mut to_run = COUNT;
113156

114157
// The number of futures that will be skipped in the first loop iteration.
115-
let mut skip = skip_next_time;
116-
117-
skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 };
158+
let mut skip = rotator.num_skip();
118159

119160
// This loop runs twice and the first `skip` futures
120161
// are not polled in the first iteration.
@@ -164,15 +205,51 @@ doc! {macro_rules! join {
164205

165206
// ===== Normalize =====
166207

167-
(@ { ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
168-
$crate::join!(@{ ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
208+
(@ { rotator=$rotator:ty; ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
209+
$crate::join!(@{ rotator=$rotator; ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
169210
};
170211

171212
// ===== Entry point =====
213+
( biased; $($e:expr),+ $(,)?) => {
214+
$crate::join!(@{ rotator=$crate::macros::support::BiasedRotator; () (0) } $($e,)*)
215+
};
172216

173217
( $($e:expr),+ $(,)?) => {
174-
$crate::join!(@{ () (0) } $($e,)*)
218+
$crate::join!(@{ rotator=$crate::macros::support::Rotator<COUNT>; () (0) } $($e,)*)
175219
};
176220

221+
(biased;) => { async {}.await };
222+
177223
() => { async {}.await }
178224
}}
225+
226+
/// Rotates by one each [`Self::num_skip`] call up to COUNT - 1.
227+
#[derive(Default, Debug)]
228+
pub struct Rotator<const COUNT: u32> {
229+
next: u32,
230+
}
231+
232+
impl<const COUNT: u32> Rotator<COUNT> {
233+
/// Rotates by one each [`Self::num_skip`] call up to COUNT - 1
234+
#[inline]
235+
pub fn num_skip(&mut self) -> u32 {
236+
let num_skip = self.next;
237+
self.next += 1;
238+
if self.next == COUNT {
239+
self.next = 0;
240+
}
241+
num_skip
242+
}
243+
}
244+
245+
/// [`Self::num_skip`] always returns 0.
246+
#[derive(Default, Debug)]
247+
pub struct BiasedRotator {}
248+
249+
impl BiasedRotator {
250+
/// Always returns 0.
251+
#[inline]
252+
pub fn num_skip(&mut self) -> u32 {
253+
0
254+
}
255+
}

tokio/src/macros/support.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ cfg_macros! {
33

44
pub use std::future::poll_fn;
55

6+
pub use crate::macros::join::{BiasedRotator, Rotator};
7+
68
#[doc(hidden)]
79
pub fn thread_rng_n(n: u32) -> u32 {
810
crate::runtime::context::thread_rng_n(n)

tokio/src/macros/try_join.rs

Lines changed: 72 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ macro_rules! doc {
1919
/// The supplied futures are stored inline and do not require allocating a
2020
/// `Vec`.
2121
///
22-
/// ### Runtime characteristics
22+
/// ## Runtime characteristics
2323
///
2424
/// By running all async expressions on the current task, the expressions are
2525
/// able to run **concurrently** but not in **parallel**. This means all
@@ -30,6 +30,25 @@ macro_rules! doc {
3030
///
3131
/// [`tokio::spawn`]: crate::spawn
3232
///
33+
/// ## Fairness
34+
///
35+
/// By default, `try_join!`'s generated future rotates which
36+
/// contained future is polled first whenever it is woken.
37+
///
38+
/// This behavior can be overridden by adding `biased;` to the beginning of the
39+
/// macro usage. See the examples for details. This will cause `try_join` to poll
40+
/// the futures in the order they appear from top to bottom.
41+
///
42+
/// You may want this if your futures may interact in a way where known polling order is significant.
43+
///
44+
/// But there is an important caveat to this mode. It becomes your responsibility
45+
/// to ensure that the polling order of your futures is fair. If for example you
46+
/// are joining a stream and a shutdown future, and the stream has a
47+
/// huge volume of messages that takes a long time to finish processing per poll, you should
48+
/// place the shutdown future earlier in the `try_join!` list to ensure that it is
49+
/// always polled, and will not be delayed due to the stream future taking a long time to return
50+
/// `Poll::Pending`.
51+
///
3352
/// # Examples
3453
///
3554
/// Basic `try_join` with two branches.
@@ -100,6 +119,37 @@ macro_rules! doc {
100119
/// }
101120
/// }
102121
/// ```
122+
/// Using the `biased;` mode to control polling order.
123+
///
124+
/// ```
125+
/// async fn do_stuff_async() -> Result<(), &'static str> {
126+
/// // async work
127+
/// # Ok(())
128+
/// }
129+
///
130+
/// async fn more_async_work() -> Result<(), &'static str> {
131+
/// // more here
132+
/// # Ok(())
133+
/// }
134+
///
135+
/// #[tokio::main]
136+
/// async fn main() {
137+
/// let res = tokio::try_join!(
138+
/// biased;
139+
/// do_stuff_async(),
140+
/// more_async_work()
141+
/// );
142+
///
143+
/// match res {
144+
/// Ok((first, second)) => {
145+
/// // do something with the values
146+
/// }
147+
/// Err(err) => {
148+
/// println!("processing failed; error = {}", err);
149+
/// }
150+
/// }
151+
/// }
152+
/// ```
103153
#[macro_export]
104154
#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
105155
$try_join
@@ -108,12 +158,16 @@ macro_rules! doc {
108158

109159
#[cfg(doc)]
110160
doc! {macro_rules! try_join {
111-
($($future:expr),*) => { unimplemented!() }
161+
($(biased;)? $($future:expr),*) => { unimplemented!() }
112162
}}
113163

114164
#[cfg(not(doc))]
115165
doc! {macro_rules! try_join {
116166
(@ {
167+
// Type of rotator that controls which inner future to start with
168+
// when polling our output future.
169+
rotator=$rotator:ty;
170+
117171
// One `_` for each branch in the `try_join!` macro. This is not used once
118172
// normalization is complete.
119173
( $($count:tt)* )
@@ -142,25 +196,19 @@ doc! {macro_rules! try_join {
142196
// <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484>
143197
let mut futures = &mut futures;
144198

145-
// Each time the future created by poll_fn is polled, a different future will be polled first
146-
// to ensure every future passed to join! gets a chance to make progress even if
147-
// one of the futures consumes the whole budget.
148-
//
149-
// This is number of futures that will be skipped in the first loop
150-
// iteration the next time.
151-
let mut skip_next_time: u32 = 0;
199+
const COUNT: u32 = $($total)*;
152200

153-
poll_fn(move |cx| {
154-
const COUNT: u32 = $($total)*;
201+
// Each time the future created by poll_fn is polled, if not using biased mode,
202+
// a different future is polled first to ensure every future passed to try_join!
203+
// can make progress even if one of the futures consumes the whole budget.
204+
let mut rotator = <$rotator>::default();
155205

206+
poll_fn(move |cx| {
156207
let mut is_pending = false;
157-
158208
let mut to_run = COUNT;
159209

160-
// The number of futures that will be skipped in the first loop iteration
161-
let mut skip = skip_next_time;
162-
163-
skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 };
210+
// The number of futures that will be skipped in the first loop iteration.
211+
let mut skip = rotator.num_skip();
164212

165213
// This loop runs twice and the first `skip` futures
166214
// are not polled in the first iteration.
@@ -216,15 +264,20 @@ doc! {macro_rules! try_join {
216264

217265
// ===== Normalize =====
218266

219-
(@ { ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
220-
$crate::try_join!(@{ ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
267+
(@ { rotator=$rotator:ty; ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
268+
$crate::try_join!(@{ rotator=$rotator; ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
221269
};
222270

223271
// ===== Entry point =====
272+
( biased; $($e:expr),+ $(,)?) => {
273+
$crate::try_join!(@{ rotator=$crate::macros::support::BiasedRotator; () (0) } $($e,)*)
274+
};
224275

225276
( $($e:expr),+ $(,)?) => {
226-
$crate::try_join!(@{ () (0) } $($e,)*)
277+
$crate::try_join!(@{ rotator=$crate::macros::support::Rotator<COUNT>; () (0) } $($e,)*)
227278
};
228279

280+
(biased;) => { async { Ok(()) }.await };
281+
229282
() => { async { Ok(()) }.await }
230283
}}

0 commit comments

Comments
 (0)