Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,10 @@ name = "time_timeout"
path = "time_timeout.rs"
harness = false

# Benchmark for timer contention when registered sleeps are dropped before firing.
[[bench]]
name = "time_drop_sleep_contention"
path = "time_drop_sleep_contention.rs"
# The benchmark file provides its own entry point through `criterion_main!`.
harness = false

[lints]
workspace = true
104 changes: 104 additions & 0 deletions benches/time_drop_sleep_contention.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//! Benchmark demonstrating timer mutex contention on drop (Issue #6504).
//!
//! This benchmark creates many timers, polls each of them once so that they are
//! initialized and registered with the timer wheel, then drops them before they
//! fire. This is the common case for timeouts that are set but never fire.
//!
//! Each drop acquires the global timer mutex to deregister from the wheel, causing
//! severe contention under concurrent load.
//!
//! ## Baseline Results (Pre-Fix)
//!
//! ```text
//! timer_drop_single_thread_10k: 33.3 ms (32.7-34.0 ms)
//! timer_drop_multi_thread_10k_8workers: 21.6 ms (19.1-24.7 ms)
//! ```
//!
//! **Analysis**: Multi-threaded (8 workers) is only 1.54x faster than single-threaded,
//! demonstrating severe mutex contention.

use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::time::Instant;

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use tokio::{
runtime::Runtime,
time::{sleep, Duration},
};

/// Builds a Tokio runtime with every driver enabled for use by the benchmarks.
///
/// A `workers` value of exactly 1 produces a current-thread runtime; any other
/// value produces a multi-threaded runtime configured with that many worker
/// threads.
///
/// # Panics
///
/// Panics if the runtime cannot be built.
fn build_runtime(workers: usize) -> Runtime {
    let mut builder = match workers {
        1 => tokio::runtime::Builder::new_current_thread(),
        _ => {
            let mut multi = tokio::runtime::Builder::new_multi_thread();
            multi.worker_threads(workers);
            multi
        }
    };

    builder.enable_all().build().unwrap()
}

/// Spawns `workers` tasks that each create, register, and drop timers without
/// letting any of them fire, then waits for all of the tasks to finish.
///
/// Every task handles `count / workers` timers. The division is integral, so
/// when `count` is not a multiple of `workers` the remainder is not created.
///
/// # Panics
///
/// Panics if any spawned task fails to join.
async fn create_and_drop_timers(count: usize, workers: usize) {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            tokio::spawn(async move {
                for _ in 0..count / workers {
                    let mut timer = Box::pin(sleep(Duration::from_secs(60)));

                    // Poll exactly once so the timer initializes and registers
                    // itself, then report `Ready` so the task moves on instead of
                    // waiting for the 60 s deadline.
                    poll_fn(|cx| {
                        let _ = timer.as_mut().poll(cx);
                        std::task::Poll::Ready(())
                    })
                    .await;

                    // Pass the timer itself through `black_box` before dropping it.
                    // `black_box(drop(timer))` would only hand the unit value `()`
                    // to the optimizer barrier, which protects nothing.
                    drop(black_box(timer));
                }
            })
        })
        .collect();

    for handle in handles {
        handle.await.unwrap();
    }
}

/// Benchmarks creating and dropping 10,000 registered timers per iteration on a
/// current-thread runtime, using a single spawned task.
///
/// The elapsed time covers the whole `block_on` call for all `iters` iterations,
/// as `iter_custom` expects.
fn timer_drop_contention_single_thread(c: &mut Criterion) {
    let runtime = build_runtime(1);

    c.bench_function("timer_drop_single_thread_10k", |bencher| {
        bencher.iter_custom(|iters| {
            let timer_count = 10_000 * iters as usize;

            let started = Instant::now();
            runtime.block_on(black_box(create_and_drop_timers(timer_count, 1)));
            started.elapsed()
        })
    });
}

/// Benchmarks creating and dropping 10,000 registered timers per iteration on a
/// multi-threaded runtime with 8 workers, split across 8 spawned tasks.
///
/// The elapsed time covers the whole `block_on` call for all `iters` iterations,
/// as `iter_custom` expects.
fn timer_drop_contention_multi_thread(c: &mut Criterion) {
    let runtime = build_runtime(8);

    c.bench_function("timer_drop_multi_thread_10k_8workers", |bencher| {
        bencher.iter_custom(|iters| {
            let timer_count = 10_000 * iters as usize;

            let started = Instant::now();
            runtime.block_on(black_box(create_and_drop_timers(timer_count, 8)));
            started.elapsed()
        })
    });
}

// Collect the single-threaded and multi-threaded contention benchmarks into one
// Criterion group named `timer_contention`.
criterion_group!(
timer_contention,
timer_drop_contention_single_thread,
timer_drop_contention_multi_thread
);

// Entry point for the benchmark binary; runs the `timer_contention` group.
criterion_main!(timer_contention);
10 changes: 10 additions & 0 deletions tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,16 @@ cfg_rt! {
.unwrap_or_else(|_| (f.take().unwrap())(None))
}

/// Tries to hand a timer's waker to the timer map of the current worker.
///
/// Returns the result of the worker's `register_timer` when the current
/// scheduler context is a multi-threaded one, and `false` for any other
/// context or when no scheduler context is set.
#[cfg(all(feature = "rt", feature = "rt-multi-thread"))]
pub(crate) fn try_register_timer(
    deadline: crate::time::Instant,
    waker: std::task::Waker,
) -> bool {
    with_scheduler(|current| {
        if let Some(scheduler::Context::MultiThread(worker)) = current {
            worker.register_timer(deadline.into(), waker)
        } else {
            false
        }
    })
}

cfg_taskdump! {
/// SAFETY: Callers of this function must ensure that trace frames always
/// form a valid linked list.
Expand Down
52 changes: 51 additions & 1 deletion tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};

use std::cell::RefCell;
use std::collections::HashMap;
use std::task::Waker;
use std::thread;
use std::time::Duration;
use std::time::{Duration, Instant};

mod metrics;

Expand Down Expand Up @@ -139,6 +140,10 @@ struct Core {

/// Fast random number generator.
rand: FastRand,

/// Per-worker timers, stored in this worker's `Core` so registration takes no lock.
/// Maps each deadline to the wakers to fire once that deadline has passed.
timers: HashMap<Instant, Vec<Waker>>,
}

/// State shared across all workers
Expand Down Expand Up @@ -267,6 +272,7 @@ pub(super) fn create(
global_queue_interval: stats.tuned_global_queue_interval(&config),
stats,
rand: FastRand::from_seed(config.seed_generator.next_seed()),
timers: HashMap::new(),
}));

remotes.push(Remote { steal, unpark });
Expand Down Expand Up @@ -534,6 +540,9 @@ impl Context {
// Run maintenance, if needed
core = self.maintenance(core);

// Fire any expired timers (lock-free, per-worker)
core.fire_expired_timers(Instant::now());

// First, check work available to the current worker.
if let Some(task) = core.next_task(&self.worker) {
core = self.run_task(task, core)?;
Expand Down Expand Up @@ -793,6 +802,21 @@ impl Context {
self.defer.defer(waker);
}
}

/// Stores `waker` in the timer map of the core currently held by this context.
///
/// Invoked on behalf of a `TimerEntry` that needs to be registered. The return
/// value is `true` when a core was present and the timer was recorded, and
/// `false` when the context holds no core (e.g., during block_in_place).
pub(crate) fn register_timer(&self, deadline: Instant, waker: Waker) -> bool {
    match self.core.borrow_mut().as_mut() {
        Some(core) => {
            core.register_timer(deadline, waker);
            true
        }
        None => false,
    }
}
}

impl Core {
Expand Down Expand Up @@ -1042,6 +1066,32 @@ impl Core {
self.global_queue_interval = next;
}
}

/// Fire all timers that have expired by the given instant.
///
/// Entries whose deadline is still in the future (`now < deadline`) are kept.
/// Every other entry has all of its wakers drained and woken, and is then
/// removed from the map.
///
/// No lock is taken here; the per-worker map is reached through `&mut self`.
/// Note that `HashMap::retain` visits every entry on each call, so the cost
/// grows with the number of distinct pending deadlines even when none of them
/// has expired.
fn fire_expired_timers(&mut self, now: Instant) {
self.timers.retain(|&deadline, wakers| {
// Returning `true` keeps the entry; the block wakes everything and returns
// `false` so that the expired entry is dropped from the map.
(now < deadline) || {
wakers.drain(..).for_each(Waker::wake);
false
}
});
Comment on lines +1075 to +1081
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very expensive operation.

}

/// Register a timer waker at the given deadline.
///
/// This is called from TimerEntry::poll_elapsed when a timer is registered.
/// The waker is appended to the list stored under `deadline`, creating that
/// list if this is the first timer for that instant. It will be fired when
/// fire_expired_timers() is called with a time >= deadline.
///
/// NOTE(review): nothing visible here removes a waker before its deadline, so
/// a timer that is dropped early appears to stay in the map until it expires.
/// Confirm whether a de-registration path exists elsewhere.
pub(crate) fn register_timer(&mut self, deadline: Instant, waker: Waker) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apart from issues of O(n) time complexity, how to de-register a timer?

self.timers
.entry(deadline)
.or_insert_with(Vec::new)
.push(waker);
}
}

impl Worker {
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/runtime/time/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,18 @@ impl TimerEntry {
);

if !self.registered {
#[cfg(feature = "rt-multi-thread")]
{
// Try worker-local registration first (lock-free on multi-threaded runtime)
if crate::runtime::context::try_register_timer(self.deadline, cx.waker().clone()) {
// Successfully registered with worker-local timers
let this = self.as_mut().project();
*this.registered = true;
return Poll::Pending;
}
}

// Fall back to global timer wheel (current_thread runtime or block_in_place)
let deadline = self.deadline;
self.as_mut().reset(deadline, true);
}
Expand Down
Loading