diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 7334fbeda64..587c45694a5 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -4,7 +4,7 @@ use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task}; use crate::sync::AtomicWaker; use crate::util::VecDequeCell; -use std::cell::{Cell, RefCell}; +use std::cell::Cell; use std::collections::VecDeque; use std::fmt; use std::future::Future; @@ -253,7 +253,7 @@ pin_project! { } } -thread_local!(static CURRENT: RefCell>> = RefCell::new(None)); +thread_local!(static CURRENT: Cell>> = Cell::new(None)); cfg_rt! { /// Spawns a `!Send` future on the local task set. @@ -303,7 +303,8 @@ cfg_rt! { F::Output: 'static { CURRENT.with(|maybe_cx| { - match maybe_cx.borrow().as_ref() { + let ctx = clone_rc(maybe_cx); + match ctx { None => panic!("`spawn_local` called from outside of a `task::LocalSet`"), Some(cx) => cx.spawn(future, name) } @@ -328,7 +329,6 @@ pub struct LocalEnterGuard(Option>); impl Drop for LocalEnterGuard { fn drop(&mut self) { CURRENT.with(|ctx| { - // *ctx.borrow_mut() = self.0.take(); ctx.replace(self.0.take()); }) } @@ -354,7 +354,7 @@ impl LocalSet { /// Enter current LocalSet context pub fn enter(&self) -> LocalEnterGuard { CURRENT.with(|ctx| { - let old = ctx.borrow_mut().replace(self.context.clone()); + let old = ctx.replace(Some(self.context.clone())); LocalEnterGuard(old) }) } @@ -586,10 +586,9 @@ impl LocalSet { } fn with(&self, f: impl FnOnce() -> T) -> T { - // CURRENT.set(&self.context, f) CURRENT.with(|ctx| { struct Reset<'a> { - ctx_ref: &'a RefCell>>, + ctx_ref: &'a Cell>>, val: Option>, } impl<'a> Drop for Reset<'a> { @@ -597,7 +596,7 @@ impl LocalSet { self.ctx_ref.replace(self.val.take()); } } - let old = ctx.borrow_mut().replace(self.context.clone()); + let old = ctx.replace(Some(self.context.clone())); let _reset = Reset { ctx_ref: ctx, @@ -725,23 +724,33 @@ impl Future for RunUntil<'_, T> { } } +fn clone_rc(rc: &Cell>>) -> Option> { + let value = rc.take(); + let cloned = value.clone(); + rc.set(value); + cloned +} + impl Shared { /// Schedule the provided task on the scheduler. fn schedule(&self, task: task::Notified>) { - CURRENT.with(|maybe_cx| match maybe_cx.borrow().as_ref() { - Some(cx) if cx.shared.ptr_eq(self) => { - cx.queue.push_back(task); - } - _ => { - // First check whether the queue is still there (if not, the - // LocalSet is dropped). Then push to it if so, and if not, - // do nothing. - let mut lock = self.queue.lock(); - - if let Some(queue) = lock.as_mut() { - queue.push_back(task); - drop(lock); - self.waker.wake(); + CURRENT.with(|maybe_cx| { + let ctx = clone_rc(maybe_cx); + match ctx { + Some(cx) if cx.shared.ptr_eq(self) => { + cx.queue.push_back(task); + } + _ => { + // First check whether the queue is still there (if not, the + // LocalSet is dropped). Then push to it if so, and if not, + // do nothing. + let mut lock = self.queue.lock(); + + if let Some(queue) = lock.as_mut() { + queue.push_back(task); + drop(lock); + self.waker.wake(); + } } } }); @@ -754,11 +763,14 @@ impl Shared { impl task::Schedule for Arc { fn release(&self, task: &Task) -> Option> { - CURRENT.with(|maybe_cx| match maybe_cx.borrow().as_ref() { - None => panic!("scheduler context missing"), - Some(cx) => { - assert!(cx.shared.ptr_eq(self)); - cx.owned.remove(task) + CURRENT.with(|maybe_cx| { + let ctx = clone_rc(maybe_cx); + match ctx { + None => panic!("scheduler context missing"), + Some(cx) => { + assert!(cx.shared.ptr_eq(self)); + cx.owned.remove(task) + } } }) }