Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node-api: faster threadsafe_function #38506

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 50 additions & 8 deletions src/node_api.cc
Expand Up @@ -12,6 +12,7 @@
#include "tracing/traced_value.h"
#include "util-inl.h"

#include <atomic>
#include <memory>

struct node_napi_env__ : public napi_env__ {
Expand Down Expand Up @@ -137,6 +138,7 @@ class ThreadSafeFunction : public node::AsyncResource {
*v8::String::Utf8Value(env_->isolate, name)),
thread_count(thread_count_),
is_closing(false),
dispatch_state(kDispatchIdle),
context(context_),
max_queue_size(max_queue_size_),
env(env_),
Expand Down Expand Up @@ -176,7 +178,7 @@ class ThreadSafeFunction : public node::AsyncResource {
return napi_closing;
}
} else {
if (uv_async_send(&async) != 0) {
if (Send() != 0) {
return napi_generic_failure;
}
queue.push(data);
Expand Down Expand Up @@ -211,7 +213,7 @@ class ThreadSafeFunction : public node::AsyncResource {
if (is_closing && max_queue_size > 0) {
cond->Signal(lock);
}
if (uv_async_send(&async) != 0) {
if (Send() != 0) {
return napi_generic_failure;
}
}
Expand Down Expand Up @@ -275,9 +277,32 @@ class ThreadSafeFunction : public node::AsyncResource {
return napi_ok;
}

void DispatchOne() {
inline void* Context() {
return context;
}

protected:
void Dispatch() {
bool has_more = true;

// Limit maximum synchronous iteration count to prevent event loop
// starvation. See `src/node_messaging.cc` for an inspiration.
unsigned int iterations_left = kMaxIterationCount;
while (has_more && --iterations_left != 0) {
dispatch_state = kDispatchRunning;
has_more = DispatchOne();

// Send() was called while we were executing the JS function
if (dispatch_state.exchange(kDispatchIdle) != kDispatchRunning) {
addaleax marked this conversation as resolved.
Show resolved Hide resolved
has_more = true;
}
}
}

bool DispatchOne() {
void* data = nullptr;
bool popped_value = false;
bool has_more = false;

{
node::Mutex::ScopedLock lock(this->mutex);
Expand Down Expand Up @@ -305,6 +330,8 @@ class ThreadSafeFunction : public node::AsyncResource {
} else {
CHECK_EQ(0, uv_idle_stop(&idle));
}
} else {
has_more = true;
}
}
}
Expand All @@ -322,6 +349,8 @@ class ThreadSafeFunction : public node::AsyncResource {
call_js_cb(env, js_callback, context, data);
});
}

return has_more;
}

void Finalize() {
Expand All @@ -335,10 +364,6 @@ class ThreadSafeFunction : public node::AsyncResource {
EmptyQueueAndDelete();
}

inline void* Context() {
return context;
}

void CloseHandlesAndMaybeDelete(bool set_closing = false) {
v8::HandleScope scope(env->isolate);
if (set_closing) {
Expand Down Expand Up @@ -370,6 +395,16 @@ class ThreadSafeFunction : public node::AsyncResource {
});
}

int Send() {
// Ask currently running Dispatch() to make one more iteration
unsigned char current_state = dispatch_state.fetch_or(kDispatchPending);
if ((current_state & kDispatchRunning) == kDispatchRunning) {
return 0;
}

return uv_async_send(&async);
}

// Default way of calling into JavaScript. Used when ThreadSafeFunction is
// without a call_js_cb_.
static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
Expand All @@ -396,7 +431,7 @@ class ThreadSafeFunction : public node::AsyncResource {
static void IdleCb(uv_idle_t* idle) {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::idle, idle);
ts_fn->DispatchOne();
ts_fn->Dispatch();
}

static void AsyncCb(uv_async_t* async) {
Expand All @@ -411,6 +446,12 @@ class ThreadSafeFunction : public node::AsyncResource {
}

private:
static const unsigned char kDispatchIdle = 0;
static const unsigned char kDispatchRunning = 1 << 0;
static const unsigned char kDispatchPending = 1 << 1;

static const unsigned int kMaxIterationCount = 1000;

// These are variables protected by the mutex.
node::Mutex mutex;
std::unique_ptr<node::ConditionVariable> cond;
Expand All @@ -419,6 +460,7 @@ class ThreadSafeFunction : public node::AsyncResource {
uv_idle_t idle;
size_t thread_count;
bool is_closing;
std::atomic_uchar dispatch_state;

// These are variables set once, upon creation, and then never again, which
// means we don't need the mutex to read them.
Expand Down
4 changes: 2 additions & 2 deletions test/node-api/test_threadsafe_function/test.js
Expand Up @@ -43,7 +43,7 @@ function testWithJSMarshaller({
binding[threadStarter](function testCallback(value) {
array.push(value);
if (array.length === quitAfter) {
setImmediate(() => {
process.nextTick(() => {
jasnell marked this conversation as resolved.
Show resolved Hide resolved
binding.StopThread(common.mustCall(() => {
resolve(array);
}), !!abort);
Expand Down Expand Up @@ -85,7 +85,7 @@ new Promise(function testWithoutJSMarshaller(resolve) {
// The default call-into-JS implementation passes no arguments.
assert.strictEqual(arguments.length, 0);
if (callCount === binding.ARRAY_LENGTH) {
setImmediate(() => {
process.nextTick(() => {
binding.StopThread(common.mustCall(() => {
resolve();
}), false);
Expand Down