Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

signal-neon-futures: Poll a future once before queuing it #283

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 21 additions & 10 deletions rust/bridge/node/futures/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@ pub trait EventQueueEx {
impl EventQueueEx for EventQueue {
fn send_future(&self, future: impl Future<Output = ()> + 'static + Send) {
self.send(move |mut cx| {
let task = Arc::new(FutureTask {
queue: cx.queue(),
future: Mutex::new(Some(Box::pin(future))),
});
task.poll();
cx.run_future(future);
Ok(())
})
}
Expand All @@ -43,29 +39,44 @@ impl<T: Future> Future for AssertSendSafe<T> {
}
}

/// Adds support for executing closures and futures on the JavaScript main thread's microtask queue.
/// Adds support for executing closures and futures on the JavaScript main thread's event queue.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was my mistake; we're using Node's event queue rather than JavaScript's microtask queue.

pub trait ContextEx<'a>: Context<'a> {
/// Schedules `f` to run on the microtask queue.
/// Schedules `f` to run on the JavaScript thread's event queue.
///
/// Equivalent to `cx.queue().send(f)` except that `f` doesn't need to be `Send`.
fn run_on_queue(&mut self, f: impl FnOnce(TaskContext<'_>) -> NeonResult<()> + 'static) {
fn queue_task(&mut self, f: impl FnOnce(TaskContext<'_>) -> NeonResult<()> + 'static) {
// Because we're currently in a JavaScript context,
// and `f` will run on the event queue associated with the current context,
// we can assert that it's safe to Send `f` to the queue.
let f = AssertSendSafe(f);
self.queue().send(move |cx| f.0(cx));
}

/// Schedules `f` to run on the microtask queue.
/// Schedules `f` to run on the JavaScript thread's event queue.
///
/// Equivalent to `cx.queue().send_future(f)` except that `f` doesn't need to be `Send`.
fn run_future_on_queue(&mut self, f: impl Future<Output = ()> + 'static) {
fn queue_future(&mut self, f: impl Future<Output = ()> + 'static) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No one calls queue_task and queue_future, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right; they're there for "completeness" but I could just drop them.

// Because we're currently in a JavaScript context,
// and `f` will run on the event queue associated with the current context,
// we can assert that it's safe to Send `f` to the queue.
let f = AssertSendSafe(f);
self.queue().send_future(f);
}

/// Runs `f` on the JavaScript thread's event queue.
///
/// Polls the future once synchronously, then schedules it to resume on the event queue.
fn run_future(&mut self, f: impl Future<Output = ()> + 'static) {
// Because we're currently in a JavaScript context,
// and `f` will run on the event queue associated with the current context,
// we can assert that it's safe to Send `f` to the queue.
let f = AssertSendSafe(f);
let task = Arc::new(FutureTask {
queue: self.queue(),
future: Mutex::new(Some(Box::pin(f))),
});
task.poll();
}
}

impl<'a, T: Context<'a>> ContextEx<'a> for T {}
Expand Down
2 changes: 1 addition & 1 deletion rust/bridge/node/futures/src/promise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ where
let callbacks_object_root = callbacks_object.root(cx);
let queue = cx.queue();

cx.run_future_on_queue(async move {
cx.run_future(async move {
let result: std::thread::Result<Result<F, PersistentException>> =
future.catch_unwind().await;

Expand Down
2 changes: 1 addition & 1 deletion rust/bridge/node/futures/tests-node-module/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn increment_async(mut cx: FunctionContext) -> JsResult<JsUndefined> {
Err(err) => Err(err.to_string(cx).unwrap().value(cx)),
})?;

cx.run_future_on_queue(async move {
cx.run_future(async move {
let value_or_error = future.await;
queue.send(move |mut cx| {
let new_value = match value_or_error {
Expand Down