Skip to content

Commit

Permalink
Reuse ThreadsafeFunction in EventQueue
Browse files Browse the repository at this point in the history
Node.js optimizes subsequent ThreadsafeFunction invocations to happen
during the same event loop tick, but only if the same instance of
ThreadsafeFunction is used. The performance improvement is most
noticeable when used in Electron, because scheduling a new UV tick in
Electron is very costly.

With this change EventQueue will use an
existing instance of ThreadsafeTrampoline (wrapper around
ThreadsafeFunction) if compiled with napi-6 feature, or it will fallback
to creating a new ThreadsafeFunction per EventQueue instance.

Fix: neon-bindings#727
  • Loading branch information
indutny committed May 19, 2021
1 parent f3a96aa commit 76853a9
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 33 deletions.
80 changes: 47 additions & 33 deletions src/event/event_queue.rs
@@ -1,10 +1,11 @@
use neon_runtime::raw::Env;
use neon_runtime::tsfn::ThreadsafeFunction;
use std::sync::{Arc, RwLock};

use crate::context::internal::ContextInternal;
use crate::context::{Context, TaskContext};
#[cfg(feature = "napi-6")]
use crate::lifecycle::InstanceData;
use crate::result::NeonResult;

type Callback = Box<dyn FnOnce(Env) + Send + 'static>;
use crate::trampoline::ThreadsafeTrampoline;

/// Queue for scheduling Rust closures to execute on the JavaScript main thread.
///
Expand Down Expand Up @@ -50,7 +51,7 @@ type Callback = Box<dyn FnOnce(Env) + Send + 'static>;
/// ```

pub struct EventQueue {
tsfn: ThreadsafeFunction<Callback>,
trampoline: Arc<RwLock<ThreadsafeTrampoline>>,
has_ref: bool,
}

Expand All @@ -64,30 +65,48 @@ impl EventQueue {
/// Creates an unbounded queue for scheduling closures on the JavaScript
/// main thread
pub fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) };
#[cfg(feature = "napi-6")]
let trampoline = InstanceData::threadsafe_trampoline(cx);

Self {
tsfn,
has_ref: true,
}
#[cfg(not(feature = "napi-6"))]
let trampoline = ThreadsafeTrampoline::new(env);

let mut queue = Self {
trampoline: trampoline,
has_ref: false,
};

// Start referenced
queue.reference(cx);

queue
}

/// Allow the Node event loop to exit while this `EventQueue` exists.
/// _Idempotent_
pub fn unref<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
self.has_ref = false;
if !self.has_ref {
return self;
}

unsafe { self.tsfn.unref(cx.env().to_raw()) }
self.has_ref = false;
self.trampoline.write().unwrap().unref(cx.env().to_raw());

self
}

/// Prevent the Node event loop from exiting while this `EventQueue` exists. (Default)
/// _Idempotent_
pub fn reference<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
self.has_ref = true;
if self.has_ref {
return self;
}

unsafe { self.tsfn.reference(cx.env().to_raw()) }
self.has_ref = true;
self.trampoline
.write()
.unwrap()
.reference(cx.env().to_raw());

self
}
Expand All @@ -107,34 +126,29 @@ impl EventQueue {
where
F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static,
{
let callback = Box::new(move |env| {
let env = unsafe { std::mem::transmute(env) };

// Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
// N-API creates a `HandleScope` before calling the callback.
TaskContext::with_context(env, move |cx| {
let _ = f(cx);
});
});

self.tsfn.call(callback, None).map_err(|_| EventQueueError)
let trampoline = self.trampoline.read().unwrap();
trampoline.try_send(f).map_err(|_| EventQueueError)
}

/// Returns a boolean indicating if this `EventQueue` will prevent the Node event
/// queue from exiting.
pub fn has_ref(&self) -> bool {
self.has_ref
}
}

// Monomorphized trampoline funciton for calling the user provided closure
fn callback(env: Option<Env>, callback: Callback) {
if let Some(env) = env {
callback(env);
} else {
crate::context::internal::IS_RUNNING.with(|v| {
*v.borrow_mut() = false;
});
impl Drop for EventQueue {
fn drop(&mut self) {
if !self.has_ref {
return;
}

let trampoline = self.trampoline.clone();

self.send(move |cx| {
trampoline.write().unwrap().unref(cx.env().to_raw());
Ok(())
});
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Expand Up @@ -106,6 +106,9 @@ pub use neon_macros::*;
#[cfg(feature = "napi-6")]
mod lifecycle;

#[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
mod trampoline;

#[cfg(all(feature = "legacy-runtime", feature = "napi-1"))]
compile_error!("Cannot enable both `legacy-runtime` and `napi-*` features.\n\nTo use `napi-*`, disable `legacy-runtime` by setting `default-features` to `false` in Cargo.toml\nor with cargo's --no-default-features flag.");

Expand Down
26 changes: 26 additions & 0 deletions src/lifecycle.rs
Expand Up @@ -10,13 +10,17 @@

use std::mem;
use std::sync::Arc;
#[cfg(feature = "event-queue-api")]
use std::sync::RwLock;

use neon_runtime::raw::Env;
use neon_runtime::reference;
use neon_runtime::tsfn::ThreadsafeFunction;

use crate::context::Context;
use crate::handle::root::NapiRef;
#[cfg(feature = "event-queue-api")]
use crate::trampoline::ThreadsafeTrampoline;

/// `InstanceData` holds Neon data associated with a particular instance of a
/// native module. If a module is loaded multiple times (e.g., worker threads), this
Expand All @@ -30,6 +34,10 @@ pub(crate) struct InstanceData {
/// given the cost of FFI, this optimization is omitted until the cost of an
/// `Arc` is demonstrated as significant.
drop_queue: Arc<ThreadsafeFunction<NapiRef>>,

/// Used in EventQueue to invoke Rust callbacks with Napi environment.
#[cfg(feature = "event-queue-api")]
threadsafe_trampoline: Arc<RwLock<ThreadsafeTrampoline>>,
}

fn drop_napi_ref(env: Option<Env>, data: NapiRef) {
Expand Down Expand Up @@ -62,8 +70,18 @@ impl InstanceData {
queue
};

#[cfg(feature = "event-queue-api")]
let threadsafe_trampoline = {
let mut trampoline = ThreadsafeTrampoline::new(env);
trampoline.unref(env);
trampoline
};

let data = InstanceData {
drop_queue: Arc::new(drop_queue),

#[cfg(feature = "event-queue-api")]
threadsafe_trampoline: Arc::new(RwLock::new(threadsafe_trampoline)),
};

unsafe { &mut *neon_runtime::lifecycle::set_instance_data(env, data) }
Expand All @@ -73,4 +91,12 @@ impl InstanceData {
pub(crate) fn drop_queue<'a, C: Context<'a>>(cx: &mut C) -> Arc<ThreadsafeFunction<NapiRef>> {
Arc::clone(&InstanceData::get(cx).drop_queue)
}

/// Helper to return a reference to the `invoke_callback` field of `InstanceData`
#[cfg(feature = "event-queue-api")]
pub(crate) fn threadsafe_trampoline<'a, C: Context<'a>>(
cx: &mut C,
) -> Arc<RwLock<ThreadsafeTrampoline>> {
Arc::clone(&InstanceData::get(cx).threadsafe_trampoline)
}
}
68 changes: 68 additions & 0 deletions src/trampoline.rs
@@ -0,0 +1,68 @@
use neon_runtime::raw::Env;
use neon_runtime::tsfn::{CallError, ThreadsafeFunction};

use crate::context::TaskContext;
use crate::result::NeonResult;

pub(crate) type Callback = Box<dyn FnOnce(Env) + Send + 'static>;

pub(crate) struct ThreadsafeTrampoline {
tsfn: ThreadsafeFunction<Callback>,
}

impl ThreadsafeTrampoline {
/// Creates an unbounded queue for scheduling closures on the JavaScript
/// main thread
pub(crate) fn new(env: Env) -> Self {
let tsfn = unsafe { ThreadsafeFunction::new(env, Self::callback) };

Self { tsfn: tsfn }
}

/// Schedules a closure to execute on the JavaScript thread that created
/// this ThreadsafeTrampoline.
/// Returns an `Error` if the task could not be scheduled.
pub(crate) fn try_send<F>(&self, f: F) -> Result<(), CallError<Callback>>
where
F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static,
{
let callback = Box::new(move |env| {
let env = unsafe { std::mem::transmute(env) };

// Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
// N-API creates a `HandleScope` before calling the callback.
TaskContext::with_context(env, move |cx| {
let _ = f(cx);
});
});

self.tsfn.call(callback, None)
}

/// References a trampoline to prevent exiting the event loop until it has been dropped. (Default)
/// Safety: `Env` must be valid for the current thread
pub(crate) fn reference(&mut self, env: Env) {
unsafe {
self.tsfn.reference(env);
}
}

/// Unreferences a trampoline to allow exiting the event loop before it has been dropped.
/// Safety: `Env` must be valid for the current thread
pub(crate) fn unref(&mut self, env: Env) {
unsafe {
self.tsfn.unref(env);
}
}

// Monomorphized trampoline funciton for calling the user provided closure
fn callback(env: Option<Env>, callback: Callback) {
if let Some(env) = env {
callback(env);
} else {
crate::context::internal::IS_RUNNING.with(|v| {
*v.borrow_mut() = false;
});
}
}
}

0 comments on commit 76853a9

Please sign in to comment.