Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a test scheduler that propagates panics #3218

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions tokio/Cargo.toml
Expand Up @@ -38,6 +38,7 @@ full = [
"process",
"rt",
"rt-multi-thread",
"rt-test",
"signal",
"stream",
"sync",
Expand Down Expand Up @@ -73,6 +74,7 @@ rt-multi-thread = [
"num_cpus",
"rt",
]
rt-test = ["rt"]
signal = [
"once_cell",
"libc",
Expand Down
16 changes: 16 additions & 0 deletions tokio/src/macros/cfg.rs
Expand Up @@ -299,6 +299,22 @@ macro_rules! cfg_not_rt_multi_thread {
}
}

macro_rules! cfg_rt_test {
($($item:item)*) => {
$(
#[cfg(feature = "rt-test")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-test")))]
$item
)*
}
}

macro_rules! cfg_not_rt_test {
($($item:item)*) => {
$( #[cfg(not(feature = "rt-test"))] $item )*
}
}

macro_rules! cfg_test_util {
($($item:item)*) => {
$(
Expand Down
47 changes: 47 additions & 0 deletions tokio/src/runtime/builder.rs
Expand Up @@ -77,6 +77,8 @@ pub(crate) enum Kind {
CurrentThread,
#[cfg(feature = "rt-multi-thread")]
MultiThread,
#[cfg(feature = "rt-test")]
TestScheduler,
}

impl Builder {
Expand All @@ -96,6 +98,15 @@ impl Builder {
Builder::new(Kind::MultiThread)
}

/// Returns a new builder with the current thread scheduler selected.
///
/// Configuration methods can be chained on the return value.
#[cfg(feature = "rt-test")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-test")))]
pub fn new_test() -> Builder {
Builder::new(Kind::TestScheduler)
}

/// Returns a new runtime builder initialized with default configuration
/// values.
///
Expand Down Expand Up @@ -374,6 +385,8 @@ impl Builder {
Kind::CurrentThread => self.build_basic_runtime(),
#[cfg(feature = "rt-multi-thread")]
Kind::MultiThread => self.build_threaded_runtime(),
#[cfg(feature = "rt-test")]
Kind::TestScheduler => self.build_test_runtime(),
}
}

Expand Down Expand Up @@ -527,6 +540,40 @@ cfg_rt_multi_thread! {
}
}

cfg_rt_test! {
impl Builder {
fn build_test_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{TestScheduler, Kind};

let (driver, resources) = driver::Driver::new(self.get_cfg())?;

// And now put a single-threaded scheduler on top of the timer. When
// there are no futures ready to do something, it'll let the timer or
// the reactor to generate some new stimuli for the futures to continue
// in their life.
let scheduler = TestScheduler::new(driver);
let spawner = Spawner::TestScheduler(scheduler.spawner().clone());

// Blocking pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
let blocking_spawner = blocking_pool.spawner().clone();

Ok(Runtime {
kind: Kind::TestScheduler(scheduler),
handle: Handle {
spawner,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
},
blocking_pool,
})
}
}
}

impl fmt::Debug for Builder {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Builder")
Expand Down
11 changes: 11 additions & 0 deletions tokio/src/runtime/mod.rs
Expand Up @@ -216,6 +216,11 @@ cfg_rt_multi_thread! {
use self::thread_pool::ThreadPool;
}

cfg_rt_test! {
pub(crate) mod test_scheduler;
use self::test_scheduler::TestScheduler;
}

cfg_rt! {
use crate::task::JoinHandle;

Expand Down Expand Up @@ -281,6 +286,10 @@ cfg_rt! {
/// Execute tasks across multiple threads.
#[cfg(feature = "rt-multi-thread")]
ThreadPool(ThreadPool),

/// Execute all tasks on the current-thread. Do not capture panics.
#[cfg(feature = "rt-test")]
TestScheduler(TestScheduler<driver::Driver>),
}

/// After thread starts / before thread stops
Expand Down Expand Up @@ -450,6 +459,8 @@ cfg_rt! {
Kind::CurrentThread(exec) => exec.block_on(future),
#[cfg(feature = "rt-multi-thread")]
Kind::ThreadPool(exec) => exec.block_on(future),
#[cfg(feature = "rt-test")]
Kind::TestScheduler(exec) => exec.block_on(future),
}
}

Expand Down
8 changes: 8 additions & 0 deletions tokio/src/runtime/spawner.rs
Expand Up @@ -9,12 +9,18 @@ cfg_rt_multi_thread! {
use crate::runtime::thread_pool;
}

cfg_rt_test! {
use crate::runtime::test_scheduler;
}

#[derive(Debug, Clone)]
pub(crate) enum Spawner {
#[cfg(feature = "rt")]
Basic(basic_scheduler::Spawner),
#[cfg(feature = "rt-multi-thread")]
ThreadPool(thread_pool::Spawner),
#[cfg(feature = "rt-test")]
TestScheduler(test_scheduler::Spawner),
}

impl Spawner {
Expand All @@ -40,6 +46,8 @@ cfg_rt! {
Spawner::Basic(spawner) => spawner.spawn(future),
#[cfg(feature = "rt-multi-thread")]
Spawner::ThreadPool(spawner) => spawner.spawn(future),
#[cfg(feature = "rt-test")]
Spawner::TestScheduler(spawner) => spawner.spawn(future),
}
}
}
Expand Down
23 changes: 23 additions & 0 deletions tokio/src/runtime/task/core.rs
Expand Up @@ -103,6 +103,29 @@ impl<T: Future, S: Schedule> Cell<T, S> {
},
})
}

cfg_rt_test! {
/// Allocates a new task cell, containing the header, trailer, and core
/// structures.
pub(super) fn new_test(future: T, state: State) -> Box<Cell<T, S>> {
Box::new(Cell {
header: Header {
state,
owned: UnsafeCell::new(linked_list::Pointers::new()),
queue_next: UnsafeCell::new(None),
stack_next: UnsafeCell::new(None),
vtable: raw::vtable_test::<T, S>(),
},
core: Core {
scheduler: UnsafeCell::new(None),
stage: UnsafeCell::new(Stage::Running(future)),
},
trailer: Trailer {
waker: UnsafeCell::new(None),
},
})
}
}
}

impl<T: Future, S: Schedule> Core<T, S> {
Expand Down
94 changes: 94 additions & 0 deletions tokio/src/runtime/task/harness.rs
Expand Up @@ -137,6 +137,100 @@ where
}
}

cfg_rt_test! {
/// Polls the inner future.
///
/// All necessary state checks and transitions are performed.
///
/// Panics raised while polling the propagated.
pub(super) fn poll_test(self) {
// If this is the first time the task is polled, the task will be bound
// to the scheduler, in which case the task ref count must be
// incremented.
let is_not_bound = !self.core().is_bound();

// Transition the task to the running state.
//
// A failure to transition here indicates the task has been cancelled
// while in the run queue pending execution.
let snapshot = match self.header().state.transition_to_running(is_not_bound) {
Ok(snapshot) => snapshot,
Err(_) => {
// The task was shutdown while in the run queue. At this point,
// we just hold a ref counted reference. Drop it here.
self.drop_reference();
return;
}
};

if is_not_bound {
// Ensure the task is bound to a scheduler instance. Since this is
// the first time polling the task, a scheduler instance is pulled
// from the local context and assigned to the task.
//
// The scheduler maintains ownership of the task and responds to
// `wake` calls.
//
// The task reference count has been incremented.
//
// Safety: Since we have unique access to the task so that we can
// safely call `bind_scheduler`.
self.core().bind_scheduler(self.to_task());
}

// The transition to `Running` done above ensures that a lock on the
// future has been obtained. This also ensures the `*mut T` pointer
// contains the future (as opposed to the output) and is initialized.

let res = {
struct Guard<'a, T: Future, S: Schedule> {
core: &'a Core<T, S>,
}

impl<T: Future, S: Schedule> Drop for Guard<'_, T, S> {
fn drop(&mut self) {
self.core.drop_future_or_output();
}
}

let guard = Guard { core: self.core() };

// If the task is cancelled, avoid polling it, instead signalling it
// is complete.
if snapshot.is_cancelled() {
Poll::Ready(Err(JoinError::cancelled()))
} else {
let res = guard.core.poll(self.header());

// prevent the guard from dropping the future
mem::forget(guard);

res.map(Ok)
}
};

match res {
Poll::Ready(out) => {
self.complete(out, snapshot.is_join_interested());
}
Poll::Pending => {
match self.header().state.transition_to_idle() {
Ok(snapshot) => {
if snapshot.is_notified() {
// Signal yield
self.core().yield_now(Notified(self.to_task()));
// The ref-count was incremented as part of
// `transition_to_idle`.
self.drop_reference();
}
}
Err(_) => self.cancel_task(),
}
}
}
}
}

pub(super) fn dealloc(self) {
// Release the join waker, if there is one.
self.trailer().waker.with_mut(|_| ());
Expand Down
20 changes: 20 additions & 0 deletions tokio/src/runtime/task/mod.rs
Expand Up @@ -119,6 +119,26 @@ cfg_rt! {
}
}

cfg_rt_test! {
/// Create a new task with an associated join handle
pub(crate) fn joinable_test<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>)
where
T: Future + Send + 'static,
S: Schedule,
{
let raw = RawTask::new_test::<_, S>(task);

let task = Task {
raw,
_p: PhantomData,
};

let join = JoinHandle::new(raw);

(Notified(task), join)
}
}

impl<S: 'static> Task<S> {
pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
Task {
Expand Down
33 changes: 33 additions & 0 deletions tokio/src/runtime/task/raw.rs
Expand Up @@ -37,6 +37,19 @@ pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
}
}

cfg_rt_test! {
/// Get the vtable for the requested `T` and `S` generics.
pub(super) fn vtable_test<T: Future, S: Schedule>() -> &'static Vtable {
&Vtable {
poll: poll_test::<T, S>,
dealloc: dealloc::<T, S>,
try_read_output: try_read_output::<T, S>,
drop_join_handle_slow: drop_join_handle_slow::<T, S>,
shutdown: shutdown::<T, S>,
}
}
}

impl RawTask {
pub(super) fn new<T, S>(task: T) -> RawTask
where
Expand All @@ -49,6 +62,19 @@ impl RawTask {
RawTask { ptr }
}

cfg_rt_test! {
pub(super) fn new_test<T, S>(task: T) -> RawTask
where
T: Future,
S: Schedule,
{
let ptr = Box::into_raw(Cell::<_, S>::new_test(task, State::new()));
let ptr = unsafe { NonNull::new_unchecked(ptr as *mut Header) };

RawTask { ptr }
}
}

pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> RawTask {
RawTask { ptr }
}
Expand Down Expand Up @@ -104,6 +130,13 @@ unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) {
harness.poll();
}

cfg_rt_test! {
unsafe fn poll_test<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.poll_test();
}
}

unsafe fn dealloc<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.dealloc();
Expand Down