Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions foyer-memory/src/eviction/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@

use std::{fmt::Debug, ptr::NonNull};

use foyer_intrusive::{
dlist::{Dlist, DlistLink},
intrusive_adapter,
};
use foyer_common::slab::{slab_linked_list::SlabLinkedList, Token};
use serde::{Deserialize, Serialize};

use crate::{
Expand Down Expand Up @@ -45,7 +42,7 @@ pub struct FifoHandle<T>
where
T: Send + Sync + 'static,
{
link: DlistLink,
token: Option<Token>,
base: BaseHandle<T, FifoContext>,
}

Expand All @@ -58,15 +55,13 @@ where
}
}

intrusive_adapter! { FifoHandleDlistAdapter<T> = NonNull<FifoHandle<T>>: FifoHandle<T> { link: DlistLink } where T: Send + Sync + 'static }

impl<T> Default for FifoHandle<T>
where
T: Send + Sync + 'static,
{
fn default() -> Self {
Self {
link: DlistLink::default(),
token: None,
base: BaseHandle::new(),
}
}
Expand Down Expand Up @@ -95,7 +90,7 @@ pub struct Fifo<T>
where
T: Send + Sync + 'static,
{
queue: Dlist<FifoHandleDlistAdapter<T>>,
queue: SlabLinkedList<NonNull<FifoHandle<T>>>,
}

impl<T> Eviction for Fifo<T>
Expand All @@ -109,17 +104,23 @@ where
where
Self: Sized,
{
Self { queue: Dlist::new() }
Self {
queue: SlabLinkedList::new(),
}
}

unsafe fn push(&mut self, mut ptr: NonNull<Self::Handle>) {
self.queue.push_back(ptr);
ptr.as_mut().base_mut().set_in_eviction(true);
let token = self.queue.push_back(ptr);
let handle = ptr.as_mut();
handle.base_mut().set_in_eviction(true);
handle.token = Some(token);
}

unsafe fn pop(&mut self) -> Option<NonNull<Self::Handle>> {
self.queue.pop_front().map(|mut ptr| {
ptr.as_mut().base_mut().set_in_eviction(false);
let handle = ptr.as_mut();
handle.base_mut().set_in_eviction(false);
handle.token = None;
ptr
})
}
Expand All @@ -129,15 +130,19 @@ where
unsafe fn acquire(&mut self, _: NonNull<Self::Handle>) {}

unsafe fn remove(&mut self, mut ptr: NonNull<Self::Handle>) {
let p = self.queue.iter_mut_from_raw(ptr.as_mut().link.raw()).remove().unwrap();
let p = self.queue.remove_with_token(ptr.as_ref().token.unwrap_unchecked());
assert_eq!(p, ptr);
ptr.as_mut().base_mut().set_in_eviction(false);
let handle = ptr.as_mut();
handle.base_mut().set_in_eviction(false);
handle.token = None;
}

unsafe fn clear(&mut self) -> Vec<NonNull<Self::Handle>> {
let mut res = Vec::with_capacity(self.len());
while let Some(mut ptr) = self.queue.pop_front() {
ptr.as_mut().base_mut().set_in_eviction(false);
let handle = ptr.as_mut();
handle.base_mut().set_in_eviction(false);
handle.token = None;
res.push(ptr);
}
res
Expand Down Expand Up @@ -168,10 +173,12 @@ pub mod tests {
T: Send + Sync + 'static + Clone,
{
fn dump(&self) -> Vec<T> {
self.queue
.iter()
.map(|handle| handle.base().data_unwrap_unchecked().clone())
.collect_vec()
unsafe {
self.queue
.iter()
.map(|handle| handle.as_ref().base().data_unwrap_unchecked().clone())
.collect_vec()
}
}
}

Expand Down
93 changes: 49 additions & 44 deletions foyer-memory/src/eviction/lfu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
use std::{fmt::Debug, ptr::NonNull};

use cmsketch::CMSketchU16;
use foyer_intrusive::{
core::adapter::Link,
dlist::{Dlist, DlistLink},
intrusive_adapter,
};
use foyer_common::slab::{slab_linked_list::SlabLinkedList, Token};
use serde::{Deserialize, Serialize};

use crate::{
Expand Down Expand Up @@ -85,7 +81,7 @@ pub struct LfuHandle<T>
where
T: Send + Sync + 'static,
{
link: DlistLink,
token: Option<Token>,
base: BaseHandle<T, LfuContext>,
queue: Queue,
}
Expand All @@ -99,15 +95,13 @@ where
}
}

intrusive_adapter! { LfuHandleDlistAdapter<T> = NonNull<LfuHandle<T>>: LfuHandle<T> { link: DlistLink } where T: Send + Sync + 'static }

impl<T> Default for LfuHandle<T>
where
T: Send + Sync + 'static,
{
fn default() -> Self {
Self {
link: DlistLink::default(),
token: None,
base: BaseHandle::new(),
queue: Queue::None,
}
Expand Down Expand Up @@ -149,9 +143,9 @@ pub struct Lfu<T>
where
T: Send + Sync + 'static,
{
window: Dlist<LfuHandleDlistAdapter<T>>,
probation: Dlist<LfuHandleDlistAdapter<T>>,
protected: Dlist<LfuHandleDlistAdapter<T>>,
window: SlabLinkedList<NonNull<LfuHandle<T>>>,
probation: SlabLinkedList<NonNull<LfuHandle<T>>>,
protected: SlabLinkedList<NonNull<LfuHandle<T>>>,

window_weight: usize,
probation_weight: usize,
Expand Down Expand Up @@ -235,9 +229,9 @@ where
let decay = frequencies.width();

Self {
window: Dlist::new(),
probation: Dlist::new(),
protected: Dlist::new(),
window: SlabLinkedList::with_capacity(capacity),
probation: SlabLinkedList::with_capacity(capacity),
protected: SlabLinkedList::with_capacity(capacity),
window_weight: 0,
probation_weight: 0,
protected_weight: 0,
Expand All @@ -252,12 +246,13 @@ where
unsafe fn push(&mut self, mut ptr: NonNull<Self::Handle>) {
let handle = ptr.as_mut();

debug_assert!(!handle.link.is_linked());
debug_assert!(handle.token.is_none());
debug_assert!(!handle.base().is_in_eviction());
debug_assert_eq!(handle.queue, Queue::None);

self.window.push_back(ptr);
let token = self.window.push_back(ptr);
handle.base_mut().set_in_eviction(true);
handle.token = Some(token);
handle.queue = Queue::Window;

self.increase_queue_weight(handle);
Expand All @@ -271,7 +266,8 @@ where
self.decrease_queue_weight(handle);
handle.queue = Queue::Probation;
self.increase_queue_weight(handle);
self.probation.push_back(ptr);
let token = self.probation.push_back(ptr);
handle.token = Some(token);
}
}

Expand All @@ -283,7 +279,8 @@ where
(None, Some(_)) => self.probation.pop_front(),
(Some(_), None) => self.window.pop_front(),
(Some(window), Some(probation)) => {
if self.frequencies.estimate(window.base().hash()) < self.frequencies.estimate(probation.base().hash())
if self.frequencies.estimate(window.as_ref().base().hash())
< self.frequencies.estimate(probation.as_ref().base().hash())
{
self.window.pop_front()

Expand All @@ -298,13 +295,14 @@ where

let handle = ptr.as_mut();

debug_assert!(!handle.link.is_linked());
debug_assert!(handle.token.is_some());
debug_assert!(handle.base().is_in_eviction());
debug_assert_ne!(handle.queue, Queue::None);

self.decrease_queue_weight(handle);
handle.queue = Queue::None;
handle.base_mut().set_in_eviction(false);
handle.token = None;

Some(ptr)
}
Expand All @@ -314,28 +312,30 @@ where

match handle.queue {
Queue::None => {
debug_assert!(!handle.link.is_linked());
debug_assert!(handle.token.is_none());
debug_assert!(!handle.base().is_in_eviction());
self.push(ptr);
debug_assert!(handle.link.is_linked());
debug_assert!(handle.token.is_some());
debug_assert!(handle.base().is_in_eviction());
}
Queue::Window => {
// Move to MRU position of `window`.
debug_assert!(handle.link.is_linked());
debug_assert!(handle.token.is_some());
debug_assert!(handle.base().is_in_eviction());
self.window.remove_raw(handle.link.raw());
self.window.push_back(ptr);
self.window.remove_with_token(handle.token.unwrap_unchecked());
let token = self.window.push_back(ptr);
handle.token = Some(token);
}
Queue::Probation => {
// Promote to MRU position of `protected`.
debug_assert!(handle.link.is_linked());
debug_assert!(handle.token.is_some());
debug_assert!(handle.base().is_in_eviction());
self.probation.remove_raw(handle.link.raw());
self.probation.remove_with_token(handle.token.unwrap_unchecked());
self.decrease_queue_weight(handle);
handle.queue = Queue::Protected;
self.increase_queue_weight(handle);
self.protected.push_back(ptr);
let token = self.protected.push_back(ptr);
handle.token = Some(token);

// If `protected` weight exceeds the capacity, overflow entry from `protected` to `probation`.
while self.protected_weight > self.protected_weight_capacity {
Expand All @@ -345,15 +345,17 @@ where
self.decrease_queue_weight(handle);
handle.queue = Queue::Probation;
self.increase_queue_weight(handle);
self.probation.push_back(ptr);
let token = self.probation.push_back(ptr);
handle.token = Some(token);
}
}
Queue::Protected => {
// Move to MRU position of `protected`.
debug_assert!(handle.link.is_linked());
debug_assert!(handle.token.is_some());
debug_assert!(handle.base().is_in_eviction());
self.protected.remove_raw(handle.link.raw());
self.protected.push_back(ptr);
self.protected.remove_with_token(handle.token.unwrap_unchecked());
let token = self.protected.push_back(ptr);
handle.token = Some(token);
}
}
}
Expand All @@ -365,22 +367,23 @@ where
unsafe fn remove(&mut self, mut ptr: NonNull<Self::Handle>) {
let handle = ptr.as_mut();

debug_assert!(handle.link.is_linked());
debug_assert!(handle.token.is_some());
debug_assert!(handle.base().is_in_eviction());
debug_assert_ne!(handle.queue, Queue::None);

match handle.queue {
Queue::None => unreachable!(),
Queue::Window => self.window.remove_raw(handle.link.raw()),
Queue::Probation => self.probation.remove_raw(handle.link.raw()),
Queue::Protected => self.protected.remove_raw(handle.link.raw()),
Queue::Window => self.window.remove_with_token(handle.token.unwrap_unchecked()),
Queue::Probation => self.probation.remove_with_token(handle.token.unwrap_unchecked()),
Queue::Protected => self.protected.remove_with_token(handle.token.unwrap_unchecked()),
};

debug_assert!(!handle.link.is_linked());
debug_assert!(handle.token.is_none());

self.decrease_queue_weight(handle);
handle.queue = Queue::None;
handle.base_mut().set_in_eviction(false);
handle.token = None;
}

unsafe fn clear(&mut self) -> Vec<NonNull<Self::Handle>> {
Expand All @@ -389,7 +392,7 @@ where
while !self.is_empty() {
let ptr = self.pop().unwrap_unchecked();
debug_assert!(!ptr.as_ref().base().is_in_eviction());
debug_assert!(!ptr.as_ref().link.is_linked());
debug_assert!(ptr.as_ref().token.is_none());
debug_assert_eq!(ptr.as_ref().queue, Queue::None);
res.push(ptr);
}
Expand Down Expand Up @@ -422,12 +425,14 @@ mod tests {
T: Send + Sync + 'static + Clone,
{
fn dump(&self) -> Vec<T> {
self.window
.iter()
.chain(self.probation.iter())
.chain(self.protected.iter())
.map(|handle| handle.base().data_unwrap_unchecked().clone())
.collect_vec()
unsafe {
self.window
.iter()
.chain(self.probation.iter())
.chain(self.protected.iter())
.map(|handle| handle.as_ref().base().data_unwrap_unchecked().clone())
.collect_vec()
}
}
}

Expand Down