node-api: faster threadsafe_function #38506

Closed
src/node_api.cc: 88 changes (55 additions, 33 deletions)
@@ -12,6 +12,7 @@
#include "tracing/traced_value.h"
#include "util-inl.h"

#include <atomic>
#include <memory>

struct node_napi_env__ : public napi_env__ {
@@ -137,6 +138,7 @@ class ThreadSafeFunction : public node::AsyncResource {
*v8::String::Utf8Value(env_->isolate, name)),
thread_count(thread_count_),
is_closing(false),
dispatch_state(kDispatchIdle),
context(context_),
max_queue_size(max_queue_size_),
env(env_),
@@ -176,10 +178,8 @@ class ThreadSafeFunction : public node::AsyncResource {
return napi_closing;
}
} else {
if (uv_async_send(&async) != 0) {
return napi_generic_failure;
}
queue.push(data);
Send();
[Review comment] @indutny (Member, Author), May 2, 2021:
I've swapped the order, since a uv_async_send() error is treated as a hard failure and it is more logical to push before the notification.

return napi_ok;
}
}
@@ -211,9 +211,7 @@ class ThreadSafeFunction : public node::AsyncResource {
if (is_closing && max_queue_size > 0) {
cond->Signal(lock);
}
if (uv_async_send(&async) != 0) {
return napi_generic_failure;
}
Send();
}
}

@@ -238,7 +236,6 @@
cond = std::make_unique<node::ConditionVariable>();
}
if (max_queue_size == 0 || cond) {
CHECK_EQ(0, uv_idle_init(loop, &idle));
return napi_ok;
}

@@ -263,21 +260,46 @@

napi_status Unref() {
uv_unref(reinterpret_cast<uv_handle_t*>(&async));
uv_unref(reinterpret_cast<uv_handle_t*>(&idle));

return napi_ok;
}

napi_status Ref() {
uv_ref(reinterpret_cast<uv_handle_t*>(&async));
uv_ref(reinterpret_cast<uv_handle_t*>(&idle));

return napi_ok;
}

void DispatchOne() {
inline void* Context() {
return context;
}

protected:
void Dispatch() {
bool has_more = true;

// Limit maximum synchronous iteration count to prevent event loop
// starvation. See `src/node_messaging.cc` for an inspiration.
unsigned int iterations_left = kMaxIterationCount;
while (has_more && --iterations_left != 0) {
dispatch_state = kDispatchRunning;
has_more = DispatchOne();

// Send() was called while we were executing the JS function
if (dispatch_state.exchange(kDispatchIdle) != kDispatchRunning) {
has_more = true;
}
}

if (has_more) {
Send();
}
}

bool DispatchOne() {
void* data = nullptr;
bool popped_value = false;
bool has_more = false;

{
node::Mutex::ScopedLock lock(this->mutex);
@@ -302,9 +324,9 @@ class ThreadSafeFunction : public node::AsyncResource {
cond->Signal(lock);
}
CloseHandlesAndMaybeDelete();
} else {
CHECK_EQ(0, uv_idle_stop(&idle));
}
} else {
has_more = true;
}
}
}
@@ -322,6 +344,8 @@ class ThreadSafeFunction : public node::AsyncResource {
call_js_cb(env, js_callback, context, data);
});
}

return has_more;
}

void Finalize() {
@@ -335,10 +359,6 @@
EmptyQueueAndDelete();
}

inline void* Context() {
return context;
}

void CloseHandlesAndMaybeDelete(bool set_closing = false) {
v8::HandleScope scope(env->isolate);
if (set_closing) {
@@ -358,18 +378,20 @@
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::async,
reinterpret_cast<uv_async_t*>(handle));
v8::HandleScope scope(ts_fn->env->isolate);
ts_fn->env->node_env()->CloseHandle(
reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
[](uv_handle_t* handle) -> void {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::idle,
reinterpret_cast<uv_idle_t*>(handle));
ts_fn->Finalize();
});
ts_fn->Finalize();
});
}

void Send() {
// Ask currently running Dispatch() to make one more iteration
unsigned char current_state = dispatch_state.fetch_or(kDispatchPending);
if ((current_state & kDispatchRunning) == kDispatchRunning) {
return;
}

CHECK_EQ(0, uv_async_send(&async));
}

// Default way of calling into JavaScript. Used when ThreadSafeFunction is
// without a call_js_cb_.
static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
@@ -393,16 +415,10 @@
}
}

static void IdleCb(uv_idle_t* idle) {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::idle, idle);
ts_fn->DispatchOne();
}

static void AsyncCb(uv_async_t* async) {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::async, async);
CHECK_EQ(0, uv_idle_start(&ts_fn->idle, IdleCb));
ts_fn->Dispatch();
}

static void Cleanup(void* data) {
@@ -411,14 +427,20 @@
}

private:
static const unsigned char kDispatchIdle = 0;
static const unsigned char kDispatchRunning = 1 << 0;
static const unsigned char kDispatchPending = 1 << 1;

static const unsigned int kMaxIterationCount = 1000;

// These are variables protected by the mutex.
node::Mutex mutex;
std::unique_ptr<node::ConditionVariable> cond;
std::queue<void*> queue;
uv_async_t async;
uv_idle_t idle;
size_t thread_count;
bool is_closing;
std::atomic_uchar dispatch_state;

// These are variables set once, upon creation, and then never again, which
  // means we don't need the mutex to read them.
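
For orientation, the net effect of the src/node_api.cc changes above is to drop the uv_idle_t-based draining and replace it with an atomic handshake between Send() and Dispatch(), capped at kMaxIterationCount synchronous calls per uv_async wakeup. The following is a minimal, standalone sketch of that handshake, not the PR code verbatim: RequestWakeup() and PopOne() are invented stand-ins for uv_async_send() on the handle and for the real DispatchOne(), and the queued item count is arbitrary.

    #include <atomic>
    #include <cstdio>

    namespace {

    constexpr unsigned char kDispatchIdle = 0;
    constexpr unsigned char kDispatchRunning = 1 << 0;
    constexpr unsigned char kDispatchPending = 1 << 1;
    constexpr unsigned int kMaxIterationCount = 1000;

    std::atomic<unsigned char> dispatch_state{kDispatchIdle};
    int remaining_items = 1500;  // pretend this many calls are queued

    // Stand-in for CHECK_EQ(0, uv_async_send(&async)): wake the event loop.
    void RequestWakeup() { std::printf("wakeup requested\n"); }

    // Stand-in for DispatchOne(): drain one item, report whether more remain.
    bool PopOne() { return --remaining_items > 0; }

    // Producer side (any thread): set the "pending" bit; only wake the loop
    // if no Dispatch() is running, because a running one notices the bit.
    void Send() {
      unsigned char previous = dispatch_state.fetch_or(kDispatchPending);
      if ((previous & kDispatchRunning) == 0) RequestWakeup();
    }

    // Loop side: drain items, but cap the synchronous burst so one busy
    // thread-safe function cannot starve the event loop. If work is left
    // over (or a Send() raced with the last item), ask for another wakeup.
    void Dispatch() {
      bool has_more = true;
      unsigned int iterations_left = kMaxIterationCount;
      while (has_more && --iterations_left != 0) {
        dispatch_state = kDispatchRunning;  // consumes the pending bit
        has_more = PopOne();
        // If Send() ran while PopOne() was executing, the pending bit is set.
        if (dispatch_state.exchange(kDispatchIdle) != kDispatchRunning)
          has_more = true;
      }
      if (has_more) Send();
    }

    }  // namespace

    int main() {
      Send();      // producer queues work; first wakeup is requested
      Dispatch();  // first async callback drains up to kMaxIterationCount items
      Dispatch();  // second wakeup drains the remaining items
      std::printf("items left: %d\n", remaining_items);
      return 0;
    }

Run as-is, this prints two wakeup requests: the producer's initial one and the one Dispatch() re-arms after hitting the iteration cap, which is the behaviour the new test.js case below exercises.
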
test/node-api/test_threadsafe_function/binding.c: 4 changes (2 additions, 2 deletions)
@@ -7,7 +7,7 @@
#include <node_api.h>
#include "../../js-native-api/common.h"

#define ARRAY_LENGTH 10
#define ARRAY_LENGTH 10000
#define MAX_QUEUE_SIZE 2

static uv_thread_t uv_threads[2];
@@ -72,7 +72,7 @@ static void data_source_thread(void* data) {
for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
status = napi_call_threadsafe_function(ts_fn, &ints[index],
ts_fn_info->block_on_full);
if (ts_fn_info->max_queue_size == 0) {
if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) {
// Let's make this thread really busy for 200 ms to give the main thread a
// chance to abort.
uint64_t start = uv_hrtime();
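
A rough sanity check on the throttle above: with ARRAY_LENGTH raised to 10000, the index % 1000 == 0 gate means the 200 ms busy-wait runs about 10 times per pass (indices 9000, 8000, ..., 0), so the unlimited-queue case spins for roughly 2 seconds in total rather than 10000 * 200 ms.
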
test/node-api/test_threadsafe_function/test.js: 9 changes (9 additions, 0 deletions)
@@ -211,6 +211,15 @@ new Promise(function testWithoutJSMarshaller(resolve) {
}))
.then((result) => assert.strictEqual(result.indexOf(0), -1))

// Make sure that threadsafe function isn't stalled when we hit
// `kMaxIterationCount` in `src/node_api.cc`
.then(() => testWithJSMarshaller({
threadStarter: 'StartThreadNonblocking',
maxQueueSize: binding.ARRAY_LENGTH >>> 1,
[Review comment] @indutny (Member, Author):
Looks stupid, but there is an assert in the binding that the queue blocks at least once, and I didn't want to change the .c code 😂

quitAfter: binding.ARRAY_LENGTH
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))

// Start a child process to test rapid teardown
.then(() => testUnref(binding.MAX_QUEUE_SIZE))
