Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: cherry-pick 7abc7e45b2 from node #29021

Merged
merged 1 commit into from May 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions patches/node/.patches
Expand Up @@ -34,3 +34,4 @@ build_add_mjs_support_to_js2c.patch
src_inline_asynccleanuphookhandle_in_headers.patch
fix_handle_new_tostring_behavior_in_v8_serdes_test.patch
fix_the_--harmony-weak-refs_has_been_removed_remove_from_specs.patch
node-api_faster_threadsafe_function.patch
262 changes: 262 additions & 0 deletions patches/node/node-api_faster_threadsafe_function.patch
@@ -0,0 +1,262 @@
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Fedor Indutny <fedor@indutny.com>
Date: Sat, 1 May 2021 11:26:46 -0700
Subject: node-api: faster threadsafe_function

Invoke threadsafe_function during the same tick and avoid marshalling
costs between threads and/or churning event loop if either:

1. There's a queued call already
2. `Push()` is called while the main thread was running
threadsafe_function

PR-URL: https://github.com/nodejs/node/pull/38506
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Rich Trott <rtrott@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>

diff --git a/src/node_api.cc b/src/node_api.cc
index f1a5265b6a7234dc754aedc86ecd3132f3d90b09..d1076b29aeb5133a0325d3e7ebd097d207e4f4a6 100644
--- a/src/node_api.cc
+++ b/src/node_api.cc
@@ -12,6 +12,7 @@
#include "tracing/traced_value.h"
#include "util-inl.h"

+#include <atomic>
#include <memory>

struct node_napi_env__ : public napi_env__ {
@@ -131,6 +132,7 @@ class ThreadSafeFunction : public node::AsyncResource {
*v8::String::Utf8Value(env_->isolate, name)),
thread_count(thread_count_),
is_closing(false),
+ dispatch_state(kDispatchIdle),
context(context_),
max_queue_size(max_queue_size_),
env(env_),
@@ -170,10 +172,8 @@ class ThreadSafeFunction : public node::AsyncResource {
return napi_closing;
}
} else {
- if (uv_async_send(&async) != 0) {
- return napi_generic_failure;
- }
queue.push(data);
+ Send();
return napi_ok;
}
}
@@ -205,9 +205,7 @@ class ThreadSafeFunction : public node::AsyncResource {
if (is_closing && max_queue_size > 0) {
cond->Signal(lock);
}
- if (uv_async_send(&async) != 0) {
- return napi_generic_failure;
- }
+ Send();
}
}

@@ -232,7 +230,6 @@ class ThreadSafeFunction : public node::AsyncResource {
cond = std::make_unique<node::ConditionVariable>();
}
if (max_queue_size == 0 || cond) {
- CHECK_EQ(0, uv_idle_init(loop, &idle));
return napi_ok;
}

@@ -257,21 +254,46 @@ class ThreadSafeFunction : public node::AsyncResource {

napi_status Unref() {
uv_unref(reinterpret_cast<uv_handle_t*>(&async));
- uv_unref(reinterpret_cast<uv_handle_t*>(&idle));

return napi_ok;
}

napi_status Ref() {
uv_ref(reinterpret_cast<uv_handle_t*>(&async));
- uv_ref(reinterpret_cast<uv_handle_t*>(&idle));

return napi_ok;
}

- void DispatchOne() {
+ inline void* Context() {
+ return context;
+ }
+
+ protected:
+ void Dispatch() {
+ bool has_more = true;
+
+ // Limit maximum synchronous iteration count to prevent event loop
+ // starvation. See `src/node_messaging.cc` for an inspiration.
+ unsigned int iterations_left = kMaxIterationCount;
+ while (has_more && --iterations_left != 0) {
+ dispatch_state = kDispatchRunning;
+ has_more = DispatchOne();
+
+ // Send() was called while we were executing the JS function
+ if (dispatch_state.exchange(kDispatchIdle) != kDispatchRunning) {
+ has_more = true;
+ }
+ }
+
+ if (has_more) {
+ Send();
+ }
+ }
+
+ bool DispatchOne() {
void* data = nullptr;
bool popped_value = false;
+ bool has_more = false;

{
node::Mutex::ScopedLock lock(this->mutex);
@@ -296,9 +318,9 @@ class ThreadSafeFunction : public node::AsyncResource {
cond->Signal(lock);
}
CloseHandlesAndMaybeDelete();
- } else {
- CHECK_EQ(0, uv_idle_stop(&idle));
}
+ } else {
+ has_more = true;
}
}
}
@@ -316,6 +338,8 @@ class ThreadSafeFunction : public node::AsyncResource {
call_js_cb(env, js_callback, context, data);
});
}
+
+ return has_more;
}

void Finalize() {
@@ -329,10 +353,6 @@ class ThreadSafeFunction : public node::AsyncResource {
EmptyQueueAndDelete();
}

- inline void* Context() {
- return context;
- }
-
void CloseHandlesAndMaybeDelete(bool set_closing = false) {
v8::HandleScope scope(env->isolate);
if (set_closing) {
@@ -352,18 +372,20 @@ class ThreadSafeFunction : public node::AsyncResource {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::async,
reinterpret_cast<uv_async_t*>(handle));
- v8::HandleScope scope(ts_fn->env->isolate);
- ts_fn->env->node_env()->CloseHandle(
- reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
- [](uv_handle_t* handle) -> void {
- ThreadSafeFunction* ts_fn =
- node::ContainerOf(&ThreadSafeFunction::idle,
- reinterpret_cast<uv_idle_t*>(handle));
- ts_fn->Finalize();
- });
+ ts_fn->Finalize();
});
}

+ void Send() {
+ // Ask currently running Dispatch() to make one more iteration
+ unsigned char current_state = dispatch_state.fetch_or(kDispatchPending);
+ if ((current_state & kDispatchRunning) == kDispatchRunning) {
+ return;
+ }
+
+ CHECK_EQ(0, uv_async_send(&async));
+ }
+
// Default way of calling into JavaScript. Used when ThreadSafeFunction is
// without a call_js_cb_.
static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
@@ -387,16 +409,10 @@ class ThreadSafeFunction : public node::AsyncResource {
}
}

- static void IdleCb(uv_idle_t* idle) {
- ThreadSafeFunction* ts_fn =
- node::ContainerOf(&ThreadSafeFunction::idle, idle);
- ts_fn->DispatchOne();
- }
-
static void AsyncCb(uv_async_t* async) {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::async, async);
- CHECK_EQ(0, uv_idle_start(&ts_fn->idle, IdleCb));
+ ts_fn->Dispatch();
}

static void Cleanup(void* data) {
@@ -405,14 +421,20 @@ class ThreadSafeFunction : public node::AsyncResource {
}

private:
+ static const unsigned char kDispatchIdle = 0;
+ static const unsigned char kDispatchRunning = 1 << 0;
+ static const unsigned char kDispatchPending = 1 << 1;
+
+ static const unsigned int kMaxIterationCount = 1000;
+
// These are variables protected by the mutex.
node::Mutex mutex;
std::unique_ptr<node::ConditionVariable> cond;
std::queue<void*> queue;
uv_async_t async;
- uv_idle_t idle;
size_t thread_count;
bool is_closing;
+ std::atomic_uchar dispatch_state;

// These are variables set once, upon creation, and then never again, which
// means we don't need the mutex to read them.
diff --git a/test/node-api/test_threadsafe_function/binding.c b/test/node-api/test_threadsafe_function/binding.c
index c9c526153804c60b5bd5844d2c2aacac7f0fb514..ae3ec67de43cfffdfb10772761dbd69f01c623bb 100644
--- a/test/node-api/test_threadsafe_function/binding.c
+++ b/test/node-api/test_threadsafe_function/binding.c
@@ -7,7 +7,7 @@
#include <node_api.h>
#include "../../js-native-api/common.h"

-#define ARRAY_LENGTH 10
+#define ARRAY_LENGTH 10000
#define MAX_QUEUE_SIZE 2

static uv_thread_t uv_threads[2];
@@ -72,7 +72,7 @@ static void data_source_thread(void* data) {
for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
status = napi_call_threadsafe_function(ts_fn, &ints[index],
ts_fn_info->block_on_full);
- if (ts_fn_info->max_queue_size == 0) {
+ if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) {
// Let's make this thread really busy for 200 ms to give the main thread a
// chance to abort.
uint64_t start = uv_hrtime();
diff --git a/test/node-api/test_threadsafe_function/test.js b/test/node-api/test_threadsafe_function/test.js
index 3603d79ee6b5d36590503989d8168368eaf12b03..ccd3f4228a793ae77eff760309e31191ba8de49a 100644
--- a/test/node-api/test_threadsafe_function/test.js
+++ b/test/node-api/test_threadsafe_function/test.js
@@ -210,6 +210,15 @@ new Promise(function testWithoutJSMarshaller(resolve) {
}))
.then((result) => assert.strictEqual(result.indexOf(0), -1))

+// Make sure that threadsafe function isn't stalled when we hit
+// `kMaxIterationCount` in `src/node_api.cc`
+.then(() => testWithJSMarshaller({
+ threadStarter: 'StartThreadNonblocking',
+ maxQueueSize: binding.ARRAY_LENGTH >>> 1,
+ quitAfter: binding.ARRAY_LENGTH
+}))
+.then((result) => assert.deepStrictEqual(result, expectedArray))
+
// Start a child process to test rapid teardown
.then(() => testUnref(binding.MAX_QUEUE_SIZE))