Skip to content

Commit

Permalink
node-api: faster threadsafe_function
Browse files Browse the repository at this point in the history
Invoke threadsafe_function during the same tick and avoid marshalling
costs between threads and/or churning event loop if either:

1. There's a queued call already
2. `Push()` is called while the main thread was running
   threadsafe_function
  • Loading branch information
indutny committed May 2, 2021
1 parent 9f5977a commit 37b10a7
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 31 deletions.
81 changes: 52 additions & 29 deletions src/node_api.cc
Expand Up @@ -12,6 +12,7 @@
#include "tracing/traced_value.h"
#include "util-inl.h"

#include <atomic>
#include <memory>

struct node_napi_env__ : public napi_env__ {
Expand Down Expand Up @@ -137,6 +138,7 @@ class ThreadSafeFunction : public node::AsyncResource {
*v8::String::Utf8Value(env_->isolate, name)),
thread_count(thread_count_),
is_closing(false),
dispatch_state(kDispatchIdle),
context(context_),
max_queue_size(max_queue_size_),
env(env_),
Expand Down Expand Up @@ -176,7 +178,7 @@ class ThreadSafeFunction : public node::AsyncResource {
return napi_closing;
}
} else {
if (uv_async_send(&async) != 0) {
if (Send() != 0) {
return napi_generic_failure;
}
queue.push(data);
Expand Down Expand Up @@ -211,7 +213,7 @@ class ThreadSafeFunction : public node::AsyncResource {
if (is_closing && max_queue_size > 0) {
cond->Signal(lock);
}
if (uv_async_send(&async) != 0) {
if (Send() != 0) {
return napi_generic_failure;
}
}
Expand All @@ -238,7 +240,6 @@ class ThreadSafeFunction : public node::AsyncResource {
cond = std::make_unique<node::ConditionVariable>();
}
if (max_queue_size == 0 || cond) {
CHECK_EQ(0, uv_idle_init(loop, &idle));
return napi_ok;
}

Expand All @@ -263,21 +264,43 @@ class ThreadSafeFunction : public node::AsyncResource {

napi_status Unref() {
uv_unref(reinterpret_cast<uv_handle_t*>(&async));
uv_unref(reinterpret_cast<uv_handle_t*>(&idle));

return napi_ok;
}

napi_status Ref() {
uv_ref(reinterpret_cast<uv_handle_t*>(&async));
uv_ref(reinterpret_cast<uv_handle_t*>(&idle));

return napi_ok;
}

void DispatchOne() {
inline void* Context() {
return context;
}

protected:

void Dispatch() {
bool has_more = true;

// Limit maximum synchronous iteration count to prevent event loop
// starvation. See `src/node_messaging.cc` for an inspiration.
unsigned int iterations_left = kMaxIterationCount;
while (has_more && --iterations_left != 0) {
dispatch_state = kDispatchRunning;
has_more = DispatchOne();

// Send() was called while we were executing the JS function
if (dispatch_state.exchange(kDispatchIdle) != kDispatchRunning) {
has_more = true;
}
}
}

bool DispatchOne() {
void* data = nullptr;
bool popped_value = false;
bool has_more = false;

{
node::Mutex::ScopedLock lock(this->mutex);
Expand All @@ -302,9 +325,9 @@ class ThreadSafeFunction : public node::AsyncResource {
cond->Signal(lock);
}
CloseHandlesAndMaybeDelete();
} else {
CHECK_EQ(0, uv_idle_stop(&idle));
}
} else {
has_more = true;
}
}
}
Expand All @@ -322,6 +345,8 @@ class ThreadSafeFunction : public node::AsyncResource {
call_js_cb(env, js_callback, context, data);
});
}

return has_more;
}

void Finalize() {
Expand All @@ -335,10 +360,6 @@ class ThreadSafeFunction : public node::AsyncResource {
EmptyQueueAndDelete();
}

inline void* Context() {
return context;
}

void CloseHandlesAndMaybeDelete(bool set_closing = false) {
v8::HandleScope scope(env->isolate);
if (set_closing) {
Expand All @@ -358,18 +379,20 @@ class ThreadSafeFunction : public node::AsyncResource {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::async,
reinterpret_cast<uv_async_t*>(handle));
v8::HandleScope scope(ts_fn->env->isolate);
ts_fn->env->node_env()->CloseHandle(
reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
[](uv_handle_t* handle) -> void {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::idle,
reinterpret_cast<uv_idle_t*>(handle));
ts_fn->Finalize();
});
ts_fn->Finalize();
});
}

int Send() {
// Ask currently running Dispatch() to make one more iteration
unsigned char current_state = dispatch_state.fetch_or(kDispatchPending);
if ((current_state & kDispatchRunning) == kDispatchRunning) {
return 0;
}

return uv_async_send(&async);
}

// Default way of calling into JavaScript. Used when ThreadSafeFunction is
// without a call_js_cb_.
static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
Expand All @@ -393,16 +416,10 @@ class ThreadSafeFunction : public node::AsyncResource {
}
}

static void IdleCb(uv_idle_t* idle) {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::idle, idle);
ts_fn->DispatchOne();
}

static void AsyncCb(uv_async_t* async) {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::async, async);
CHECK_EQ(0, uv_idle_start(&ts_fn->idle, IdleCb));
ts_fn->Dispatch();
}

static void Cleanup(void* data) {
Expand All @@ -411,14 +428,20 @@ class ThreadSafeFunction : public node::AsyncResource {
}

private:
static const unsigned char kDispatchIdle = 0;
static const unsigned char kDispatchRunning = 1 << 0;
static const unsigned char kDispatchPending = 1 << 1;

static const unsigned int kMaxIterationCount = 1000;

// These are variables protected by the mutex.
node::Mutex mutex;
std::unique_ptr<node::ConditionVariable> cond;
std::queue<void*> queue;
uv_async_t async;
uv_idle_t idle;
size_t thread_count;
bool is_closing;
std::atomic_uchar dispatch_state;

// These are variables set once, upon creation, and then never again, which
// means we don't need the mutex to read them.
Expand Down
4 changes: 2 additions & 2 deletions test/node-api/test_threadsafe_function/test.js
Expand Up @@ -43,7 +43,7 @@ function testWithJSMarshaller({
binding[threadStarter](function testCallback(value) {
array.push(value);
if (array.length === quitAfter) {
setImmediate(() => {
process.nextTick(() => {
binding.StopThread(common.mustCall(() => {
resolve(array);
}), !!abort);
Expand Down Expand Up @@ -85,7 +85,7 @@ new Promise(function testWithoutJSMarshaller(resolve) {
// The default call-into-JS implementation passes no arguments.
assert.strictEqual(arguments.length, 0);
if (callCount === binding.ARRAY_LENGTH) {
setImmediate(() => {
process.nextTick(() => {
binding.StopThread(common.mustCall(() => {
resolve();
}), false);
Expand Down

0 comments on commit 37b10a7

Please sign in to comment.