Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

n-api: guard against cond null dereference #21871

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/node_api.cc
Expand Up @@ -3782,7 +3782,7 @@ class TsFn: public node::AsyncResource {
if (thread_count == 0 || mode == napi_tsfn_abort) {
if (!is_closing) {
is_closing = (mode == napi_tsfn_abort);
if (is_closing) {
if (is_closing && max_queue_size > 0) {
cond->Signal(lock);
}
if (uv_async_send(&async) != 0) {
Expand Down Expand Up @@ -3872,7 +3872,9 @@ class TsFn: public node::AsyncResource {
if (size == 0) {
if (thread_count == 0) {
is_closing = true;
cond->Signal(lock);
if (max_queue_size > 0) {
cond->Signal(lock);
}
CloseHandlesAndMaybeDelete();
} else {
if (uv_idle_stop(&idle) != 0) {
Expand Down Expand Up @@ -3939,7 +3941,9 @@ class TsFn: public node::AsyncResource {
if (set_closing) {
node::Mutex::ScopedLock lock(this->mutex);
is_closing = true;
cond->Signal(lock);
if (max_queue_size > 0) {
cond->Signal(lock);
}
}
if (handles_closing) {
return;
Expand Down
39 changes: 34 additions & 5 deletions test/addons-napi/test_threadsafe_function/binding.c
Expand Up @@ -9,6 +9,7 @@
#include "../common.h"

#define ARRAY_LENGTH 10
#define MAX_QUEUE_SIZE 2

static uv_thread_t uv_threads[2];
static napi_threadsafe_function ts_fn;
Expand All @@ -18,6 +19,7 @@ typedef struct {
napi_threadsafe_function_release_mode abort;
bool start_secondary;
napi_ref js_finalize_cb;
uint32_t max_queue_size;
} ts_fn_hint;

static ts_fn_hint ts_info;
Expand Down Expand Up @@ -71,6 +73,12 @@ static void data_source_thread(void* data) {
for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
status = napi_call_threadsafe_function(ts_fn, &ints[index],
ts_fn_info->block_on_full);
if (ts_fn_info->max_queue_size == 0) {
// Let's make this thread really busy for 200 ms to give the main thread a
// chance to abort.
uint64_t start = uv_hrtime();
for (; uv_hrtime() - start < 200000000;);
}
switch (status) {
case napi_queue_full:
queue_was_full = true;
Expand Down Expand Up @@ -167,8 +175,8 @@ static napi_value StartThreadInternal(napi_env env,
napi_callback_info info,
napi_threadsafe_function_call_js cb,
bool block_on_full) {
size_t argc = 3;
napi_value argv[3];
size_t argc = 4;
napi_value argv[4];

ts_info.block_on_full =
(block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking);
Expand All @@ -178,8 +186,18 @@ static napi_value StartThreadInternal(napi_env env,
napi_value async_name;
NAPI_CALL(env, napi_create_string_utf8(env, "N-API Thread-safe Function Test",
NAPI_AUTO_LENGTH, &async_name));
NAPI_CALL(env, napi_create_threadsafe_function(env, argv[0], NULL, async_name,
2, 2, uv_threads, join_the_threads, &ts_info, cb, &ts_fn));
NAPI_CALL(env, napi_get_value_uint32(env, argv[3], &ts_info.max_queue_size));
NAPI_CALL(env, napi_create_threadsafe_function(env,
argv[0],
NULL,
async_name,
ts_info.max_queue_size,
2,
uv_threads,
join_the_threads,
&ts_info,
cb,
&ts_fn));
bool abort;
NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort));
ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release;
Expand Down Expand Up @@ -224,8 +242,9 @@ static napi_value Init(napi_env env, napi_value exports) {
for (index = 0; index < ARRAY_LENGTH; index++) {
ints[index] = index;
}
napi_value js_array_length;
napi_value js_array_length, js_max_queue_size;
napi_create_uint32(env, ARRAY_LENGTH, &js_array_length);
napi_create_uint32(env, MAX_QUEUE_SIZE, &js_max_queue_size);

napi_property_descriptor properties[] = {
{
Expand All @@ -238,6 +257,16 @@ static napi_value Init(napi_env env, napi_value exports) {
napi_enumerable,
NULL
},
{
"MAX_QUEUE_SIZE",
NULL,
NULL,
NULL,
NULL,
js_max_queue_size,
napi_enumerable,
NULL
},
DECLARE_NAPI_PROPERTY("StartThread", StartThread),
DECLARE_NAPI_PROPERTY("StartThreadNoNative", StartThreadNoNative),
DECLARE_NAPI_PROPERTY("StartThreadNonblocking", StartThreadNonblocking),
Expand Down
84 changes: 63 additions & 21 deletions test/addons-napi/test_threadsafe_function/test.js
Expand Up @@ -25,7 +25,7 @@ if (process.argv[2] === 'child') {
if (callCount === 2) {
binding.Unref();
}
}, false /* abort */, true /* launchSecondary */);
}, false /* abort */, true /* launchSecondary */, +process.argv[3]);

// Release the thread-safe function from the main thread so that it may be
// torn down via the environment cleanup handler.
Expand All @@ -37,6 +37,7 @@ function testWithJSMarshaller({
threadStarter,
quitAfter,
abort,
maxQueueSize,
launchSecondary }) {
return new Promise((resolve) => {
const array = [];
Expand All @@ -49,7 +50,7 @@ function testWithJSMarshaller({
}), !!abort);
});
}
}, !!abort, !!launchSecondary);
}, !!abort, !!launchSecondary, maxQueueSize);
if (threadStarter === 'StartThreadNonblocking') {
// Let's make this thread really busy for a short while to ensure that
// the queue fills and the thread receives a napi_queue_full.
Expand All @@ -59,6 +60,24 @@ function testWithJSMarshaller({
});
}

function testUnref(queueSize) {
return new Promise((resolve, reject) => {
let output = '';
const child = fork(__filename, ['child', queueSize], {
stdio: [process.stdin, 'pipe', process.stderr, 'ipc']
});
child.on('close', (code) => {
if (code === 0) {
resolve(output.match(/\S+/g));
} else {
reject(new Error('Child process died with code ' + code));
}
});
child.stdout.on('data', (data) => (output += data.toString()));
})
.then((result) => assert.strictEqual(result.indexOf(0), -1));
}

new Promise(function testWithoutJSMarshaller(resolve) {
let callCount = 0;
binding.StartThreadNoNative(function testCallback() {
Expand All @@ -73,13 +92,23 @@ new Promise(function testWithoutJSMarshaller(resolve) {
}), false);
});
}
}, false /* abort */, false /* launchSecondary */);
}, false /* abort */, false /* launchSecondary */, binding.MAX_QUEUE_SIZE);
})

// Start the thread in blocking mode, and assert that all values are passed.
// Quit after it's done.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
maxQueueSize: binding.MAX_QUEUE_SIZE,
quitAfter: binding.ARRAY_LENGTH
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))

// Start the thread in blocking mode with an infinite queue, and assert that all
// values are passed. Quit after it's done.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
maxQueueSize: 0,
quitAfter: binding.ARRAY_LENGTH
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))
Expand All @@ -88,6 +117,7 @@ new Promise(function testWithoutJSMarshaller(resolve) {
// Quit after it's done.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThreadNonblocking',
maxQueueSize: binding.MAX_QUEUE_SIZE,
quitAfter: binding.ARRAY_LENGTH
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))
Expand All @@ -96,6 +126,16 @@ new Promise(function testWithoutJSMarshaller(resolve) {
// Quit early, but let the thread finish.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
maxQueueSize: binding.MAX_QUEUE_SIZE,
quitAfter: 1
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))

// Start the thread in blocking mode with an infinite queue, and assert that all
// values are passed. Quit early, but let the thread finish.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
maxQueueSize: 0,
quitAfter: 1
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))
Expand All @@ -104,6 +144,7 @@ new Promise(function testWithoutJSMarshaller(resolve) {
// Quit early, but let the thread finish.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThreadNonblocking',
maxQueueSize: binding.MAX_QUEUE_SIZE,
quitAfter: 1
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))
Expand All @@ -114,6 +155,7 @@ new Promise(function testWithoutJSMarshaller(resolve) {
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
quitAfter: 1,
maxQueueSize: binding.MAX_QUEUE_SIZE,
launchSecondary: true
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))
Expand All @@ -124,15 +166,27 @@ new Promise(function testWithoutJSMarshaller(resolve) {
.then(() => testWithJSMarshaller({
threadStarter: 'StartThreadNonblocking',
quitAfter: 1,
maxQueueSize: binding.MAX_QUEUE_SIZE,
launchSecondary: true
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))

// Start the thread in blocking mode, and assert that it could not finish.
// Quit early and aborting.
// Quit early by aborting.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
quitAfter: 1,
maxQueueSize: binding.MAX_QUEUE_SIZE,
abort: true
}))
.then((result) => assert.strictEqual(result.indexOf(0), -1))

// Start the thread in blocking mode with an infinite queue, and assert that it
// could not finish. Quit early by aborting.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
quitAfter: 1,
maxQueueSize: 0,
abort: true
}))
.then((result) => assert.strictEqual(result.indexOf(0), -1))
Expand All @@ -142,25 +196,13 @@ new Promise(function testWithoutJSMarshaller(resolve) {
.then(() => testWithJSMarshaller({
threadStarter: 'StartThreadNonblocking',
quitAfter: 1,
maxQueueSize: binding.MAX_QUEUE_SIZE,
abort: true
}))
.then((result) => assert.strictEqual(result.indexOf(0), -1))

// Start a child process to test rapid teardown
.then(() => {
return new Promise((resolve, reject) => {
let output = '';
const child = fork(__filename, ['child'], {
stdio: [process.stdin, 'pipe', process.stderr, 'ipc']
});
child.on('close', (code) => {
if (code === 0) {
resolve(output.match(/\S+/g));
} else {
reject(new Error('Child process died with code ' + code));
}
});
child.stdout.on('data', (data) => (output += data.toString()));
});
})
.then((result) => assert.strictEqual(result.indexOf(0), -1));
.then(() => testUnref(binding.MAX_QUEUE_SIZE))

// Start a child process with an infinite queue to test rapid teardown
.then(() => testUnref(0));