Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions examples/wrk-bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::io;
use std::rc::Rc;
use tokio::task::JoinHandle;

pub const RESPONSE: &'static [u8] =
b"HTTP/1.1 200 OK\nContent-Type: text/plain\nContent-Length: 12\n\nHello world!";

pub const ADDRESS: &'static str = "127.0.0.1:8080";

fn main() -> io::Result<()> {
tokio_uring::start(async {
let mut tasks = Vec::with_capacity(16);
let listener = Rc::new(tokio_uring::net::TcpListener::bind(
ADDRESS.parse().unwrap(),
)?);

for _ in 0..16 {
let listener = listener.clone();
let task: JoinHandle<io::Result<()>> = tokio::task::spawn_local(async move {
loop {
let (stream, _) = listener.accept().await?;

tokio_uring::spawn(async move {
let (result, _) = stream.write(RESPONSE).await;

if let Err(err) = result {
eprintln!("Client connection failed: {}", err);
}
});
}
});
tasks.push(task);
}

for t in tasks {
t.await.unwrap()?;
}

Ok(())
})
}
8 changes: 4 additions & 4 deletions src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,19 @@ pub(crate) struct Driver {

type Handle = Rc<RefCell<Inner>>;

struct Inner {
pub(crate) struct Inner {
/// In-flight operations
ops: Ops,

/// IoUring bindings
uring: IoUring,
pub(crate) uring: IoUring,
}

// When dropping the driver, all in-flight operations must have completed. This
// type wraps the slab and ensures that, on drop, the slab is empty.
struct Ops(Slab<op::Lifecycle>);

scoped_thread_local!(static CURRENT: Rc<RefCell<Inner>>);
scoped_thread_local!(pub(crate) static CURRENT: Rc<RefCell<Inner>>);

impl Driver {
pub(crate) fn new() -> io::Result<Driver> {
Expand Down Expand Up @@ -112,7 +112,7 @@ impl Inner {
}
}

fn submit(&mut self) -> io::Result<()> {
pub(crate) fn submit(&mut self) -> io::Result<()> {
loop {
match self.uring.submit() {
Ok(_) => {
Expand Down
6 changes: 0 additions & 6 deletions src/driver/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,6 @@ impl<T> Op<T> {
}
}

// Submit the new operation. At this point, the operation has been
// pushed onto the queue and the tail pointer has been updated, so
// the submission entry is visible to the kernel. If there is an
// error here (probably EAGAIN), we still return the operation. A
// future `io_uring_enter` will fully submit the event.
let _ = inner.submit();
Ok(op)
})
}
Expand Down
8 changes: 7 additions & 1 deletion src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::driver::Driver;
use crate::driver::{Driver, CURRENT};
use std::cell::RefCell;

use std::future::Future;
use std::io;
Expand Down Expand Up @@ -51,6 +52,11 @@ pub fn spawn<T: std::future::Future + 'static>(task: T) -> tokio::task::JoinHand
impl Runtime {
pub(crate) fn new() -> io::Result<Runtime> {
let rt = tokio::runtime::Builder::new_current_thread()
.on_thread_park(|| {
CURRENT.with(|x| {
let _ = RefCell::borrow_mut(x).uring.submit();
});
})
.enable_all()
.build()?;

Expand Down