Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,12 @@ impl Spawner {
let fut =
blocking_task::<F, BlockingTask<F>>(BlockingTask::new(func), spawn_meta, id.as_u64());

let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);
let (task, handle) = task::unowned(
fut,
BlockingSchedule::new(rt),
id,
std::panic::Location::caller(),
);

let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);
(handle, spawned)
Expand Down
20 changes: 15 additions & 5 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::panic::Location;
use std::sync::atomic::Ordering::{AcqRel, Release};
use std::task::Poll::{Pending, Ready};
use std::task::Waker;
Expand Down Expand Up @@ -445,6 +446,7 @@ impl Context {

impl Handle {
/// Spawns a future onto the `CurrentThread` scheduler
#[track_caller]
pub(crate) fn spawn<F>(
me: &Arc<Self>,
future: F,
Expand All @@ -454,10 +456,12 @@ impl Handle {
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
let spawned_at = Location::caller();
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id, spawned_at);

me.task_hooks.spawn(&TaskMeta {
id,
spawned_at,
_phantom: Default::default(),
});

Expand All @@ -474,6 +478,7 @@ impl Handle {
/// This should only be used when this is a `LocalRuntime` or in another case where the runtime
/// provably cannot be driven from or moved to different threads from the one on which the task
/// is spawned.
#[track_caller]
pub(crate) unsafe fn spawn_local<F>(
me: &Arc<Self>,
future: F,
Expand All @@ -483,10 +488,15 @@ impl Handle {
F: crate::future::Future + 'static,
F::Output: 'static,
{
let (handle, notified) = me.shared.owned.bind_local(future, me.clone(), id);
let spawned_at = Location::caller();
let (handle, notified) = me
.shared
.owned
.bind_local(future, me.clone(), id, spawned_at);

me.task_hooks.spawn(&TaskMeta {
id,
spawned_at,
_phantom: Default::default(),
});

Expand Down Expand Up @@ -771,16 +781,16 @@ impl CoreGuard<'_> {
let task = context.handle.shared.owned.assert_owner(task);

#[cfg(tokio_unstable)]
let task_id = task.task_id();
let task_meta = task.task_meta();

let (c, ()) = context.run_task(core, || {
#[cfg(tokio_unstable)]
context.handle.task_hooks.poll_start_callback(task_id);
context.handle.task_hooks.poll_start_callback(&task_meta);

task.run();

#[cfg(tokio_unstable)]
context.handle.task_hooks.poll_stop_callback(task_id);
context.handle.task_hooks.poll_stop_callback(&task_meta);
});

core = c;
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ cfg_rt! {
}
}

#[track_caller]
pub(crate) fn spawn<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
Expand All @@ -136,6 +137,7 @@ cfg_rt! {
/// This should only be called in `LocalRuntime` if the runtime has been verified to be owned
/// by the current thread.
#[allow(irrefutable_let_patterns)]
#[track_caller]
pub(crate) unsafe fn spawn_local<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
where
F: Future + 'static,
Expand Down
7 changes: 6 additions & 1 deletion tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::runtime::{
use crate::util::RngSeedGenerator;

use std::fmt;
use std::panic::Location;

mod metrics;

Expand Down Expand Up @@ -37,6 +38,7 @@ pub(crate) struct Handle {

impl Handle {
/// Spawns a future onto the thread pool
#[track_caller]
pub(crate) fn spawn<F>(me: &Arc<Self>, future: F, id: task::Id) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
Expand All @@ -49,15 +51,18 @@ impl Handle {
self.close();
}

#[track_caller]
pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T, id: task::Id) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
let spawned_at = Location::caller();
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id, spawned_at);

me.task_hooks.spawn(&TaskMeta {
id,
spawned_at,
_phantom: Default::default(),
});

Expand Down
18 changes: 12 additions & 6 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ impl Context {

fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
#[cfg(tokio_unstable)]
let task_id = task.task_id();
let task_meta = task.task_meta();

let task = self.worker.handle.shared.owned.assert_owner(task);

Expand All @@ -592,12 +592,15 @@ impl Context {
// Unlike the poll time above, poll start callback is attached to the task id,
// so it is tightly associated with the actual poll invocation.
#[cfg(tokio_unstable)]
self.worker.handle.task_hooks.poll_start_callback(task_id);
self.worker
.handle
.task_hooks
.poll_start_callback(&task_meta);

task.run();

#[cfg(tokio_unstable)]
self.worker.handle.task_hooks.poll_stop_callback(task_id);
self.worker.handle.task_hooks.poll_stop_callback(&task_meta);

let mut lifo_polls = 0;

Expand Down Expand Up @@ -663,15 +666,18 @@ impl Context {
let task = self.worker.handle.shared.owned.assert_owner(task);

#[cfg(tokio_unstable)]
let task_id = task.task_id();
let task_meta = task.task_meta();

#[cfg(tokio_unstable)]
self.worker.handle.task_hooks.poll_start_callback(task_id);
self.worker
.handle
.task_hooks
.poll_start_callback(&task_meta);

task.run();

#[cfg(tokio_unstable)]
self.worker.handle.task_hooks.poll_stop_callback(task_id);
self.worker.handle.task_hooks.poll_stop_callback(&task_meta);
}
})
}
Expand Down
59 changes: 57 additions & 2 deletions tokio/src/runtime/task/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::runtime::task::{Id, Schedule, TaskHarnessScheduleHooks};
use crate::util::linked_list;

use std::num::NonZeroU64;
use std::panic::Location;
use std::pin::Pin;
use std::ptr::NonNull;
use std::task::{Context, Poll, Waker};
Expand Down Expand Up @@ -141,6 +142,12 @@ pub(super) struct Core<T: Future, S> {
/// The task's ID, used for populating `JoinError`s.
pub(super) task_id: Id,

/// The source code location where the task was spawned./
///
/// This is used for populating the `TaskMeta` passed to the task runtime
/// hooks.
pub(super) spawned_at: &'static Location<'static>,

/// Either the future or the output.
pub(super) stage: CoreStage<T>,
}
Expand Down Expand Up @@ -208,7 +215,13 @@ pub(super) enum Stage<T: Future> {
impl<T: Future, S: Schedule> Cell<T, S> {
/// Allocates a new task cell, containing the header, trailer, and core
/// structures.
pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> {
pub(super) fn new(
future: T,
scheduler: S,
state: State,
task_id: Id,
spawned_at: &'static Location<'static>,
) -> Box<Cell<T, S>> {
// Separated into a non-generic function to reduce LLVM codegen
fn new_header(
state: State,
Expand Down Expand Up @@ -242,13 +255,20 @@ impl<T: Future, S: Schedule> Cell<T, S> {
stage: UnsafeCell::new(Stage::Running(future)),
},
task_id,
spawned_at,
},
});

#[cfg(debug_assertions)]
{
// Using a separate function for this code avoids instantiating it separately for every `T`.
unsafe fn check<S>(header: &Header, trailer: &Trailer, scheduler: &S, task_id: &Id) {
unsafe fn check<S>(
header: &Header,
trailer: &Trailer,
scheduler: &S,
task_id: &Id,
spawn_location: &&'static Location<'static>,
) {
let trailer_addr = trailer as *const Trailer as usize;
let trailer_ptr = unsafe { Header::get_trailer(NonNull::from(header)) };
assert_eq!(trailer_addr, trailer_ptr.as_ptr() as usize);
Expand All @@ -260,13 +280,20 @@ impl<T: Future, S: Schedule> Cell<T, S> {
let id_addr = task_id as *const Id as usize;
let id_ptr = unsafe { Header::get_id_ptr(NonNull::from(header)) };
assert_eq!(id_addr, id_ptr.as_ptr() as usize);

let spawn_location_addr =
spawn_location as *const &'static Location<'static> as usize;
let spawn_location_ptr =
unsafe { Header::get_spawn_location_ptr(NonNull::from(header)) };
assert_eq!(spawn_location_addr, spawn_location_ptr.as_ptr() as usize);
}
unsafe {
check(
&result.header,
&result.trailer,
&result.core.scheduler,
&result.core.task_id,
&result.core.spawned_at,
);
}
}
Expand Down Expand Up @@ -450,6 +477,34 @@ impl Header {
*ptr
}

/// Gets a pointer to the source code location where the task containing
/// this `Header` was spawned.
///
/// # Safety
///
/// The provided raw pointer must point at the header of a task.
pub(super) unsafe fn get_spawn_location_ptr(
me: NonNull<Header>,
) -> NonNull<&'static Location<'static>> {
let offset = me.as_ref().vtable.spawn_location_offset;
let spawned_at = me
.as_ptr()
.cast::<u8>()
.add(offset)
.cast::<&'static Location<'static>>();
NonNull::new_unchecked(spawned_at)
}

/// Gets the id of the task containing this `Header`.
///
/// # Safety
///
/// The provided raw pointer must point at the header of a task.
pub(super) unsafe fn get_spawn_location(me: NonNull<Header>) -> &'static Location<'static> {
let ptr = Header::get_spawn_location_ptr(me).as_ptr();
*ptr
}

/// Gets the tracing id of the task containing this `Header`.
///
/// # Safety
Expand Down
1 change: 1 addition & 0 deletions tokio/src/runtime/task/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ where
let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
f(&TaskMeta {
id: self.core().task_id,
spawned_at: self.core().spawned_at,
_phantom: Default::default(),
})
}));
Expand Down
10 changes: 7 additions & 3 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::util::sharded_list;
use crate::loom::sync::atomic::{AtomicBool, Ordering};
use std::marker::PhantomData;
use std::num::NonZeroU64;
use std::panic::Location;

// The id from the module below is used to verify whether a given task is stored
// in this OwnedTasks, or some other task. The counter starts at one so we can
Expand Down Expand Up @@ -91,13 +92,14 @@ impl<S: 'static> OwnedTasks<S> {
task: T,
scheduler: S,
id: super::Id,
spawned_at: &'static Location<'static>,
) -> (JoinHandle<T::Output>, Option<Notified<S>>)
where
S: Schedule,
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (task, notified, join) = super::new_task(task, scheduler, id);
let (task, notified, join) = super::new_task(task, scheduler, id, spawned_at);
let notified = unsafe { self.bind_inner(task, notified) };
(join, notified)
}
Expand All @@ -111,13 +113,14 @@ impl<S: 'static> OwnedTasks<S> {
task: T,
scheduler: S,
id: super::Id,
spawned_at: &'static Location<'static>,
) -> (JoinHandle<T::Output>, Option<Notified<S>>)
where
S: Schedule,
T: Future + 'static,
T::Output: 'static,
{
let (task, notified, join) = super::new_task(task, scheduler, id);
let (task, notified, join) = super::new_task(task, scheduler, id, spawned_at);
let notified = unsafe { self.bind_inner(task, notified) };
(join, notified)
}
Expand Down Expand Up @@ -258,13 +261,14 @@ impl<S: 'static> LocalOwnedTasks<S> {
task: T,
scheduler: S,
id: super::Id,
spawned_at: &'static Location<'static>,
) -> (JoinHandle<T::Output>, Option<Notified<S>>)
where
S: Schedule,
T: Future + 'static,
T::Output: 'static,
{
let (task, notified, join) = super::new_task(task, scheduler, id);
let (task, notified, join) = super::new_task(task, scheduler, id, spawned_at);

unsafe {
// safety: We just created the task, so we have exclusive access
Expand Down
Loading