Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
n-api: guard against cond null dereference
A condition variable is only created by the thread-safe function if the
queue size is set to something larger than zero. This adds null-checks
around the condition variable and tests for the case where the queue
size is zero.

Fixes: nodejs/help#1387
PR-URL: #21871
Backport-PR-URL: #25002
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Michael Dawson <michael_dawson@ca.ibm.com>
  • Loading branch information
Gabriel Schulhof authored and rvagg committed Feb 28, 2019
1 parent c5a11dc commit 3675059
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 29 deletions.
10 changes: 7 additions & 3 deletions src/node_api.cc
Expand Up @@ -3683,7 +3683,7 @@ class TsFn: public node::AsyncResource {
if (thread_count == 0 || mode == napi_tsfn_abort) {
if (!is_closing) {
is_closing = (mode == napi_tsfn_abort);
if (is_closing) {
if (is_closing && max_queue_size > 0) {
cond->Signal(lock);
}
if (uv_async_send(&async) != 0) {
Expand Down Expand Up @@ -3772,7 +3772,9 @@ class TsFn: public node::AsyncResource {
if (size == 0) {
if (thread_count == 0) {
is_closing = true;
cond->Signal(lock);
if (max_queue_size > 0) {
cond->Signal(lock);
}
CloseHandlesAndMaybeDelete();
} else {
if (uv_idle_stop(&idle) != 0) {
Expand Down Expand Up @@ -3839,7 +3841,9 @@ class TsFn: public node::AsyncResource {
if (set_closing) {
node::Mutex::ScopedLock lock(this->mutex);
is_closing = true;
cond->Signal(lock);
if (max_queue_size > 0) {
cond->Signal(lock);
}
}
if (handles_closing) {
return;
Expand Down
39 changes: 34 additions & 5 deletions test/addons-napi/test_threadsafe_function/binding.c
Expand Up @@ -9,6 +9,7 @@
#include "../common.h"

#define ARRAY_LENGTH 10
#define MAX_QUEUE_SIZE 2

static uv_thread_t uv_threads[2];
static napi_threadsafe_function ts_fn;
Expand All @@ -18,6 +19,7 @@ typedef struct {
napi_threadsafe_function_release_mode abort;
bool start_secondary;
napi_ref js_finalize_cb;
uint32_t max_queue_size;
} ts_fn_hint;

static ts_fn_hint ts_info;
Expand Down Expand Up @@ -71,6 +73,12 @@ static void data_source_thread(void* data) {
for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
status = napi_call_threadsafe_function(ts_fn, &ints[index],
ts_fn_info->block_on_full);
if (ts_fn_info->max_queue_size == 0) {
// Let's make this thread really busy for 200 ms to give the main thread a
// chance to abort.
uint64_t start = uv_hrtime();
for (; uv_hrtime() - start < 200000000;);
}
switch (status) {
case napi_queue_full:
queue_was_full = true;
Expand Down Expand Up @@ -167,8 +175,8 @@ static napi_value StartThreadInternal(napi_env env,
napi_callback_info info,
napi_threadsafe_function_call_js cb,
bool block_on_full) {
size_t argc = 3;
napi_value argv[3];
size_t argc = 4;
napi_value argv[4];

ts_info.block_on_full =
(block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking);
Expand All @@ -178,8 +186,18 @@ static napi_value StartThreadInternal(napi_env env,
napi_value async_name;
NAPI_CALL(env, napi_create_string_utf8(env, "N-API Thread-safe Function Test",
NAPI_AUTO_LENGTH, &async_name));
NAPI_CALL(env, napi_create_threadsafe_function(env, argv[0], NULL, async_name,
2, 2, uv_threads, join_the_threads, &ts_info, cb, &ts_fn));
NAPI_CALL(env, napi_get_value_uint32(env, argv[3], &ts_info.max_queue_size));
NAPI_CALL(env, napi_create_threadsafe_function(env,
argv[0],
NULL,
async_name,
ts_info.max_queue_size,
2,
uv_threads,
join_the_threads,
&ts_info,
cb,
&ts_fn));
bool abort;
NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort));
ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release;
Expand Down Expand Up @@ -224,8 +242,9 @@ static napi_value Init(napi_env env, napi_value exports) {
for (index = 0; index < ARRAY_LENGTH; index++) {
ints[index] = index;
}
napi_value js_array_length;
napi_value js_array_length, js_max_queue_size;
napi_create_uint32(env, ARRAY_LENGTH, &js_array_length);
napi_create_uint32(env, MAX_QUEUE_SIZE, &js_max_queue_size);

napi_property_descriptor properties[] = {
{
Expand All @@ -238,6 +257,16 @@ static napi_value Init(napi_env env, napi_value exports) {
napi_enumerable,
NULL
},
{
"MAX_QUEUE_SIZE",
NULL,
NULL,
NULL,
NULL,
js_max_queue_size,
napi_enumerable,
NULL
},
DECLARE_NAPI_PROPERTY("StartThread", StartThread),
DECLARE_NAPI_PROPERTY("StartThreadNoNative", StartThreadNoNative),
DECLARE_NAPI_PROPERTY("StartThreadNonblocking", StartThreadNonblocking),
Expand Down
84 changes: 63 additions & 21 deletions test/addons-napi/test_threadsafe_function/test.js
Expand Up @@ -25,7 +25,7 @@ if (process.argv[2] === 'child') {
if (callCount === 2) {
binding.Unref();
}
}, false /* abort */, true /* launchSecondary */);
}, false /* abort */, true /* launchSecondary */, +process.argv[3]);

// Release the thread-safe function from the main thread so that it may be
// torn down via the environment cleanup handler.
Expand All @@ -37,6 +37,7 @@ function testWithJSMarshaller({
threadStarter,
quitAfter,
abort,
maxQueueSize,
launchSecondary }) {
return new Promise((resolve) => {
const array = [];
Expand All @@ -49,7 +50,7 @@ function testWithJSMarshaller({
}), !!abort);
});
}
}, !!abort, !!launchSecondary);
}, !!abort, !!launchSecondary, maxQueueSize);
if (threadStarter === 'StartThreadNonblocking') {
// Let's make this thread really busy for a short while to ensure that
// the queue fills and the thread receives a napi_queue_full.
Expand All @@ -59,6 +60,24 @@ function testWithJSMarshaller({
});
}

function testUnref(queueSize) {
return new Promise((resolve, reject) => {
let output = '';
const child = fork(__filename, ['child', queueSize], {
stdio: [process.stdin, 'pipe', process.stderr, 'ipc']
});
child.on('close', (code) => {
if (code === 0) {
resolve(output.match(/\S+/g));
} else {
reject(new Error('Child process died with code ' + code));
}
});
child.stdout.on('data', (data) => (output += data.toString()));
})
.then((result) => assert.strictEqual(result.indexOf(0), -1));
}

new Promise(function testWithoutJSMarshaller(resolve) {
let callCount = 0;
binding.StartThreadNoNative(function testCallback() {
Expand All @@ -73,13 +92,23 @@ new Promise(function testWithoutJSMarshaller(resolve) {
}), false);
});
}
}, false /* abort */, false /* launchSecondary */);
}, false /* abort */, false /* launchSecondary */, binding.MAX_QUEUE_SIZE);
})

// Start the thread in blocking mode, and assert that all values are passed.
// Quit after it's done.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
maxQueueSize: binding.MAX_QUEUE_SIZE,
quitAfter: binding.ARRAY_LENGTH
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))

// Start the thread in blocking mode with an infinite queue, and assert that all
// values are passed. Quit after it's done.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
maxQueueSize: 0,
quitAfter: binding.ARRAY_LENGTH
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))
Expand All @@ -88,6 +117,7 @@ new Promise(function testWithoutJSMarshaller(resolve) {
// Quit after it's done.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThreadNonblocking',
maxQueueSize: binding.MAX_QUEUE_SIZE,
quitAfter: binding.ARRAY_LENGTH
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))
Expand All @@ -96,6 +126,16 @@ new Promise(function testWithoutJSMarshaller(resolve) {
// Quit early, but let the thread finish.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
maxQueueSize: binding.MAX_QUEUE_SIZE,
quitAfter: 1
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))

// Start the thread in blocking mode with an infinite queue, and assert that all
// values are passed. Quit early, but let the thread finish.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
maxQueueSize: 0,
quitAfter: 1
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))
Expand All @@ -104,6 +144,7 @@ new Promise(function testWithoutJSMarshaller(resolve) {
// Quit early, but let the thread finish.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThreadNonblocking',
maxQueueSize: binding.MAX_QUEUE_SIZE,
quitAfter: 1
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))
Expand All @@ -114,6 +155,7 @@ new Promise(function testWithoutJSMarshaller(resolve) {
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
quitAfter: 1,
maxQueueSize: binding.MAX_QUEUE_SIZE,
launchSecondary: true
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))
Expand All @@ -124,15 +166,27 @@ new Promise(function testWithoutJSMarshaller(resolve) {
.then(() => testWithJSMarshaller({
threadStarter: 'StartThreadNonblocking',
quitAfter: 1,
maxQueueSize: binding.MAX_QUEUE_SIZE,
launchSecondary: true
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))

// Start the thread in blocking mode, and assert that it could not finish.
// Quit early and aborting.
// Quit early by aborting.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
quitAfter: 1,
maxQueueSize: binding.MAX_QUEUE_SIZE,
abort: true
}))
.then((result) => assert.strictEqual(result.indexOf(0), -1))

// Start the thread in blocking mode with an infinite queue, and assert that it
// could not finish. Quit early by aborting.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
quitAfter: 1,
maxQueueSize: 0,
abort: true
}))
.then((result) => assert.strictEqual(result.indexOf(0), -1))
Expand All @@ -142,25 +196,13 @@ new Promise(function testWithoutJSMarshaller(resolve) {
.then(() => testWithJSMarshaller({
threadStarter: 'StartThreadNonblocking',
quitAfter: 1,
maxQueueSize: binding.MAX_QUEUE_SIZE,
abort: true
}))
.then((result) => assert.strictEqual(result.indexOf(0), -1))

// Start a child process to test rapid teardown
.then(() => {
return new Promise((resolve, reject) => {
let output = '';
const child = fork(__filename, ['child'], {
stdio: [process.stdin, 'pipe', process.stderr, 'ipc']
});
child.on('close', (code) => {
if (code === 0) {
resolve(output.match(/\S+/g));
} else {
reject(new Error('Child process died with code ' + code));
}
});
child.stdout.on('data', (data) => (output += data.toString()));
});
})
.then((result) => assert.strictEqual(result.indexOf(0), -1));
.then(() => testUnref(binding.MAX_QUEUE_SIZE))

// Start a child process with an infinite queue to test rapid teardown
.then(() => testUnref(0));

0 comments on commit 3675059

Please sign in to comment.