Skip to content

Commit

Permalink
worker: emit 'messagerror' events for failed deserialization
Browse files Browse the repository at this point in the history
This is much nicer than just treating exceptions as uncaught, and
enables reporting of exceptions from the internal C++ deserialization
machinery.

PR-URL: #33772
Backport-PR-URL: #33965
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
addaleax authored and codebytere committed Jun 27, 2020
1 parent 5b1fd10 commit 4a37180
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 6 deletions.
12 changes: 12 additions & 0 deletions doc/api/errors.md
Expand Up @@ -1561,6 +1561,17 @@ behavior. See the documentation for [policy][] manifests for more information.
An attempt was made to allocate memory (usually in the C++ layer) but it
failed.

<a id="ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE"></a>
### `ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE`
<!-- YAML
added: REPLACEME
-->

A message posted to a [`MessagePort`][] could not be deserialized in the target
[vm][] `Context`. Not all Node.js objects can be successfully instantiated in
any context at this time, and attempting to transfer them using `postMessage()`
can fail on the receiving side in that case.

<a id="ERR_METHOD_NOT_IMPLEMENTED"></a>
### `ERR_METHOD_NOT_IMPLEMENTED`

Expand Down Expand Up @@ -2557,6 +2568,7 @@ such as `process.stdout.on('data')`.
[`Class: assert.AssertionError`]: assert.html#assert_class_assert_assertionerror
[`ERR_INVALID_ARG_TYPE`]: #ERR_INVALID_ARG_TYPE
[`EventEmitter`]: events.html#events_class_eventemitter
[`MessagePort`]: worker_threads.html#worker_threads_class_messageport
[`Object.getPrototypeOf`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Object/getPrototypeOf
[`Object.setPrototypeOf`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Object/setPrototypeOf
[`REPL`]: repl.html
Expand Down
18 changes: 18 additions & 0 deletions doc/api/worker_threads.md
Expand Up @@ -303,6 +303,15 @@ input of [`port.postMessage()`][].
Listeners on this event will receive a clone of the `value` parameter as passed
to `postMessage()` and no further arguments.

### Event: `'messageerror'`
<!-- YAML
added: REPLACEME
-->

* `error` {Error} An Error object

The `'messageerror'` event is emitted when deserializing a message failed.

### `port.close()`
<!-- YAML
added: v10.5.0
Expand Down Expand Up @@ -677,6 +686,15 @@ See the [`port.on('message')`][] event for more details.
All messages sent from the worker thread will be emitted before the
[`'exit'` event][] is emitted on the `Worker` object.

### Event: `'messageerror'`
<!-- YAML
added: REPLACEME
-->

* `error` {Error} An Error object

The `'messageerror'` event is emitted when deserializing a message failed.

### Event: `'online'`
<!-- YAML
added: v10.5.0
Expand Down
4 changes: 3 additions & 1 deletion lib/internal/worker.js
Expand Up @@ -190,7 +190,9 @@ class Worker extends EventEmitter {
transferList.push(...options.transferList);

this[kPublicPort] = port1;
this[kPublicPort].on('message', (message) => this.emit('message', message));
for (const event of ['message', 'messageerror']) {
this[kPublicPort].on(event, (message) => this.emit(event, message));
}
setupPortReferencing(this[kPublicPort], this, 'message');
this[kPort].postMessage({
argv,
Expand Down
2 changes: 2 additions & 0 deletions src/env.h
Expand Up @@ -223,6 +223,7 @@ constexpr size_t kFsStatsBufferLength =
V(done_string, "done") \
V(duration_string, "duration") \
V(ecdh_string, "ECDH") \
V(emit_string, "emit") \
V(emit_warning_string, "emitWarning") \
V(empty_object_string, "{}") \
V(encoding_string, "encoding") \
Expand Down Expand Up @@ -279,6 +280,7 @@ constexpr size_t kFsStatsBufferLength =
V(message_port_constructor_string, "MessagePort") \
V(message_port_string, "messagePort") \
V(message_string, "message") \
V(messageerror_string, "messageerror") \
V(minttl_string, "minttl") \
V(module_string, "module") \
V(modulus_string, "modulus") \
Expand Down
4 changes: 4 additions & 0 deletions src/node_errors.h
Expand Up @@ -41,6 +41,7 @@ void OnFatalError(const char* location, const char* message);
V(ERR_INVALID_ARG_TYPE, TypeError) \
V(ERR_INVALID_TRANSFER_OBJECT, TypeError) \
V(ERR_MEMORY_ALLOCATION_FAILED, Error) \
V(ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE, Error) \
V(ERR_MISSING_ARGS, TypeError) \
V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, TypeError) \
V(ERR_MISSING_PASSPHRASE, TypeError) \
Expand Down Expand Up @@ -92,6 +93,9 @@ void OnFatalError(const char* location, const char* message);
V(ERR_INVALID_TRANSFER_OBJECT, "Found invalid object in transferList") \
V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory") \
V(ERR_OSSL_EVP_INVALID_DIGEST, "Invalid digest used") \
V(ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE, \
"A message object could not be deserialized successfully in the target " \
"vm.Context") \
V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, \
"Object that needs transfer was found in message but not listed " \
"in transferList") \
Expand Down
25 changes: 22 additions & 3 deletions src/node_messaging.cc
Expand Up @@ -742,7 +742,17 @@ void MessagePort::OnMessage() {
Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);

Local<Value> payload;
if (!ReceiveMessage(context, true).ToLocal(&payload)) goto reschedule;
Local<Value> message_error;
{
// Catch any exceptions from parsing the message itself (not from
// emitting it) as 'messageeror' events.
TryCatchScope try_catch(env());
if (!ReceiveMessage(context, true).ToLocal(&payload)) {
if (try_catch.HasCaught() && !try_catch.HasTerminated())
message_error = try_catch.Exception();
goto reschedule;
}
}
if (payload == env()->no_message_symbol()) break;

if (!env()->can_call_into_js()) {
Expand All @@ -753,6 +763,16 @@ void MessagePort::OnMessage() {

if (MakeCallback(emit_message, 1, &payload).IsEmpty()) {
reschedule:
if (!message_error.IsEmpty()) {
// This should become a `messageerror` event in the sense of the
// EventTarget API at some point.
Local<Value> argv[] = {
env()->messageerror_string(),
message_error
};
USE(MakeCallback(env()->emit_string(), arraysize(argv), argv));
}

// Re-schedule OnMessage() execution in case of failure.
if (data_)
TriggerAsync();
Expand Down Expand Up @@ -1215,8 +1235,7 @@ BaseObjectPtr<BaseObject> JSTransferable::Data::Deserialize(
// the end of the stream, after the main message has been read.

if (context != env->context()) {
// It would be nice to throw some kind of exception here, but how do we
// pass that to end users? For now, just drop the message silently.
THROW_ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE(env);
return {};
}
HandleScope handle_scope(env->isolate());
Expand Down
Expand Up @@ -26,7 +26,7 @@ const { once } = require('events');
port1.postMessage(fh, [ fh ]);
port2.on('message', common.mustNotCall());

const [ exception ] = await once(process, 'uncaughtException');
const [ exception ] = await once(port2, 'messageerror');

assert.strictEqual(exception.message, 'Unknown deserialize spec net:Socket');
port2.close();
Expand Down
Expand Up @@ -30,7 +30,7 @@ module.exports = {
port1.postMessage(fh, [ fh ]);
port2.on('message', common.mustNotCall());

const [ exception ] = await once(process, 'uncaughtException');
const [ exception ] = await once(port2, 'messageerror');

assert.match(exception.message, /Missing internal module/);
port2.close();
Expand Down
6 changes: 6 additions & 0 deletions test/parallel/test-worker-message-port-transfer-filehandle.js
Expand Up @@ -55,6 +55,12 @@ const { once } = require('events');
assert.strictEqual(msgEvent.data, 'second message');
port1.close();
});
// TODO(addaleax): Switch this to a 'messageerror' event once MessagePort
// implements EventTarget fully and in a cross-context manner.
port2moved.emit = common.mustCall((name, err) => {
assert.strictEqual(name, 'messageerror');
assert.strictEqual(err.code, 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
});
port2moved.start();

assert.notStrictEqual(fh.fd, -1);
Expand Down

0 comments on commit 4a37180

Please sign in to comment.