-
-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Allow consumers to implement POSIX AIO sources. #4054
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
bf4aed3
0536be4
5575468
4aa21a8
28691c5
13a2c11
42c92db
b2fc028
408a9c6
1f70110
eed4635
7e66ce2
a5e6f32
69e69be
4dfa047
668e725
3cfb77b
b5530e8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,171 @@ | ||
| use crate::io::driver::{Handle, Interest, ReadyEvent, Registration}; | ||
| use mio::event::Source; | ||
| use mio::Registry; | ||
| use mio::Token; | ||
| use std::fmt; | ||
| use std::io; | ||
| use std::ops::{Deref, DerefMut}; | ||
| use std::os::unix::io::AsRawFd; | ||
| use std::os::unix::prelude::RawFd; | ||
| use std::task::{Context, Poll}; | ||
|
|
||
| /// Like [`mio::event::Source`], but for POSIX AIO only. | ||
| /// | ||
| /// Tokio's consumer must pass an implementor of this trait to create a | ||
| /// [`PollAio`] object. | ||
| pub trait AioSource { | ||
| /// Register this AIO event source with Tokio's reactor | ||
| fn register(&mut self, kq: RawFd, token: usize); | ||
|
|
||
| /// Deregister this AIO event source with Tokio's reactor | ||
| fn deregister(&mut self); | ||
| } | ||
|
|
||
| /// Wrap the user's AioSource in order to implement mio::event::Source, which | ||
| /// is what the rest of the crate wants. | ||
| struct MioSource<T>(T); | ||
|
|
||
| impl<T: AioSource> Source for MioSource<T> { | ||
| fn register( | ||
| &mut self, | ||
| registry: &Registry, | ||
| token: Token, | ||
| interests: mio::Interest, | ||
| ) -> io::Result<()> { | ||
| assert!(interests.is_aio() || interests.is_lio()); | ||
| self.0.register(registry.as_raw_fd(), usize::from(token)); | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { | ||
| self.0.deregister(); | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn reregister( | ||
| &mut self, | ||
| registry: &Registry, | ||
| token: Token, | ||
| interests: mio::Interest, | ||
| ) -> io::Result<()> { | ||
| assert!(interests.is_aio() || interests.is_lio()); | ||
| self.0.register(registry.as_raw_fd(), usize::from(token)); | ||
| Ok(()) | ||
| } | ||
| } | ||
|
|
||
| /// Associates a POSIX AIO control block with the reactor that drives it. | ||
| /// | ||
| /// `PollAio`'s wrapped type must implement [`AioSource`] to be driven | ||
| /// by the reactor. | ||
| /// | ||
| /// The wrapped source may be accessed through the `PollAio` via the `Deref` and | ||
| /// `DerefMut` traits. | ||
| /// | ||
| /// ## Clearing readiness | ||
| /// | ||
| /// If [`PollAio::poll`] returns ready, but the consumer determines that the | ||
| /// Source is not completely ready and must return to the Pending state, | ||
| /// [`PollAio::clear_ready`] may be used. This can be useful with | ||
| /// [`lio_listio`], which may generate a kevent when only a portion of the | ||
| /// operations have completed. | ||
asomers marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| /// | ||
| /// ## Platforms | ||
| /// | ||
| /// Only FreeBSD implements POSIX AIO with kqueue notification, so | ||
| /// `PollAio` is only available for that operating system. | ||
Darksonn marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| /// | ||
| /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html | ||
| // Note: Unlike every other kqueue event source, POSIX AIO registers events not | ||
| // via kevent(2) but when the aiocb is submitted to the kernel via aio_read, | ||
| // aio_write, etc. It needs the kqueue's file descriptor to do that. So | ||
| // AsyncFd can't be used for POSIX AIO. | ||
| // | ||
| // Note that PollAio doesn't implement Drop. There's no need. Unlike other | ||
| // kqueue sources, there is nothing to deregister. | ||
asomers marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| pub struct PollAio<E: AioSource> { | ||
asomers marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| io: MioSource<E>, | ||
| registration: Registration, | ||
| } | ||
|
|
||
| // ===== impl PollAio ===== | ||
|
|
||
| impl<E: AioSource> PollAio<E> { | ||
| /// Indicates to Tokio that the source is no longer ready. The internal | ||
| /// readiness flag will be cleared, and tokio will wait for the next | ||
| /// edge-triggered readiness notification from the OS. | ||
| /// | ||
| /// It is critical that this function not be called unless your code | ||
| /// _actually observes_ that the source is _not_ ready. The OS must | ||
| /// deliver a subsequent notification, or this source will block | ||
| /// forever. | ||
| pub fn clear_ready(&self, ev: PollAioEvent) { | ||
| self.registration.clear_readiness(ev.0) | ||
| } | ||
|
|
||
| /// Destroy the [`PollAio`] and return its inner Source | ||
| pub fn into_inner(self) -> E { | ||
| self.io.0 | ||
| } | ||
|
|
||
| /// Creates a new `PollAio` suitable for use with POSIX AIO functions. | ||
| /// | ||
| /// It will be associated with the default reactor. The runtime is usually | ||
| /// set implicitly when this function is called from a future driven by a | ||
| /// tokio runtime, otherwise runtime can be set explicitly with | ||
| /// [`Runtime::enter`](crate::runtime::Runtime::enter) function. | ||
| pub fn new_for_aio(io: E) -> io::Result<Self> { | ||
| Self::new_with_interest(io, Interest::AIO) | ||
| } | ||
|
|
||
| /// Creates a new `PollAio` suitable for use with [`lio_listio`]. | ||
| /// | ||
| /// It will be associated with the default reactor. The runtime is usually | ||
| /// set implicitly when this function is called from a future driven by a | ||
| /// tokio runtime, otherwise runtime can be set explicitly with | ||
| /// [`Runtime::enter`](crate::runtime::Runtime::enter) function. | ||
| /// | ||
| /// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html | ||
| pub fn new_for_lio(io: E) -> io::Result<Self> { | ||
| Self::new_with_interest(io, Interest::LIO) | ||
| } | ||
|
|
||
| fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> { | ||
| let mut io = MioSource(io); | ||
| let handle = Handle::current(); | ||
| let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; | ||
| Ok(Self { io, registration }) | ||
| } | ||
|
|
||
| /// Polls for readiness. Either AIO or LIO counts. | ||
asomers marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| pub fn poll<'a>(&'a self, cx: &mut Context<'_>) -> Poll<io::Result<PollAioEvent>> { | ||
|
||
| let ev = ready!(self.registration.poll_read_ready(cx))?; | ||
| Poll::Ready(Ok(PollAioEvent(ev))) | ||
| } | ||
| } | ||
|
|
||
| impl<E: AioSource> Deref for PollAio<E> { | ||
| type Target = E; | ||
|
|
||
| fn deref(&self) -> &E { | ||
| &self.io.0 | ||
| } | ||
| } | ||
|
|
||
| impl<E: AioSource> DerefMut for PollAio<E> { | ||
| fn deref_mut(&mut self) -> &mut E { | ||
| &mut self.io.0 | ||
| } | ||
| } | ||
|
|
||
| impl<E: AioSource + fmt::Debug> fmt::Debug for PollAio<E> { | ||
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| f.debug_struct("PollAio").field("io", &self.io.0).finish() | ||
| } | ||
| } | ||
|
|
||
| /// Opaque data returned by [`PollAio::poll`]. | ||
| /// | ||
| /// It can be fed back to [`PollAio::clear_ready`]. | ||
| #[derive(Debug)] | ||
| pub struct PollAioEvent(ReadyEvent); | ||
asomers marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
Uh oh!
There was an error while loading. Please reload this page.