Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/buf/fixed/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::ops::{Deref, DerefMut};
use std::rc::Rc;

// Data to construct a `FixedBuf` handle from.
pub(super) struct CheckedOutBuf {
pub(crate) struct CheckedOutBuf {
// Pointer and size of the buffer.
pub iovec: iovec,
// Length of the initialized part.
Expand Down
4 changes: 3 additions & 1 deletion src/buf/fixed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ pub use handle::FixedBuf;
mod buffers;
pub(crate) use buffers::FixedBuffers;

mod pool;
mod plumbing;

pub mod pool;
pub use pool::FixedBufPool;

mod registry;
Expand Down
8 changes: 8 additions & 0 deletions src/buf/fixed/plumbing/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Internal data structures shared between thread-local and thread-safe
// fixed buffer collections.

mod pool;
pub(super) use pool::Pool;

mod registry;
pub(super) use registry::Registry;
186 changes: 186 additions & 0 deletions src/buf/fixed/plumbing/pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
use crate::buf::fixed::{handle::CheckedOutBuf, FixedBuffers};

use libc::{iovec, UIO_MAXIOV};
use std::cmp;
use std::collections::HashMap;
use std::mem;
use std::ptr;
use std::slice;
use std::task::{Context, Poll, Waker};

// Internal state shared by FixedBufPool and FixedBuf handles.
pub(crate) struct Pool {
// Pointer to an allocated array of iovec records referencing
// the allocated buffers. The number of initialized records is the
// same as the length of the states array.
raw_bufs: ptr::NonNull<iovec>,
// Original capacity of raw_bufs as a Vec.
orig_cap: usize,
// State information on the buffers. Indices in this array correspond to
// the indices in the array at raw_bufs.
states: Vec<BufState>,
// Table of head indices of the free buffer lists in each size bucket.
free_buf_head_by_cap: HashMap<usize, u16>,
// Wakers for tasks pending on poll_next
waiting_on_next: Vec<Waker>,
}

// State information of a buffer in the registry,
enum BufState {
// The buffer is not in use.
Free {
// This field records the length of the initialized part.
init_len: usize,
// Index of the next buffer of the same capacity in a free buffer list, if any.
next: Option<u16>,
},
// The buffer is checked out.
// Its data are logically owned by the FixedBuf handle,
// which also keeps track of the length of the initialized part.
CheckedOut,
}

impl Pool {
pub(crate) fn new(bufs: impl Iterator<Item = Vec<u8>>) -> Self {
let bufs = bufs.take(cmp::min(UIO_MAXIOV as usize, u16::MAX as usize));
let (size_hint, _) = bufs.size_hint();
let mut iovecs = Vec::with_capacity(size_hint);
let mut states = Vec::with_capacity(size_hint);
let mut free_buf_head_by_cap = HashMap::new();
for (index, mut buf) in bufs.enumerate() {
let cap = buf.capacity();

// Link the buffer as the head of the free list for its capacity.
// This constructs the free buffer list to be initially retrieved
// back to front, which should be of no difference to the user.
let next = free_buf_head_by_cap.insert(cap, index as u16);

iovecs.push(iovec {
iov_base: buf.as_mut_ptr() as *mut _,
iov_len: cap,
});
states.push(BufState::Free {
init_len: buf.len(),
next,
});
mem::forget(buf);
}
debug_assert_eq!(iovecs.len(), states.len());

// Safety: Vec::as_mut_ptr never returns null
let raw_bufs = unsafe { ptr::NonNull::new_unchecked(iovecs.as_mut_ptr()) };
let orig_cap = iovecs.capacity();
mem::forget(iovecs);
Pool {
raw_bufs,
orig_cap,
states,
free_buf_head_by_cap,
waiting_on_next: vec![],
}
}

// If the free buffer list for this capacity is not empty, checks out the first buffer
// from the list and returns its data. Otherwise, returns None.
pub(crate) fn try_next(&mut self, cap: usize) -> Option<CheckedOutBuf> {
let free_head = self.free_buf_head_by_cap.get_mut(&cap)?;
let index = *free_head as usize;
let state = &mut self.states[index];

let (init_len, next) = match *state {
BufState::Free { init_len, next } => {
*state = BufState::CheckedOut;
(init_len, next)
}
BufState::CheckedOut => panic!("buffer is checked out"),
};

// Update the head of the free list for this capacity.
match next {
Some(i) => {
*free_head = i;
}
None => {
self.free_buf_head_by_cap.remove(&cap);
}
}

// Safety: the allocated array under the pointer is valid
// for the lifetime of self, a free buffer index is inside the array,
// as also asserted by the indexing operation on the states array
// that has the same length.
let iovec = unsafe { self.raw_bufs.as_ptr().add(index).read() };
debug_assert_eq!(iovec.iov_len, cap);
Some(CheckedOutBuf {
iovec,
init_len,
index: index as u16,
})
}

pub(crate) fn poll_next(&mut self, cap: usize, cx: &mut Context<'_>) -> Poll<CheckedOutBuf> {
if let Some(buf) = self.try_next(cap) {
return Poll::Ready(buf);
}
let waker = cx.waker();
if !self.waiting_on_next.iter().any(|w| w.will_wake(waker)) {
self.waiting_on_next.push(waker.clone());
}
Poll::Pending
}

fn check_in_internal(&mut self, index: u16, init_len: usize) {
let cap = self.iovecs()[index as usize].iov_len;
let state = &mut self.states[index as usize];
debug_assert!(
matches!(state, BufState::CheckedOut),
"the buffer must be checked out"
);

// Link the buffer as the new head of the free list for its capacity.
// Recently checked in buffers will be first to be reused,
// improving cache locality.
let next = self.free_buf_head_by_cap.insert(cap, index);

*state = BufState::Free { init_len, next };

if !self.waiting_on_next.is_empty() {
// Wake up tasks pending on poll_next
for waker in mem::take(&mut self.waiting_on_next) {
waker.wake()
}
}
}
}

impl FixedBuffers for Pool {
fn iovecs(&self) -> &[iovec] {
// Safety: the raw_bufs pointer is valid for the lifetime of self,
// the length of the states array is also the length of buffers array
// by construction.
unsafe { slice::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len()) }
}

unsafe fn check_in(&mut self, index: u16, init_len: usize) {
self.check_in_internal(index, init_len)
}
}

impl Drop for Pool {
fn drop(&mut self) {
let iovecs = unsafe {
Vec::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len(), self.orig_cap)
};
for (i, iovec) in iovecs.iter().enumerate() {
match self.states[i] {
BufState::Free { init_len, next: _ } => {
let ptr = iovec.iov_base as *mut u8;
let cap = iovec.iov_len;
let v = unsafe { Vec::from_raw_parts(ptr, init_len, cap) };
mem::drop(v);
}
BufState::CheckedOut => unreachable!("all buffers must be checked in"),
}
}
}
}
129 changes: 129 additions & 0 deletions src/buf/fixed/plumbing/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use crate::buf::fixed::{handle::CheckedOutBuf, FixedBuffers};

use libc::{iovec, UIO_MAXIOV};
use std::cmp;
use std::mem;
use std::ptr;
use std::slice;

// Internal state shared by FixedBufRegistry and FixedBuf handles.
pub(crate) struct Registry {
// Pointer to an allocated array of iovec records referencing
// the allocated buffers. The number of initialized records is the
// same as the length of the states array.
raw_bufs: ptr::NonNull<iovec>,
// Original capacity of raw_bufs as a Vec.
orig_cap: usize,
// State information on the buffers. Indices in this array correspond to
// the indices in the array at raw_bufs.
states: Vec<BufState>,
}

// State information of a buffer in the registry,
enum BufState {
// The buffer is not in use.
// The field records the length of the initialized part.
Free { init_len: usize },
// The buffer is checked out.
// Its data are logically owned by the FixedBuf handle,
// which also keeps track of the length of the initialized part.
CheckedOut,
}

impl Registry {
pub(crate) fn new(bufs: impl Iterator<Item = Vec<u8>>) -> Self {
let bufs = bufs.take(cmp::min(UIO_MAXIOV as usize, u16::MAX as usize));
let (size_hint, _) = bufs.size_hint();
let mut iovecs = Vec::with_capacity(size_hint);
let mut states = Vec::with_capacity(size_hint);
for mut buf in bufs {
iovecs.push(iovec {
iov_base: buf.as_mut_ptr() as *mut _,
iov_len: buf.capacity(),
});
states.push(BufState::Free {
init_len: buf.len(),
});
mem::forget(buf);
}
debug_assert_eq!(iovecs.len(), states.len());

// Safety: Vec::as_mut_ptr never returns null
let raw_bufs = unsafe { ptr::NonNull::new_unchecked(iovecs.as_mut_ptr()) };
let orig_cap = iovecs.capacity();
mem::forget(iovecs);
Registry {
raw_bufs,
orig_cap,
states,
}
}

// If the indexed buffer is free, changes its state to checked out
// and returns its data.
// If the buffer is already checked out, returns None.
pub(crate) fn check_out(&mut self, index: usize) -> Option<CheckedOutBuf> {
let state = self.states.get_mut(index)?;
let BufState::Free { init_len } = *state else {
return None
};

*state = BufState::CheckedOut;

// Safety: the allocated array under the pointer is valid
// for the lifetime of self, the index is inside the array
// as checked by Vec::get_mut above, called on the array of
// states that has the same length.
let iovec = unsafe { self.raw_bufs.as_ptr().add(index).read() };
debug_assert!(index <= u16::MAX as usize);
Some(CheckedOutBuf {
iovec,
init_len,
index: index as u16,
})
}

fn check_in_internal(&mut self, index: u16, init_len: usize) {
let state = self
.states
.get_mut(index as usize)
.expect("invalid buffer index");
debug_assert!(
matches!(state, BufState::CheckedOut),
"the buffer must be checked out"
);
*state = BufState::Free { init_len };
}
}

impl FixedBuffers for Registry {
fn iovecs(&self) -> &[iovec] {
// Safety: the raw_bufs pointer is valid for the lifetime of self,
// the length of the states array is also the length of buffers array
// by construction.
unsafe { slice::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len()) }
}

unsafe fn check_in(&mut self, index: u16, init_len: usize) {
self.check_in_internal(index, init_len)
}
}

impl Drop for Registry {
fn drop(&mut self) {
let iovecs = unsafe {
Vec::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len(), self.orig_cap)
};
for (i, iovec) in iovecs.iter().enumerate() {
match self.states[i] {
BufState::Free { init_len } => {
let ptr = iovec.iov_base as *mut u8;
let cap = iovec.iov_len;
let v = unsafe { Vec::from_raw_parts(ptr, init_len, cap) };
mem::drop(v);
}
BufState::CheckedOut => unreachable!("all buffers must be checked in"),
}
}
}
}
Loading