Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make it possible to disable the cross pool dispatch optimization. #765

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 24 additions & 0 deletions rayon-core/src/lib.rs
Expand Up @@ -143,6 +143,10 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
/// "depth-first" fashion. If true, they will do a "breadth-first"
/// fashion. Depth-first is the default.
breadth_first: bool,

/// If true, dispatching a job to a different rayon [`ThreadPool`] will block.
/// If false, the thread will continue processing other jobs from its pool.
disable_cross_pool_optimization: bool,
}

/// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead.
Expand Down Expand Up @@ -179,6 +183,7 @@ impl Default for ThreadPoolBuilder {
exit_handler: None,
spawn_handler: DefaultSpawn,
breadth_first: false,
disable_cross_pool_optimization: false,
}
}
}
Expand Down Expand Up @@ -362,6 +367,7 @@ impl<S> ThreadPoolBuilder<S> {
start_handler: self.start_handler,
exit_handler: self.exit_handler,
breadth_first: self.breadth_first,
disable_cross_pool_optimization: self.disable_cross_pool_optimization,
}
}

Expand Down Expand Up @@ -519,6 +525,19 @@ impl<S> ThreadPoolBuilder<S> {
self.breadth_first
}

/// Disables an optimization that allows a pool thread to continue processing
/// other jobs when it does a "blocking" dispatch to a different [`ThreadPool`].
///
/// Note that this could lead to deadlocks if all pool threads block.
pub fn disable_cross_pool_optimization(mut self) -> Self {
self.disable_cross_pool_optimization = true;
self
}

fn get_disable_cross_pool_optimization(&self) -> bool {
self.disable_cross_pool_optimization
}

/// Takes the current thread start callback, leaving `None`.
fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
self.start_handler.take()
Expand Down Expand Up @@ -685,6 +704,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
ref exit_handler,
spawn_handler: _,
ref breadth_first,
ref disable_cross_pool_optimization,
} = *self;

// Just print `Some(<closure>)` or `None` to the debug
Expand All @@ -708,6 +728,10 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
.field("start_handler", &start_handler)
.field("exit_handler", &exit_handler)
.field("breadth_first", &breadth_first)
.field(
"disable_cross_pool_optimization",
disable_cross_pool_optimization,
)
.finish()
}
}
Expand Down
20 changes: 18 additions & 2 deletions rayon-core/src/registry.rs
Expand Up @@ -153,6 +153,8 @@ pub(super) struct Registry {
// These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
// and that job will keep the pool alive.
terminate_latch: CountLatch,

disable_cross_pool_optimization: bool,
}

/// ////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -241,6 +243,7 @@ impl Registry {
panic_handler: builder.take_panic_handler(),
start_handler: builder.take_start_handler(),
exit_handler: builder.take_exit_handler(),
disable_cross_pool_optimization: builder.get_disable_cross_pool_optimization(),
});

// If we return early or panic, make sure to terminate existing threads.
Expand Down Expand Up @@ -311,6 +314,10 @@ impl Registry {
}
}

fn disable_cross_pool_optimization(&self) -> bool {
self.disable_cross_pool_optimization
}

pub(super) fn num_threads(&self) -> usize {
self.thread_infos.len()
}
Expand Down Expand Up @@ -416,7 +423,11 @@ impl Registry {
{
unsafe {
let worker_thread = WorkerThread::current();
if worker_thread.is_null() {
if worker_thread.is_null()
|| (*worker_thread)
.registry()
.disable_cross_pool_optimization()
{
self.in_worker_cold(op)
} else if (*worker_thread).registry().id() != self.id() {
self.in_worker_cross(&*worker_thread, op)
Expand All @@ -439,7 +450,12 @@ impl Registry {

LOCK_LATCH.with(|l| {
// This thread isn't a member of *any* thread pool, so just block.
debug_assert!(WorkerThread::current().is_null());
debug_assert!(
WorkerThread::current().is_null()
|| (*WorkerThread::current())
.registry()
.disable_cross_pool_optimization()
);
let job = StackJob::new(
|injected| {
let worker_thread = WorkerThread::current();
Expand Down