Skip to content

Commit

Permalink
Nested spawns on scope (#4466)
Browse files Browse the repository at this point in the history
# Objective

- Add ability to create nested spawns. This is needed for stageless. The current executor spawns tasks for each system early and runs the system by communicating through a channel. In stageless we want to spawn the task late, so that archetypes can be updated right before the task is run. The executor is run on a separate task, so this enables the scope to be passed to the spawned executor.
- Fixes #4301

## Solution

- Instantiate a single threaded executor on the scope and use that instead of the LocalExecutor. This allows the scope to be Send, but still able to spawn tasks onto the main thread the scope is run on. This works because while systems can access nonsend data. The systems themselves are Send. Because of this change we lose the ability to spawn nonsend tasks on the scope, but I don't think this is being used anywhere. Users would still be able to use spawn_local on TaskPools.
- Steals the lifetime tricks the `std::thread::scope` uses to allow nested spawns, but disallow scope to be passed to tasks or threads not associated with the scope.
- Change the storage for the tasks to a `ConcurrentQueue`. This is to allow a &Scope to be passed for spawning instead of a &mut Scope. `ConcurrentQueue` was chosen because it was already in our dependency tree because `async_executor` depends on it.
- removed the optimizations for 0 and 1 spawned tasks. It did improve those cases, but made the cases of more than 1 task slower.
---

## Changelog

Add ability to nest spawns

```rust
fn main() {
    let pool = TaskPool::new();
    pool.scope(|scope| {
        scope.spawn(async move {
            // calling scope.spawn from an spawn task was not possible before
            scope.spawn(async move {
                // do something
            });
        });
    })
}
```

## Migration Guide

If you were using explicit lifetimes and Passing Scope you'll need to specify two lifetimes now.

```rust
fn scoped_function<'scope>(scope: &mut Scope<'scope, ()>) {}
// should become
fn scoped_function<'scope>(scope: &Scope<'_, 'scope, ()>) {}
```

`scope.spawn_local` changed to `scope.spawn_on_scope` this should cover cases where you needed to run tasks on the local thread, but does not cover spawning Nonsend Futures.

## TODO
* [x] think real hard about all the lifetimes
* [x] add doc about what 'env and 'scope mean.
* [x] manually check that the single threaded task pool still works
* [x] Get updated perf numbers
* [x] check and make sure all the transmutes are necessary
* [x] move commented out test into a compile fail test
* [x] look through the tests for scope on std and see if I should add any more tests

Co-authored-by: Michael Hsu <myhsu@benjaminelectric.com>
Co-authored-by: Carter Anderson <mcanders1@gmail.com>
  • Loading branch information
3 people committed Sep 28, 2022
1 parent 92c90a9 commit d22d310
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 91 deletions.
6 changes: 3 additions & 3 deletions crates/bevy_ecs/src/schedule/executor_parallel.rs
Expand Up @@ -166,7 +166,7 @@ impl ParallelExecutor {
/// queues systems with no dependencies to run (or skip) at next opportunity.
fn prepare_systems<'scope>(
&mut self,
scope: &mut Scope<'scope, ()>,
scope: &Scope<'_, 'scope, ()>,
systems: &'scope mut [SystemContainer],
world: &'scope World,
) {
Expand Down Expand Up @@ -236,7 +236,7 @@ impl ParallelExecutor {
if system_data.is_send {
scope.spawn(task);
} else {
scope.spawn_local(task);
scope.spawn_on_scope(task);
}

#[cfg(test)]
Expand Down Expand Up @@ -271,7 +271,7 @@ impl ParallelExecutor {
if system_data.is_send {
scope.spawn(task);
} else {
scope.spawn_local(task);
scope.spawn_on_scope(task);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/bevy_tasks/Cargo.toml
Expand Up @@ -13,6 +13,7 @@ futures-lite = "1.4.0"
async-executor = "1.3.0"
async-channel = "1.4.2"
once_cell = "1.7"
concurrent-queue = "1.2.2"

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = "0.4"
Expand Down
48 changes: 30 additions & 18 deletions crates/bevy_tasks/src/single_threaded_task_pool.rs
@@ -1,5 +1,6 @@
use std::{
future::Future,
marker::PhantomData,
mem,
sync::{Arc, Mutex},
};
Expand Down Expand Up @@ -61,27 +62,34 @@ impl TaskPool {
/// to spawn tasks. This function will await the completion of all tasks before returning.
///
/// This is similar to `rayon::scope` and `crossbeam::scope`
pub fn scope<'scope, F, T>(&self, f: F) -> Vec<T>
pub fn scope<'env, F, T>(&self, f: F) -> Vec<T>
where
F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send,
F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>),
T: Send + 'static,
{
let executor = &async_executor::LocalExecutor::new();
let executor: &'scope async_executor::LocalExecutor<'scope> =
let executor: &'env async_executor::LocalExecutor<'env> =
unsafe { mem::transmute(executor) };

let results: Mutex<Vec<Arc<Mutex<Option<T>>>>> = Mutex::new(Vec::new());
let results: &'env Mutex<Vec<Arc<Mutex<Option<T>>>>> = unsafe { mem::transmute(&results) };

let mut scope = Scope {
executor,
results: Vec::new(),
results,
scope: PhantomData,
env: PhantomData,
};

f(&mut scope);
let scope_ref: &'env mut Scope<'_, 'env, T> = unsafe { mem::transmute(&mut scope) };

f(scope_ref);

// Loop until all tasks are done
while executor.try_tick() {}

scope
.results
let results = scope.results.lock().unwrap();
results
.iter()
.map(|result| result.lock().unwrap().take().unwrap())
.collect()
Expand Down Expand Up @@ -127,32 +135,36 @@ impl FakeTask {
///
/// For more information, see [`TaskPool::scope`].
#[derive(Debug)]
pub struct Scope<'scope, T> {
executor: &'scope async_executor::LocalExecutor<'scope>,
pub struct Scope<'scope, 'env: 'scope, T> {
executor: &'env async_executor::LocalExecutor<'env>,
// Vector to gather results of all futures spawned during scope run
results: Vec<Arc<Mutex<Option<T>>>>,
results: &'env Mutex<Vec<Arc<Mutex<Option<T>>>>>,

// make `Scope` invariant over 'scope and 'env
scope: PhantomData<&'scope mut &'scope ()>,
env: PhantomData<&'env mut &'env ()>,
}

impl<'scope, T: Send + 'scope> Scope<'scope, T> {
impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
/// Spawns a scoped future onto the thread-local executor. The scope *must* outlive
/// the provided future. The results of the future will be returned as a part of
/// [`TaskPool::scope`]'s return value.
///
/// On the single threaded task pool, it just calls [`Scope::spawn_local`].
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn<Fut: Future<Output = T> + 'scope + Send>(&mut self, f: Fut) {
self.spawn_local(f);
pub fn spawn<Fut: Future<Output = T> + 'env>(&self, f: Fut) {
self.spawn_on_scope(f);
}

/// Spawns a scoped future onto the thread-local executor. The scope *must* outlive
/// the provided future. The results of the future will be returned as a part of
/// [`TaskPool::scope`]'s return value.
/// Spawns a scoped future that runs on the thread the scope called from. The
/// scope *must* outlive the provided future. The results of the future will be
/// returned as a part of [`TaskPool::scope`]'s return value.
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn_local<Fut: Future<Output = T> + 'scope>(&mut self, f: Fut) {
pub fn spawn_on_scope<Fut: Future<Output = T> + 'env>(&self, f: Fut) {
let result = Arc::new(Mutex::new(None));
self.results.push(result.clone());
self.results.lock().unwrap().push(result.clone());
let f = async move {
result.lock().unwrap().replace(f.await);
};
Expand Down

0 comments on commit d22d310

Please sign in to comment.