Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ tempfile = "3.1.0"
async-stream = "0.3"
socket2 = "0.4"

[target.'cfg(target_os = "freebsd")'.dev-dependencies]
mio-aio = { git = "https://github.com/asomers/mio-aio", rev = "2f56696", features = ["tokio"] }

[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.5", features = ["futures", "checkpoint"] }

Expand Down
10 changes: 10 additions & 0 deletions tokio/src/io/driver/interest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ use std::ops;
pub struct Interest(mio::Interest);

impl Interest {
cfg_aio! {
/// Interest for POSIX AIO
pub const AIO: Interest = Interest(mio::Interest::AIO);
}

cfg_aio! {
/// Interest for POSIX AIO lio_listio events
pub const LIO: Interest = Interest(mio::Interest::LIO);
}

/// Interest in all readable events.
///
/// Readable interest includes read-closed events.
Expand Down
1 change: 1 addition & 0 deletions tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub(crate) struct Handle {
inner: Weak<Inner>,
}

#[derive(Debug)]
pub(crate) struct ReadyEvent {
tick: u8,
pub(crate) ready: Ready,
Expand Down
11 changes: 11 additions & 0 deletions tokio/src/io/driver/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ impl Ready {
pub(crate) fn from_mio(event: &mio::event::Event) -> Ready {
let mut ready = Ready::EMPTY;

#[cfg(all(target_os = "freebsd", feature = "net"))]
{
if event.is_aio() {
ready |= Ready::READABLE;
}

if event.is_lio() {
ready |= Ready::READABLE;
}
}

if event.is_readable() {
ready |= Ready::READABLE;
}
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ cfg_io_driver_impl! {
pub use driver::{Interest, Ready};
}

cfg_aio! {
mod poll_aio;
pub use poll_aio::{AioSource, PollAio, PollAioEvent};
}

mod poll_evented;

#[cfg(not(loom))]
Expand Down
193 changes: 193 additions & 0 deletions tokio/src/io/poll_aio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
use crate::io::driver::{Handle, Interest, ReadyEvent, Registration};
use mio::event::Source;
use mio::Registry;
use mio::Token;
use std::fmt;
use std::io;
use std::ops::{Deref, DerefMut};
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::RawFd;
use std::task::{Context, Poll};

/// Like [`mio::event::Source`], but for POSIX AIO only.
///
/// Tokio's consumer must pass an implementor of this trait to create a
/// [`PollAio`] object.
pub trait AioSource {
/// Register this AIO event source with Tokio's reactor
fn register(&mut self, kq: RawFd, token: usize);

/// Deregister this AIO event source with Tokio's reactor
fn deregister(&mut self);
}

/// Wrap the user's AioSource in order to implement mio::event::Source, which
/// is what the rest of the crate wants.
struct MioSource<T>(T);

impl<T: AioSource> Source for MioSource<T> {
fn register(
&mut self,
registry: &Registry,
token: Token,
interests: mio::Interest,
) -> io::Result<()> {
assert!(interests.is_aio() || interests.is_lio());
self.0.register(registry.as_raw_fd(), usize::from(token));
Ok(())
}

fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
self.0.deregister();
Ok(())
}

fn reregister(
&mut self,
registry: &Registry,
token: Token,
interests: mio::Interest,
) -> io::Result<()> {
assert!(interests.is_aio() || interests.is_lio());
self.0.register(registry.as_raw_fd(), usize::from(token));
Ok(())
}
}

/// Associates a POSIX AIO control block with the reactor that drives it.
///
/// `PollAio`'s wrapped type must implement [`AioSource`] to be driven
/// by the reactor.
///
/// The wrapped source may be accessed through the `PollAio` via the `Deref` and
/// `DerefMut` traits.
///
/// ## Clearing readiness
///
/// If [`PollAio::poll`] returns ready, but the consumer determines that the
/// Source is not completely ready and must return to the Pending state,
/// [`PollAio::clear_ready`] may be used. This can be useful with
/// [`lio_listio`], which may generate a kevent when only a portion of the
/// operations have completed.
///
/// ## Platforms
///
/// Only FreeBSD implements POSIX AIO with kqueue notification, so
/// `PollAio` is only available for that operating system.
///
/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
// Note: Unlike every other kqueue event source, POSIX AIO registers events not
// via kevent(2) but when the aiocb is submitted to the kernel via aio_read,
// aio_write, etc. It needs the kqueue's file descriptor to do that. So
// AsyncFd can't be used for POSIX AIO.
//
// Note that PollAio doesn't implement Drop. There's no need. Unlike other
// kqueue sources, simply dropping the object effectively deregisters it.
pub struct PollAio<E: AioSource> {
io: MioSource<E>,
registration: Registration,
}

// ===== impl PollAio =====

impl<E: AioSource> PollAio<E> {
/// Indicates to Tokio that the source is no longer ready. The internal
/// readiness flag will be cleared, and tokio will wait for the next
/// edge-triggered readiness notification from the OS.
///
/// It is critical that this method not be called unless your code
/// _actually observes_ that the source is _not_ ready. The OS must
/// deliver a subsequent notification, or this source will block
/// forever. It is equally critical that you `do` call this method if you
/// resubmit the same structure to the kernel and poll it again.
///
/// This method is not very useful with AIO readiness, since each `aiocb`
/// structure is typically only used once. It's main use with
/// [`lio_listio`], which will sometimes send notification when only a
/// portion of its elements are complete. In that case, the caller must
/// call `clear_ready` before resubmitting it.
///
/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
pub fn clear_ready(&self, ev: PollAioEvent) {
self.registration.clear_readiness(ev.0)
}

/// Destroy the [`PollAio`] and return its inner Source
pub fn into_inner(self) -> E {
self.io.0
}

/// Creates a new `PollAio` suitable for use with POSIX AIO functions.
///
/// It will be associated with the default reactor. The runtime is usually
/// set implicitly when this function is called from a future driven by a
/// tokio runtime, otherwise runtime can be set explicitly with
/// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_for_aio(io: E) -> io::Result<Self> {
Self::new_with_interest(io, Interest::AIO)
}

/// Creates a new `PollAio` suitable for use with [`lio_listio`].
///
/// It will be associated with the default reactor. The runtime is usually
/// set implicitly when this function is called from a future driven by a
/// tokio runtime, otherwise runtime can be set explicitly with
/// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
///
/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
pub fn new_for_lio(io: E) -> io::Result<Self> {
Self::new_with_interest(io, Interest::LIO)
}

fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
let mut io = MioSource(io);
let handle = Handle::current();
let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
Ok(Self { io, registration })
}

/// Polls for readiness. Either AIO or LIO counts.
///
/// This method returns:
/// * `Poll::Pending` if the underlying operation is not complete, whether
/// or not it completed successfully. This will be true if the OS is
/// still processing it, or if it has not yet been submitted to the OS.
/// * `Poll::Ready(Ok(_))` if the underlying operation is complete.
/// * `Poll::Ready(Err(_))` if the reactor has been shutdown. This does
/// _not_ indicate that the underlying operation encountered an error.
///
/// When the method returns Poll::Pending, the Waker in the provided Context
/// is scheduled to receive a wakeup when the underlying operation
/// completes. Note that on multiple calls to poll, only the Waker from the
/// Context passed to the most recent call is scheduled to receive a wakeup.
pub fn poll<'a>(&'a self, cx: &mut Context<'_>) -> Poll<io::Result<PollAioEvent>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We avoid inherent fns named just poll as they would conflict w/ Future. Also, why is this an inherent fn? If it were implemented as a future, then it could be used with .await.

Perhaps, the inherent fn could be named poll_ready (or something) and an equivalent async fn is added async fn ready(&self) -> io::Result<AioEvent>.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under the principle that "futures should do nothing until polled", I elected to submit the operation to the kernel (aio_write, etc) as part of poll. But that means that it needs to be in the external crate instead of in Tokio. Hence the inherent function. I'll rename it to poll_ready.

let ev = ready!(self.registration.poll_read_ready(cx))?;
Poll::Ready(Ok(PollAioEvent(ev)))
}
}

impl<E: AioSource> Deref for PollAio<E> {
type Target = E;

fn deref(&self) -> &E {
&self.io.0
}
}

impl<E: AioSource> DerefMut for PollAio<E> {
fn deref_mut(&mut self) -> &mut E {
&mut self.io.0
}
}

impl<E: AioSource + fmt::Debug> fmt::Debug for PollAio<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PollAio").field("io", &self.io.0).finish()
}
}

/// Opaque data returned by [`PollAio::poll`].
///
/// It can be fed back to [`PollAio::clear_ready`].
#[derive(Debug)]
pub struct PollAioEvent(ReadyEvent);
5 changes: 3 additions & 2 deletions tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,9 @@
//! - `rt-multi-thread`: Enables the heavier, multi-threaded, work-stealing scheduler.
//! - `io-util`: Enables the IO based `Ext` traits.
//! - `io-std`: Enable `Stdout`, `Stdin` and `Stderr` types.
//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and `UdpSocket`,
//! as well as (on Unix-like systems) `AsyncFd`
//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and
//! `UdpSocket`, as well as (on Unix-like systems) `AsyncFd` and (on
//! FreeBSD) `PollAio`.
//! - `time`: Enables `tokio::time` types and allows the schedulers to enable
//! the built in timer.
//! - `process`: Enables `tokio::process` types.
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ macro_rules! cfg_atomic_waker_impl {
}
}

macro_rules! cfg_aio {
($($item:item)*) => {
$(
#[cfg(all(target_os = "freebsd", feature = "net"))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#[cfg(all(target_os = "freebsd", feature = "net"))]
#[cfg(any(docsrs, all(target_os = "freebsd", feature = "net")))]

#[cfg_attr(docsrs,
doc(cfg(all(target_os = "freebsd", feature = "net")))
)]
$item
)*
}
}

macro_rules! cfg_fs {
($($item:item)*) => {
$(
Expand Down
Loading