Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/buf/fixed/plumbing/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::collections::HashMap;
use std::mem;
use std::ptr;
use std::slice;
use std::task::{Context, Poll, Waker};

// Internal state shared by FixedBufPool and FixedBuf handles.
pub(crate) struct Pool {
Expand All @@ -20,6 +21,8 @@ pub(crate) struct Pool {
states: Vec<BufState>,
// Table of head indices of the free buffer lists in each size bucket.
free_buf_head_by_cap: HashMap<usize, u16>,
// Wakers for tasks pending on poll_next
waiting_on_next: Vec<Waker>,
}

// State information of a buffer in the registry,
Expand Down Expand Up @@ -73,6 +76,7 @@ impl Pool {
orig_cap,
states,
free_buf_head_by_cap,
waiting_on_next: vec![],
}
}

Expand Down Expand Up @@ -114,6 +118,17 @@ impl Pool {
})
}

pub(crate) fn poll_next(&mut self, cap: usize, cx: &mut Context<'_>) -> Poll<CheckedOutBuf> {
if let Some(buf) = self.try_next(cap) {
return Poll::Ready(buf);
}
let waker = cx.waker();
if !self.waiting_on_next.iter().any(|w| w.will_wake(waker)) {
self.waiting_on_next.push(waker.clone());
}
Poll::Pending
}

fn check_in_internal(&mut self, index: u16, init_len: usize) {
let cap = self.iovecs()[index as usize].iov_len;
let state = &mut self.states[index as usize];
Expand All @@ -128,6 +143,13 @@ impl Pool {
let next = self.free_buf_head_by_cap.insert(cap, index);

*state = BufState::Free { init_len, next };

if !self.waiting_on_next.is_empty() {
// Wake up tasks pending on poll_next
for waker in mem::take(&mut self.waiting_on_next) {
waker.wake()
}
}
}
}

Expand Down
16 changes: 16 additions & 0 deletions src/buf/fixed/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::runtime::CONTEXT;
use std::cell::RefCell;
use std::io;
use std::rc::Rc;
use std::task::{Context, Poll};

/// A dynamic collection of I/O buffers pre-registered with the kernel.
///
Expand Down Expand Up @@ -223,4 +224,19 @@ impl FixedBufPool {
unsafe { FixedBuf::new(registry, data) }
})
}

/// Attempts to obtain a buffer of requested capacity from this pool
/// that is not currently owned by any other [`FixedBuf`] handle.
/// Registers the current task for wakeup if no such free buffer is
/// available. The task is woken up when a `FixedBuf` handle belonging
/// to this pool is dropped.
pub fn poll_next(&mut self, cap: usize, cx: &mut Context<'_>) -> Poll<FixedBuf> {
let mut inner = self.inner.borrow_mut();
let data = ready!(inner.poll_next(cap, cx));
let registry = Rc::clone(&self.inner);
// Safety: the validity of buffer data is ensured by
// plumbing::Pool::poll_next
let buf = unsafe { FixedBuf::new(registry, data) };
Poll::Ready(buf)
}
}