diff --git a/src/event/event_queue.rs b/src/event/event_queue.rs index d703b762f..681a19922 100644 --- a/src/event/event_queue.rs +++ b/src/event/event_queue.rs @@ -1,10 +1,11 @@ -use neon_runtime::raw::Env; -use neon_runtime::tsfn::ThreadsafeFunction; +use std::sync::{Arc, RwLock}; +use crate::context::internal::ContextInternal; use crate::context::{Context, TaskContext}; +#[cfg(feature = "napi-6")] +use crate::lifecycle::InstanceData; use crate::result::NeonResult; - -type Callback = Box; +use crate::trampoline::ThreadsafeTrampoline; /// Queue for scheduling Rust closures to execute on the JavaScript main thread. /// @@ -50,7 +51,7 @@ type Callback = Box; /// ``` pub struct EventQueue { - tsfn: ThreadsafeFunction, + trampoline: Arc>, has_ref: bool, } @@ -64,20 +65,32 @@ impl EventQueue { /// Creates an unbounded queue for scheduling closures on the JavaScript /// main thread pub fn new<'a, C: Context<'a>>(cx: &mut C) -> Self { - let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) }; + #[cfg(feature = "napi-6")] + let trampoline = InstanceData::threadsafe_trampoline(cx); - Self { - tsfn, - has_ref: true, - } + #[cfg(not(feature = "napi-6"))] + let trampoline = ThreadsafeTrampoline::new(env); + + let mut queue = Self { + trampoline: trampoline, + has_ref: false, + }; + + // Start referenced + queue.reference(cx); + + queue } /// Allow the Node event loop to exit while this `EventQueue` exists. /// _Idempotent_ pub fn unref<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self { - self.has_ref = false; + if !self.has_ref { + return self; + } - unsafe { self.tsfn.unref(cx.env().to_raw()) } + self.has_ref = false; + self.trampoline.write().unwrap().unref(cx.env().to_raw()); self } @@ -85,9 +98,15 @@ impl EventQueue { /// Prevent the Node event loop from exiting while this `EventQueue` exists. (Default) /// _Idempotent_ pub fn reference<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self { - self.has_ref = true; + if self.has_ref { + return self; + } - unsafe { self.tsfn.reference(cx.env().to_raw()) } + self.has_ref = true; + self.trampoline + .write() + .unwrap() + .reference(cx.env().to_raw()); self } @@ -107,17 +126,8 @@ impl EventQueue { where F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static, { - let callback = Box::new(move |env| { - let env = unsafe { std::mem::transmute(env) }; - - // Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because - // N-API creates a `HandleScope` before calling the callback. - TaskContext::with_context(env, move |cx| { - let _ = f(cx); - }); - }); - - self.tsfn.call(callback, None).map_err(|_| EventQueueError) + let trampoline = self.trampoline.read().unwrap(); + trampoline.try_send(f).map_err(|_| EventQueueError) } /// Returns a boolean indicating if this `EventQueue` will prevent the Node event @@ -125,16 +135,20 @@ impl EventQueue { pub fn has_ref(&self) -> bool { self.has_ref } +} - // Monomorphized trampoline funciton for calling the user provided closure - fn callback(env: Option, callback: Callback) { - if let Some(env) = env { - callback(env); - } else { - crate::context::internal::IS_RUNNING.with(|v| { - *v.borrow_mut() = false; - }); +impl Drop for EventQueue { + fn drop(&mut self) { + if !self.has_ref { + return; } + + let trampoline = self.trampoline.clone(); + + self.send(move |cx| { + trampoline.write().unwrap().unref(cx.env().to_raw()); + Ok(()) + }); } } diff --git a/src/lib.rs b/src/lib.rs index 5a36a50e5..4857f360a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -106,6 +106,9 @@ pub use neon_macros::*; #[cfg(feature = "napi-6")] mod lifecycle; +#[cfg(all(feature = "napi-4", feature = "event-queue-api"))] +mod trampoline; + #[cfg(all(feature = "legacy-runtime", feature = "napi-1"))] compile_error!("Cannot enable both `legacy-runtime` and `napi-*` features.\n\nTo use `napi-*`, disable `legacy-runtime` by setting `default-features` to `false` in Cargo.toml\nor with cargo's --no-default-features flag."); diff --git a/src/lifecycle.rs b/src/lifecycle.rs index 1ccd55705..995e1b0ef 100644 --- a/src/lifecycle.rs +++ b/src/lifecycle.rs @@ -10,6 +10,8 @@ use std::mem; use std::sync::Arc; +#[cfg(feature = "event-queue-api")] +use std::sync::RwLock; use neon_runtime::raw::Env; use neon_runtime::reference; @@ -17,6 +19,8 @@ use neon_runtime::tsfn::ThreadsafeFunction; use crate::context::Context; use crate::handle::root::NapiRef; +#[cfg(feature = "event-queue-api")] +use crate::trampoline::ThreadsafeTrampoline; /// `InstanceData` holds Neon data associated with a particular instance of a /// native module. If a module is loaded multiple times (e.g., worker threads), this @@ -30,6 +34,10 @@ pub(crate) struct InstanceData { /// given the cost of FFI, this optimization is omitted until the cost of an /// `Arc` is demonstrated as significant. drop_queue: Arc>, + + /// Used in EventQueue to invoke Rust callbacks with Napi environment. + #[cfg(feature = "event-queue-api")] + threadsafe_trampoline: Arc>, } fn drop_napi_ref(env: Option, data: NapiRef) { @@ -62,8 +70,18 @@ impl InstanceData { queue }; + #[cfg(feature = "event-queue-api")] + let threadsafe_trampoline = { + let mut trampoline = ThreadsafeTrampoline::new(env); + trampoline.unref(env); + trampoline + }; + let data = InstanceData { drop_queue: Arc::new(drop_queue), + + #[cfg(feature = "event-queue-api")] + threadsafe_trampoline: Arc::new(RwLock::new(threadsafe_trampoline)), }; unsafe { &mut *neon_runtime::lifecycle::set_instance_data(env, data) } @@ -73,4 +91,12 @@ impl InstanceData { pub(crate) fn drop_queue<'a, C: Context<'a>>(cx: &mut C) -> Arc> { Arc::clone(&InstanceData::get(cx).drop_queue) } + + /// Helper to return a reference to the `invoke_callback` field of `InstanceData` + #[cfg(feature = "event-queue-api")] + pub(crate) fn threadsafe_trampoline<'a, C: Context<'a>>( + cx: &mut C, + ) -> Arc> { + Arc::clone(&InstanceData::get(cx).threadsafe_trampoline) + } } diff --git a/src/trampoline.rs b/src/trampoline.rs new file mode 100644 index 000000000..31bfb56a0 --- /dev/null +++ b/src/trampoline.rs @@ -0,0 +1,68 @@ +use neon_runtime::raw::Env; +use neon_runtime::tsfn::{CallError, ThreadsafeFunction}; + +use crate::context::TaskContext; +use crate::result::NeonResult; + +pub(crate) type Callback = Box; + +pub(crate) struct ThreadsafeTrampoline { + tsfn: ThreadsafeFunction, +} + +impl ThreadsafeTrampoline { + /// Creates an unbounded queue for scheduling closures on the JavaScript + /// main thread + pub(crate) fn new(env: Env) -> Self { + let tsfn = unsafe { ThreadsafeFunction::new(env, Self::callback) }; + + Self { tsfn: tsfn } + } + + /// Schedules a closure to execute on the JavaScript thread that created + /// this ThreadsafeTrampoline. + /// Returns an `Error` if the task could not be scheduled. + pub(crate) fn try_send(&self, f: F) -> Result<(), CallError> + where + F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static, + { + let callback = Box::new(move |env| { + let env = unsafe { std::mem::transmute(env) }; + + // Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because + // N-API creates a `HandleScope` before calling the callback. + TaskContext::with_context(env, move |cx| { + let _ = f(cx); + }); + }); + + self.tsfn.call(callback, None) + } + + /// References a trampoline to prevent exiting the event loop until it has been dropped. (Default) + /// Safety: `Env` must be valid for the current thread + pub(crate) fn reference(&mut self, env: Env) { + unsafe { + self.tsfn.reference(env); + } + } + + /// Unreferences a trampoline to allow exiting the event loop before it has been dropped. + /// Safety: `Env` must be valid for the current thread + pub(crate) fn unref(&mut self, env: Env) { + unsafe { + self.tsfn.unref(env); + } + } + + // Monomorphized trampoline funciton for calling the user provided closure + fn callback(env: Option, callback: Callback) { + if let Some(env) = env { + callback(env); + } else { + crate::context::internal::IS_RUNNING.with(|v| { + *v.borrow_mut() = false; + }); + } + } +}