Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: move coop mod into runtime #5152

Merged
merged 1 commit into from Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion tokio/src/io/util/empty.rs
Expand Up @@ -77,7 +77,7 @@ impl fmt::Debug for Empty {

cfg_coop! {
fn poll_proceed_and_make_progress(cx: &mut Context<'_>) -> Poll<()> {
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
coop.made_progress();
Poll::Ready(())
}
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/io/util/mem.rs
Expand Up @@ -233,7 +233,7 @@ impl AsyncRead for Pipe {
cx: &mut task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

let ret = self.poll_read_internal(cx, buf);
if ret.is_ready() {
Expand Down Expand Up @@ -261,7 +261,7 @@ impl AsyncWrite for Pipe {
cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

let ret = self.poll_write_internal(cx, buf);
if ret.is_ready() {
Expand Down
9 changes: 0 additions & 9 deletions tokio/src/lib.rs
Expand Up @@ -497,18 +497,9 @@ cfg_rt! {
pub mod runtime;
}
cfg_not_rt! {
#[cfg(any(
feature = "macros",
feature = "net",
feature = "time",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))]
pub(crate) mod runtime;
}

pub(crate) mod coop;

cfg_signal! {
pub mod signal;
}
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/park/thread.rs
Expand Up @@ -269,7 +269,7 @@ impl CachedParkThread {
pin!(f);

loop {
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/process/mod.rs
Expand Up @@ -954,7 +954,7 @@ where

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

let ret = Pin::new(&mut self.inner).poll(cx);

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/blocking/task.rs
Expand Up @@ -37,7 +37,7 @@ where
// currently goes through Task::poll(), and so is subject to budgeting. That isn't really
// what we want; a blocking task may itself want to run tasks (it might be a Worker!), so
// we want it to start without any budgeting.
crate::coop::stop();
crate::runtime::coop::stop();

Poll::Ready(func())
}
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion tokio/src/runtime/enter.rs
Expand Up @@ -173,7 +173,7 @@ cfg_rt! {
let when = Instant::now() + timeout;

loop {
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/io/registration.rs
Expand Up @@ -145,7 +145,7 @@ impl Registration {
direction: Direction,
) -> Poll<io::Result<ReadyEvent>> {
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));

if self.handle().is_shutdown() {
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/mod.rs
Expand Up @@ -180,6 +180,8 @@ mod tests;
#[cfg(any(feature = "rt", feature = "macros"))]
pub(crate) mod context;

pub(crate) mod coop;

mod driver;

pub(crate) mod scheduler;
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/current_thread.rs
Expand Up @@ -290,7 +290,7 @@ impl Context {
/// thread-local context.
fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
core.metrics.incr_poll_count();
self.enter(core, || crate::coop::budget(f))
self.enter(core, || crate::runtime::coop::budget(f))
}

/// Blocks the current thread until an event is received by the driver,
Expand Down Expand Up @@ -533,7 +533,7 @@ impl CoreGuard<'_> {

if handle.reset_woken() {
let (c, res) = context.enter(core, || {
crate::coop::budget(|| future.as_mut().poll(&mut cx))
crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx))
});

core = c;
Expand Down
3 changes: 1 addition & 2 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Expand Up @@ -56,14 +56,13 @@
//! the inject queue indefinitely. This would be a ref-count cycle and a memory
//! leak.

use crate::coop;
use crate::loom::sync::{Arc, Mutex};
use crate::runtime;
use crate::runtime::enter::EnterContext;
use crate::runtime::scheduler::multi_thread::{queue, Handle, Idle, Parker, Unparker};
use crate::runtime::task::{Inject, OwnedTasks};
use crate::runtime::{
blocking, driver, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics,
blocking, coop, driver, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics,
};
use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/join.rs
Expand Up @@ -295,7 +295,7 @@ impl<T> Future for JoinHandle<T> {
let mut ret = Poll::Pending;

// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

// Raw should always be set. If it is not, this is due to polling after
// completion
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/sync/batch_semaphore.rs
Expand Up @@ -540,11 +540,11 @@ impl Future for Acquire<'_> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let coop = ready!(trace_poll_op!(
"poll_acquire",
crate::coop::poll_proceed(cx),
crate::runtime::coop::poll_proceed(cx),
));

#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

let result = match semaphore.poll_acquire(cx, needed, node, *queued) {
Pending => {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/mpsc/chan.rs
Expand Up @@ -243,7 +243,7 @@ impl<T, S: Semaphore> Rx<T, S> {
use super::block::Read::*;

// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/sync/oneshot.rs
Expand Up @@ -785,7 +785,7 @@ impl<T> Sender<T> {
/// ```
pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

let inner = self.inner.as_ref().unwrap();

Expand Down Expand Up @@ -1124,7 +1124,7 @@ impl<T> Inner<T> {

fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

// Load the state
let mut state = State::load(&self.state, Acquire);
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/task/consume_budget.rs
Expand Up @@ -36,7 +36,7 @@ pub async fn consume_budget() {
if status.is_ready() {
return status;
}
status = crate::coop::poll_proceed(cx).map(|restore| {
status = crate::runtime::coop::poll_proceed(cx).map(|restore| {
restore.made_progress();
});
status
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/task/local.rs
Expand Up @@ -607,7 +607,7 @@ impl LocalSet {
// task initially. Because `LocalSet` itself is `!Send`, and
// `spawn_local` spawns into the `LocalSet` on the current
// thread, the invariant is maintained.
Some(task) => crate::coop::budget(|| task.run()),
Some(task) => crate::runtime::coop::budget(|| task.run()),
// We have fully drained the queue of notified tasks, so the
// local future doesn't need to be notified again — it can wait
// until something else wakes a task in the local set.
Expand Down Expand Up @@ -893,7 +893,7 @@ impl<T: Future> Future for RunUntil<'_, T> {
let _no_blocking = crate::runtime::enter::disallow_block_in_place();
let f = me.future;

if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) {
if let Poll::Ready(output) = crate::runtime::coop::budget(|| f.poll(cx)) {
return Poll::Ready(output);
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/task/unconstrained.rs
Expand Up @@ -22,7 +22,7 @@ where
cfg_coop! {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.project().inner;
crate::coop::with_unconstrained(|| inner.poll(cx))
crate::runtime::coop::with_unconstrained(|| inner.poll(cx))
}
}

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/time/sleep.rs
Expand Up @@ -392,11 +392,11 @@ impl Sleep {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let coop = ready!(trace_poll_op!(
"poll_elapsed",
crate::coop::poll_proceed(cx),
crate::runtime::coop::poll_proceed(cx),
));

#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
let coop = ready!(crate::coop::poll_proceed(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));

let result = me.entry.poll_elapsed(cx).map(move |r| {
coop.made_progress();
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/time/timeout.rs
Expand Up @@ -5,7 +5,7 @@
//! [`Timeout`]: struct@Timeout

use crate::{
coop,
runtime::coop,
time::{error::Elapsed, sleep_until, Duration, Instant, Sleep},
util::trace,
};
Expand Down