-
Notifications
You must be signed in to change notification settings - Fork 43
Open
Labels
bug: Something isn't working
Description
Only a single CPU core is utilized when `join`-ing futures on the tokio runtime. The issue does not arise when using async_std.
Some stats:
# async_std:naive
Total duration: 2.3061 s
Rate: 433627.7135 reqs/s
# async_std:futures_concurrency_join
Total duration: 2.8707 s
Rate: 348346.6330 reqs/s
# tokio:naive
Total duration: 2.4885 s
Rate: 401842.6764 reqs/s
# tokio:futures_concurrency_join
Terminated after a few minutes. Utilization here is limited to a single CPU core.

tokio test cases:
use std::time::{Duration, Instant};
use tokio::{spawn, time::sleep};
/// Parameters describing one benchmark run.
#[derive(Clone, Debug)]
struct Workload {
    /// How long each simulated task sleeps.
    task_duration: Duration,
    /// Number of tasks (futures) created for the run.
    n_tasks: usize,
    /// Instant captured when the workload was built; `main` measures the
    /// total duration relative to it.
    start: Instant,
}
/// Runs the selected benchmark variant on the tokio runtime and prints the
/// total wall-clock duration and the resulting task throughput.
#[tokio::main]
async fn main() {
    let workload = Workload {
        task_duration: Duration::from_millis(100),
        n_tasks: 1_000_000,
        start: Instant::now(),
    };

    // Exactly one variant is timed per run; swap the commented line to switch.
    // naive(workload.clone()).await;
    futures_concurrency_join(workload.clone()).await;

    // Seconds elapsed since the workload was constructed.
    let elapsed_secs = workload.start.elapsed().as_secs_f64();
    let throughput = workload.n_tasks as f64 / elapsed_secs;
    println!("Total duration: {:.4} s", elapsed_secs);
    println!("Rate: {:.4} reqs/s", throughput);
}
/// Spawns `workload.n_tasks` tasks, each sleeping for
/// `workload.task_duration`, then awaits every handle in spawn order.
async fn naive(workload: Workload) {
    // `Duration` and `usize` are `Copy`: read them out once instead of calling
    // `.clone()` per task, so the closures capture plain values rather than
    // borrowing fields of `workload`.
    let task_duration = workload.task_duration;
    let n_tasks = workload.n_tasks;

    let handles: Vec<_> = (0..n_tasks)
        .map(move |_| {
            spawn(async move {
                sleep(task_duration).await;
            })
        })
        .collect();

    for handle in handles {
        // The join result is deliberately discarded (best-effort), exactly as
        // the original benchmark did.
        handle.await.ok();
    }
}
async fn futures_concurrency_join(workload: Workload) {
use futures_concurrency::prelude::*;
let tasks = (0..workload.n_tasks)
.map(|_| async move {
sleep(workload.task_duration.clone()).await;
})
.collect::<Vec<_>>();
tasks.join().await;
}async_std test cases:
use std::time::{Duration, Instant};
use async_std::task::{spawn, sleep};
/// Parameters describing one benchmark run.
#[derive(Clone, Debug)]
struct Workload {
    /// How long each simulated task sleeps.
    task_duration: Duration,
    /// Number of tasks (futures) created for the run.
    n_tasks: usize,
    /// Instant captured when the workload was built; `main` measures the
    /// total duration relative to it.
    start: Instant,
}
/// Runs the selected benchmark variant on the async_std runtime and prints
/// the total wall-clock duration and the resulting task throughput.
#[async_std::main]
async fn main() {
    let workload = Workload {
        task_duration: Duration::from_millis(100),
        n_tasks: 1_000_000,
        start: Instant::now(),
    };

    // Exactly one variant is timed per run; swap the commented line to switch.
    // naive(workload.clone()).await;
    futures_concurrency_join(workload.clone()).await;

    // Seconds elapsed since the workload was constructed.
    let elapsed_secs = workload.start.elapsed().as_secs_f64();
    let throughput = workload.n_tasks as f64 / elapsed_secs;
    println!("Total duration: {:.4} s", elapsed_secs);
    println!("Rate: {:.4} reqs/s", throughput);
}
/// Spawns `workload.n_tasks` tasks, each sleeping for
/// `workload.task_duration`, then awaits every handle in spawn order.
async fn naive(workload: Workload) {
    // `Duration` and `usize` are `Copy`: read them out once instead of calling
    // `.clone()` per task, so the closures capture plain values rather than
    // borrowing fields of `workload`.
    let task_duration = workload.task_duration;
    let n_tasks = workload.n_tasks;

    let handles: Vec<_> = (0..n_tasks)
        .map(move |_| {
            spawn(async move {
                sleep(task_duration).await;
            })
        })
        .collect();

    for handle in handles {
        handle.await;
    }
}
async fn futures_concurrency_join(workload: Workload) {
use futures_concurrency::prelude::*;
let tasks = (0..workload.n_tasks)
.map(|_| async move {
sleep(workload.task_duration.clone()).await;
})
.collect::<Vec<_>>();
tasks.join().await;
}Metadata
Metadata
Assignees
Labels
bug: Something isn't working