Skip to content

Commit

Permalink
rt: move coop mod into runtime (#5152)
Browse files Browse the repository at this point in the history
This is a step towards unifying thread-local variables. In the future,
`coop` will be updated to use the runtime context thread-local to store
its state.
  • Loading branch information
carllerche committed Nov 1, 2022
1 parent 203a079 commit a051ed7
Show file tree
Hide file tree
Showing 21 changed files with 26 additions and 34 deletions.
2 changes: 1 addition & 1 deletion tokio/src/io/util/empty.rs
Expand Up @@ -77,7 +77,7 @@ impl fmt::Debug for Empty {

cfg_coop! {
fn poll_proceed_and_make_progress(cx: &mut Context<'_>) -> Poll<()> {
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
coop.made_progress();
Poll::Ready(())
}
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/io/util/mem.rs
Expand Up @@ -233,7 +233,7 @@ impl AsyncRead for Pipe {
cx: &mut task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

let ret = self.poll_read_internal(cx, buf);
if ret.is_ready() {
Expand Down Expand Up @@ -261,7 +261,7 @@ impl AsyncWrite for Pipe {
cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

let ret = self.poll_write_internal(cx, buf);
if ret.is_ready() {
Expand Down
9 changes: 0 additions & 9 deletions tokio/src/lib.rs
Expand Up @@ -497,18 +497,9 @@ cfg_rt! {
pub mod runtime;
}
cfg_not_rt! {
#[cfg(any(
feature = "macros",
feature = "net",
feature = "time",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))]
pub(crate) mod runtime;
}

pub(crate) mod coop;

cfg_signal! {
pub mod signal;
}
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/park/thread.rs
Expand Up @@ -269,7 +269,7 @@ impl CachedParkThread {
pin!(f);

loop {
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/process/mod.rs
Expand Up @@ -954,7 +954,7 @@ where

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

let ret = Pin::new(&mut self.inner).poll(cx);

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/blocking/task.rs
Expand Up @@ -37,7 +37,7 @@ where
// currently goes through Task::poll(), and so is subject to budgeting. That isn't really
// what we want; a blocking task may itself want to run tasks (it might be a Worker!), so
// we want it to start without any budgeting.
crate::coop::stop();
crate::runtime::coop::stop();

Poll::Ready(func())
}
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion tokio/src/runtime/enter.rs
Expand Up @@ -173,7 +173,7 @@ cfg_rt! {
let when = Instant::now() + timeout;

loop {
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/io/registration.rs
Expand Up @@ -145,7 +145,7 @@ impl Registration {
direction: Direction,
) -> Poll<io::Result<ReadyEvent>> {
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));

if self.handle().is_shutdown() {
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/mod.rs
Expand Up @@ -180,6 +180,8 @@ mod tests;
#[cfg(any(feature = "rt", feature = "macros"))]
pub(crate) mod context;

pub(crate) mod coop;

mod driver;

pub(crate) mod scheduler;
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/current_thread.rs
Expand Up @@ -290,7 +290,7 @@ impl Context {
/// thread-local context.
fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
core.metrics.incr_poll_count();
self.enter(core, || crate::coop::budget(f))
self.enter(core, || crate::runtime::coop::budget(f))
}

/// Blocks the current thread until an event is received by the driver,
Expand Down Expand Up @@ -533,7 +533,7 @@ impl CoreGuard<'_> {

if handle.reset_woken() {
let (c, res) = context.enter(core, || {
crate::coop::budget(|| future.as_mut().poll(&mut cx))
crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx))
});

core = c;
Expand Down
3 changes: 1 addition & 2 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Expand Up @@ -56,14 +56,13 @@
//! the inject queue indefinitely. This would be a ref-count cycle and a memory
//! leak.

use crate::coop;
use crate::loom::sync::{Arc, Mutex};
use crate::runtime;
use crate::runtime::enter::EnterContext;
use crate::runtime::scheduler::multi_thread::{queue, Handle, Idle, Parker, Unparker};
use crate::runtime::task::{Inject, OwnedTasks};
use crate::runtime::{
blocking, driver, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics,
blocking, coop, driver, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics,
};
use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/join.rs
Expand Up @@ -295,7 +295,7 @@ impl<T> Future for JoinHandle<T> {
let mut ret = Poll::Pending;

// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

// Raw should always be set. If it is not, this is due to polling after
// completion
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/sync/batch_semaphore.rs
Expand Up @@ -540,11 +540,11 @@ impl Future for Acquire<'_> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let coop = ready!(trace_poll_op!(
"poll_acquire",
crate::coop::poll_proceed(cx),
crate::runtime::coop::poll_proceed(cx),
));

#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

let result = match semaphore.poll_acquire(cx, needed, node, *queued) {
Pending => {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/mpsc/chan.rs
Expand Up @@ -243,7 +243,7 @@ impl<T, S: Semaphore> Rx<T, S> {
use super::block::Read::*;

// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/sync/oneshot.rs
Expand Up @@ -785,7 +785,7 @@ impl<T> Sender<T> {
/// ```
pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

let inner = self.inner.as_ref().unwrap();

Expand Down Expand Up @@ -1124,7 +1124,7 @@ impl<T> Inner<T> {

fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

// Load the state
let mut state = State::load(&self.state, Acquire);
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/task/consume_budget.rs
Expand Up @@ -36,7 +36,7 @@ pub async fn consume_budget() {
if status.is_ready() {
return status;
}
status = crate::coop::poll_proceed(cx).map(|restore| {
status = crate::runtime::coop::poll_proceed(cx).map(|restore| {
restore.made_progress();
});
status
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/task/local.rs
Expand Up @@ -607,7 +607,7 @@ impl LocalSet {
// task initially. Because `LocalSet` itself is `!Send`, and
// `spawn_local` spawns into the `LocalSet` on the current
// thread, the invariant is maintained.
Some(task) => crate::coop::budget(|| task.run()),
Some(task) => crate::runtime::coop::budget(|| task.run()),
// We have fully drained the queue of notified tasks, so the
// local future doesn't need to be notified again — it can wait
// until something else wakes a task in the local set.
Expand Down Expand Up @@ -893,7 +893,7 @@ impl<T: Future> Future for RunUntil<'_, T> {
let _no_blocking = crate::runtime::enter::disallow_block_in_place();
let f = me.future;

if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) {
if let Poll::Ready(output) = crate::runtime::coop::budget(|| f.poll(cx)) {
return Poll::Ready(output);
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/task/unconstrained.rs
Expand Up @@ -22,7 +22,7 @@ where
cfg_coop! {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.project().inner;
crate::coop::with_unconstrained(|| inner.poll(cx))
crate::runtime::coop::with_unconstrained(|| inner.poll(cx))
}
}

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/time/sleep.rs
Expand Up @@ -392,11 +392,11 @@ impl Sleep {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let coop = ready!(trace_poll_op!(
"poll_elapsed",
crate::coop::poll_proceed(cx),
crate::runtime::coop::poll_proceed(cx),
));

#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

let result = me.entry.poll_elapsed(cx).map(move |r| {
coop.made_progress();
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/time/timeout.rs
Expand Up @@ -5,7 +5,7 @@
//! [`Timeout`]: struct@Timeout

use crate::{
coop,
runtime::coop,
time::{error::Elapsed, sleep_until, Duration, Instant, Sleep},
util::trace,
};
Expand Down

0 comments on commit a051ed7

Please sign in to comment.