Skip to content

Commit

Permalink
Add the ability to spawn futures
Browse files Browse the repository at this point in the history
Add a way to spawn tasks with a returned `Future`. The task is
immediately queued for the thread pool to execute.
  • Loading branch information
cuviper committed Apr 7, 2023
1 parent 3ffed8e commit 0d883ed
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 0 deletions.
138 changes: 138 additions & 0 deletions rayon-core/src/future.rs
@@ -0,0 +1,138 @@
#![allow(missing_docs)]

use crate::job::JobResult;
use crate::unwind;
use crate::ThreadPool;
use crate::{spawn, spawn_fifo};
use crate::{Scope, ScopeFifo};

use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

struct RayonFuture<T> {
state: Arc<Mutex<State<T>>>,
}

struct RayonFutureJob<T> {
state: Arc<Mutex<State<T>>>,
}

struct State<T> {
result: JobResult<T>,
waker: Option<Waker>,
}

fn new<T>() -> (RayonFuture<T>, RayonFutureJob<T>) {
let state = Arc::new(Mutex::new(State {
result: JobResult::None,
waker: None,
}));
(
RayonFuture {
state: state.clone(),
},
RayonFutureJob { state },
)
}

impl<T> Future for RayonFuture<T> {
type Output = T;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut guard = self.state.lock().expect("rayon future lock");
match mem::replace(&mut guard.result, JobResult::None) {
JobResult::None => {
guard.waker = Some(cx.waker().clone());
Poll::Pending
}
JobResult::Ok(x) => Poll::Ready(x),
JobResult::Panic(p) => {
drop(guard); // don't poison the lock
unwind::resume_unwinding(p);
}
}
}
}

impl<T> RayonFutureJob<T> {
fn execute(self, func: impl FnOnce() -> T) {
let result = unwind::halt_unwinding(func);
let mut guard = self.state.lock().expect("rayon future lock");
guard.result = match result {
Ok(x) => JobResult::Ok(x),
Err(p) => JobResult::Panic(p),
};
if let Some(waker) = guard.waker.take() {
waker.wake();
}
}
}

pub fn spawn_future<F, T>(func: F) -> impl Future<Output = T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (future, job) = new();
spawn(move || job.execute(func));
future
}

pub fn spawn_fifo_future<F, T>(func: F) -> impl Future<Output = T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (future, job) = new();
spawn_fifo(move || job.execute(func));
future
}

impl ThreadPool {
pub fn spawn_future<F, T>(&self, func: F) -> impl Future<Output = T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (future, job) = new();
self.spawn(move || job.execute(func));
future
}

pub fn spawn_fifo_future<F, T>(&self, func: F) -> impl Future<Output = T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (future, job) = new();
self.spawn_fifo(move || job.execute(func));
future
}
}

impl<'scope> Scope<'scope> {
pub fn spawn_future<F, T>(&self, func: F) -> impl Future<Output = T>
where
F: FnOnce(&Self) -> T + Send + 'scope,
T: Send + 'scope,
{
let (future, job) = new();
self.spawn(|scope| job.execute(move || func(scope)));
future
}
}

impl<'scope> ScopeFifo<'scope> {
pub fn spawn_fifo_future<F, T>(&self, func: F) -> impl Future<Output = T>
where
F: FnOnce(&Self) -> T + Send + 'scope,
T: Send + 'scope,
{
let (future, job) = new();
self.spawn_fifo(|scope| job.execute(move || func(scope)));
future
}
}
2 changes: 2 additions & 0 deletions rayon-core/src/lib.rs
Expand Up @@ -80,6 +80,7 @@ mod log;
mod private;

mod broadcast;
mod future;
mod job;
mod join;
mod latch;
Expand All @@ -94,6 +95,7 @@ mod compile_fail;
mod test;

pub use self::broadcast::{broadcast, spawn_broadcast, BroadcastContext};
pub use self::future::{spawn_fifo_future, spawn_future};
pub use self::join::{join, join_context};
pub use self::registry::ThreadBuilder;
pub use self::scope::{in_place_scope, scope, Scope};
Expand Down

0 comments on commit 0d883ed

Please sign in to comment.