Skip to content

Commit 42f9a1e

Browse files
committed
Add sanitizer builds.
1 parent 5917e7b commit 42f9a1e

File tree

25 files changed

+368
-364
lines changed

25 files changed

+368
-364
lines changed

ringolo/src/context/core.rs

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -122,25 +122,6 @@ impl Core {
122122
self.pending_ios.fetch_sub(1, Ordering::Release);
123123
}
124124

125-
pub(crate) fn increment_task_pending_ios(&self, waker: &Waker) {
126-
self.modify_task_pending_ios(waker, 1);
127-
}
128-
129-
pub(crate) fn decrement_task_pending_ios(&self, waker: &Waker) {
130-
self.modify_task_pending_ios(waker, -1);
131-
}
132-
133-
fn modify_task_pending_ios(&self, waker: &Waker, delta: i32) {
134-
// Don't track owned_resources for the root_future. The reason is because it
135-
// *is not backed* by a regular task and is never stealable by other threads.
136-
if !self.is_polling_root() {
137-
unsafe {
138-
let ptr = NonNull::new_unchecked(waker.data() as *mut Header);
139-
Header::modify_pending_ios(ptr, delta);
140-
}
141-
}
142-
}
143-
144125
pub(crate) fn increment_task_owned_resources(&self, waker: &Waker) -> Option<NonNull<Header>> {
145126
// Don't track owned_resources for the root_future. The reason is that it
146127
// *is not backed* by a regular task and is never stealable by other threads.
@@ -159,11 +140,8 @@ impl Core {
159140
#[cfg(test)]
160141
mod tests {
161142
use crate as ringolo;
162-
use crate::context::{self, PendingIoOp, init_stealing_context};
163-
use crate::runtime::runtime::Kind;
164-
use crate::test_utils::{init_local_runtime_and_context, init_stealing_runtime_and_context};
143+
use crate::context::{self, PendingIoOp};
165144
use anyhow::Result;
166-
use rstest::rstest;
167145

168146
#[ringolo::test]
169147
async fn test_pending_io_on_core_and_shared() -> Result<()> {

ringolo/src/context/maintenance/cleanup.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,6 @@ impl CleanupHandler {
238238
}
239239
}
240240

241-
#[doc(hidden)]
242241
fn cvt(ops: &[CleanupOp]) -> Vec<AnyOp<'_>> {
243242
ops.iter().map(|op| op.into()).collect()
244243
}

ringolo/src/context/mod.rs

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,30 @@ where
144144
})
145145
}
146146

147+
#[inline(always)]
148+
pub(crate) fn with_context<F, R>(f: F) -> R
149+
where
150+
F: FnOnce(&Context) -> R,
151+
{
152+
CONTEXT.with(|ctx| {
153+
let root = ctx.get().expect("Context not initialized").borrow();
154+
f(&root.context)
155+
})
156+
}
157+
158+
#[inline(always)]
159+
pub(crate) fn try_with_context<F, R>(f: F) -> Option<R>
160+
where
161+
F: FnOnce(&Context) -> R,
162+
{
163+
CONTEXT
164+
.try_with(|ctx| {
165+
let root = ctx.get().expect("Context not initialized").borrow();
166+
f(&root.context)
167+
})
168+
.ok()
169+
}
170+
147171
#[inline(always)]
148172
pub(crate) fn with_core<F, R>(f: F) -> R
149173
where
@@ -155,6 +179,17 @@ where
155179
})
156180
}
157181

182+
#[inline(always)]
183+
pub(crate) fn try_with_core<F, R>(f: F) -> Option<R>
184+
where
185+
F: FnOnce(&Core) -> R,
186+
{
187+
try_with_context(|outer| match outer {
188+
Context::Local(c) => c.with_core(f),
189+
Context::Stealing(c) => c.with_core(f),
190+
})
191+
}
192+
158193
#[inline(always)]
159194
pub(crate) fn with_shared<F, R>(f: F) -> R
160195
where
@@ -233,18 +268,6 @@ pub(crate) fn set_current_task(task: Option<Arc<TaskNode>>) -> Option<Arc<TaskNo
233268
with_core(|core| core.current_task.replace(task))
234269
}
235270

236-
// Private helpers.
237-
#[inline(always)]
238-
pub(crate) fn with_context<F, R>(f: F) -> R
239-
where
240-
F: FnOnce(&Context) -> R,
241-
{
242-
CONTEXT.with(|ctx| {
243-
let root = ctx.get().expect("Context not initialized").borrow();
244-
f(&root.context)
245-
})
246-
}
247-
248271
#[inline(always)]
249272
pub(crate) fn with_scheduler<F, R>(f: F) -> R
250273
where

ringolo/src/context/ring.rs

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -146,38 +146,48 @@ impl SingleIssuerRing {
146146
self.sq().taskrun()
147147
}
148148

149-
// Will busy loop until `num_to_complete` has been achieved. It is the caller's
149+
// Will busy loop until `num_to_visit` CQEs have been visited. It is the caller's
150150
// responsibility to make sure the CQ will see that many completions, otherwise
151151
// this will result in an infinite loop.
152152
pub(crate) fn process_cqes(
153153
&mut self,
154154
slab: &mut RawSqeSlab,
155-
num_to_complete: Option<usize>,
155+
num_to_visit: Option<usize>,
156156
) -> Result<usize> {
157+
// We need to split `num_visited`/`num_completed` counters because if we
158+
// cancel an SQE inflight, the entry might be dropped from the slab while
159+
// still producing a CQE. In that case, the loop would only increment the
160+
// `num_visited` counter.
161+
let mut num_visited = 0;
157162
let mut num_completed = 0;
158-
let mut should_sync = false;
159163

160-
let to_complete = num_to_complete.unwrap_or(self.cq().len());
164+
let mut cq = self.cq();
165+
let to_visit = num_to_visit.unwrap_or(cq.len());
161166

162-
while num_completed < to_complete {
163-
let mut cq = self.cq();
167+
let mut should_sync = false;
164168

165-
// Avoid syncing on first pass
169+
while num_visited < to_visit {
170+
// Avoid syncing on first pass.
166171
if should_sync {
172+
// If we get here, we are spinning waiting for completions so
173+
// let's give the CPU a tiny breather.
174+
std::thread::yield_now();
167175
cq.sync();
168176
}
169177

170-
for cqe in cq {
178+
for cqe in &mut cq {
179+
num_visited += 1;
180+
171181
let raw_sqe = match slab.get_mut(cqe.user_data() as usize) {
172182
Err(e) => {
173183
eprintln!("CQE user data not found in RawSqeSlab: {:?}", e);
174184
continue;
175185
}
176186
Ok(sqe) => {
177-
if !matches!(sqe.get_state(), RawSqeState::Pending | RawSqeState::Ready) {
187+
if !matches!(sqe.state, RawSqeState::Pending | RawSqeState::Ready) {
178188
// Ignore unknown CQEs which might have a valid index in
179189
// the Slab. Can this even happen?
180-
eprintln!("SQE in unexpected state: {:?}", sqe.get_state());
190+
eprintln!("SQE in unexpected state: {:?}", sqe.state);
181191
continue;
182192
}
183193
sqe
@@ -189,7 +199,12 @@ impl SingleIssuerRing {
189199
if let Some(CompletionEffect::WakeHead { head }) =
190200
raw_sqe.on_completion(cqe.result(), cqe.flags().into())?
191201
{
192-
slab.get_mut(head)?.wake()?;
202+
slab.get_mut(head)?.wake_by_ref()?;
203+
}
204+
205+
// Exit early if user provided an override for `to_visit`.
206+
if num_visited >= to_visit {
207+
break;
193208
}
194209
}
195210

@@ -273,6 +288,8 @@ mod tests {
273288
let builder = Builder::new_local().sq_ring_size(sq_ring_size);
274289
let (_runtime, _scheduler) = init_local_runtime_and_context(Some(builder))?;
275290

291+
let (waker, _) = mock_waker();
292+
276293
context::with_slab_and_ring_mut(|slab, ring| -> Result<()> {
277294
{
278295
let sq = ring.sq();
@@ -289,7 +306,7 @@ mod tests {
289306

290307
(0..n).for_each(|i| {
291308
nops.push(nop().user_data(indices[i] as u64));
292-
raws.push(RawSqe::new(CompletionHandler::new_single()));
309+
raws.push(RawSqe::new(&waker, CompletionHandler::new_single()));
293310
});
294311

295312
let _ = batch.commit(raws)?;
@@ -342,15 +359,9 @@ mod tests {
342359
(0..n).for_each(|i| {
343360
let nop = nop().user_data(indices[i] as u64);
344361

345-
let mut raw = RawSqe::new(CompletionHandler::new_single());
346-
raw.set_waker(&waker);
347-
362+
let raw = RawSqe::new(&waker, CompletionHandler::new_single());
348363
nops.push(nop);
349364
raws.push(raw);
350-
351-
// We use single SQE even though we use batch API on slab,
352-
// anyways a bit messed up but need to count N pending ios.
353-
context::with_core(|core| core.increment_pending_ios());
354365
});
355366

356367
let _ = batch.commit(raws)?;

ringolo/src/context/slab.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::ops::{Deref, DerefMut};
1212
/// This structure manages the lifecycle of raw pointers used in I/O requests.
1313
/// It utilizes a "Reserve-Commit" pattern to ensure that entries are only
1414
/// permanently occupied if the I/O submission logic succeeds.
15+
#[derive(Debug)]
1516
pub(crate) struct RawSqeSlab {
1617
slab: slab::Slab<RawSqe>,
1718
}
@@ -191,14 +192,15 @@ mod tests {
191192
#[test]
192193
fn test_insert_and_len() -> Result<()> {
193194
let (_runtime, _scheduler) = init_local_runtime_and_context(None)?;
195+
let (waker, _) = mock_waker();
194196

195197
let n_sqes = 3;
196198
let mut slab = RawSqeSlab::new(n_sqes);
197199

198200
for i in 1..=n_sqes {
199201
let reserved = slab.reserve_entry()?;
200202
let idx = reserved.key();
201-
reserved.commit(RawSqe::new(CompletionHandler::new_single()));
203+
reserved.commit(RawSqe::new(&waker, CompletionHandler::new_single()));
202204

203205
assert!(slab.get(idx).is_ok());
204206
assert_eq!(slab.len(), i);
@@ -210,13 +212,14 @@ mod tests {
210212
#[test]
211213
fn test_slab_full() -> Result<()> {
212214
let (_runtime, _scheduler) = init_local_runtime_and_context(None)?;
215+
let (waker, _) = mock_waker();
213216

214217
let n_sqes = 3;
215218
let mut slab = RawSqeSlab::new(n_sqes - 1);
216219

217220
for i in 1..=n_sqes {
218221
let res = slab.reserve_entry().map(|reserved| {
219-
reserved.commit(RawSqe::new(CompletionHandler::new_single()));
222+
reserved.commit(RawSqe::new(&waker, CompletionHandler::new_single()));
220223
});
221224

222225
if i == n_sqes {
@@ -232,6 +235,7 @@ mod tests {
232235
#[test]
233236
fn test_reserve_batch() -> Result<()> {
234237
let (_runtime, _scheduler) = init_local_runtime_and_context(None)?;
238+
let (waker, _) = mock_waker();
235239

236240
context::with_slab_mut(|slab| -> Result<()> {
237241
let n_sqes = 16;
@@ -256,7 +260,7 @@ mod tests {
256260

257261
// Commit the batch
258262
let entries = (0..n_sqes)
259-
.map(|_| RawSqe::new(CompletionHandler::new_single()))
263+
.map(|_| RawSqe::new(&waker, CompletionHandler::new_single()))
260264
.collect::<SmallVec<[_; SPILL_TO_HEAP_THRESHOLD]>>();
261265

262266
batch.commit(entries)?;

ringolo/src/runtime/cancel.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,12 +174,13 @@ mod tests {
174174
let stats = ringolo::recursive_cancel_all_metadata(&TaskMetadata::from(["dead"]))?;
175175
assert_eq!(stats.cancelled, 1);
176176
assert_eq!(stats.visited, n + 1);
177+
drop(cancelled);
177178

178-
// Check orphanage - only happens after we drop the join_handle
179+
// Next poll on the cancelled task creates orphans
179180
let orphan_root = get_orphan_root();
180-
assert_eq!(orphan_root.num_children(), 0);
181-
drop(cancelled);
182-
assert_eq!(orphan_root.num_children(), n);
181+
while orphan_root.num_children() < n {
182+
assert!(YieldNow::new(Some(AddMode::Fifo)).await.is_ok());
183+
}
183184

184185
// Now cancel remaining orphans
185186
let stats = ringolo::recursive_cancel_all_orphans()?;

ringolo/src/runtime/local/worker.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::runtime::{AddMode, EventLoop};
77
use crate::runtime::{Ticker, TickerData, TickerEvents};
88
use crate::task::TaskNodeGuard;
99
use crate::task::ThreadId;
10-
use anyhow::{Result, anyhow};
10+
use anyhow::{anyhow, Result};
1111
use std::cell::RefCell;
1212
use std::collections::VecDeque;
1313
use std::pin::pin;
@@ -56,7 +56,7 @@ impl EventLoop for Worker {
5656
// the end corresponds to the front.
5757
match mode {
5858
AddMode::Fifo => q.push_front(task),
59-
AddMode::Lifo => q.push_back(task),
59+
AddMode::Lifo | AddMode::Cancel => q.push_back(task),
6060
}
6161
}
6262

@@ -105,7 +105,9 @@ impl Worker {
105105
})?;
106106
} else {
107107
match root_result.is_some() {
108-
true => return Ok(root_result),
108+
true => {
109+
return Ok(root_result);
110+
}
109111
false => scheduler.set_root_woken(),
110112
}
111113
}

ringolo/src/runtime/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,12 @@ pub enum AddMode {
153153
/// polled in the order they were yielded, but may be less cache-efficient
154154
/// than `Lifo`.
155155
Fifo,
156+
157+
/// Schedules a task to be cancelled.
158+
///
159+
/// We leverage the `RawTask::remote_abort` API to make sure we cancel the
160+
/// task on the owning thread so we can free resources in the right place.
161+
Cancel,
156162
}
157163

158164
#[doc(hidden)]

ringolo/src/runtime/registry.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ impl<S: Schedule> OwnedTasks<S> {
229229
/// Expose functionality of the TaskRegistry common to any scheduler.
230230
pub(crate) trait TaskRegistry: Send + Sync + std::fmt::Debug {
231231
/// Allow shutting down a specific task.
232-
fn shutdown(&self, id: &Id);
232+
fn remote_abort(&self, id: &Id);
233233

234234
/// Returns true if the registry was closed as part of runtime shutdown.
235235
fn is_closed(&self) -> bool;
@@ -239,9 +239,9 @@ pub(crate) trait TaskRegistry: Send + Sync + std::fmt::Debug {
239239
}
240240

241241
impl<S: Schedule> TaskRegistry for OwnedTasks<S> {
242-
fn shutdown(&self, id: &Id) {
243-
if let Some((_, task)) = self.tasks.remove(id) {
244-
task.shutdown()
242+
fn remote_abort(&self, id: &Id) {
243+
if let Some(task) = self.tasks.get(id) {
244+
task.remote_abort();
245245
}
246246
}
247247

ringolo/src/runtime/stealing/root_worker.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,11 @@ impl RootWorker {
5656
impl EventLoop for RootWorker {
5757
type Task = StealableTask;
5858

59-
fn add_task(&self, task: Self::Task, _mode: AddMode) {
59+
fn add_task(&self, task: Self::Task, mode: AddMode) {
6060
debug_assert!(
61-
task.is_maintenance_task(),
62-
"Only maintenance task can be scheduled on root_worker."
61+
task.is_maintenance_task() || matches!(mode, AddMode::Cancel),
62+
"Can only schedule maintenance task or unclaimed tasks
63+
that needs cancellation on root_worker"
6364
);
6465
task.set_owner_id(self.thread_id);
6566
self.pollable.borrow_mut().push_back(task);
@@ -101,10 +102,6 @@ impl RootWorker {
101102

102103
loop {
103104
if let Some(task) = self.find_task() {
104-
debug_assert!(
105-
task.is_maintenance_task(),
106-
"Only maintenance task should be running on root_worker"
107-
);
108105
task.run();
109106
} else if ctx.with_core(|core| core.get_pending_ios() > 0) {
110107
// Block thread waiting for next completion.

0 commit comments

Comments
 (0)