diff --git a/benchmark/worker/messageport.js b/benchmark/worker/messageport.js index 8e2ddae73ff3ab..2f0d6f0621e8c8 100644 --- a/benchmark/worker/messageport.js +++ b/benchmark/worker/messageport.js @@ -4,6 +4,7 @@ const common = require('../common.js'); const { MessageChannel } = require('worker_threads'); const bench = common.createBenchmark(main, { payload: ['string', 'object'], + style: ['eventtarget', 'eventemitter'], n: [1e6] }); @@ -25,14 +26,26 @@ function main(conf) { const { port1, port2 } = new MessageChannel(); let messages = 0; - port2.onmessage = () => { + function listener() { if (messages++ === n) { bench.end(n); port1.close(); } else { write(); } - }; + } + + switch (conf.style) { + case 'eventtarget': + port2.onmessage = listener; + break; + case 'eventemitter': + port2.on('message', listener); + break; + default: + throw new Error('Unsupported listener type'); + } + bench.start(); write(); diff --git a/lib/internal/event_target.js b/lib/internal/event_target.js index 02ea2b8f0fff48..a55c9e461de458 100644 --- a/lib/internal/event_target.js +++ b/lib/internal/event_target.js @@ -31,8 +31,13 @@ const kEvents = Symbol('kEvents'); const kStop = Symbol('kStop'); const kTarget = Symbol('kTarget'); +const kHybridDispatch = Symbol.for('nodejs.internal.kHybridDispatch'); +const kCreateEvent = Symbol('kCreateEvent'); const kNewListener = Symbol('kNewListener'); const kRemoveListener = Symbol('kRemoveListener'); +const kIsNodeStyleListener = Symbol('kIsNodeStyleListener'); +const kMaxListeners = Symbol('kMaxListeners'); +const kMaxListenersWarned = Symbol('kMaxListenersWarned'); // Lazy load perf_hooks to avoid the additional overhead on startup let perf_hooks; @@ -157,7 +162,7 @@ Object.defineProperty(Event.prototype, SymbolToStringTag, { // the linked list makes dispatching faster, even if adding/removing is // slower. class Listener { - constructor(previous, listener, once, capture, passive) { + constructor(previous, listener, once, capture, passive, isNodeStyleListener) { this.next = undefined; if (previous !== undefined) previous.next = this; @@ -166,6 +171,7 @@ class Listener { this.once = once; this.capture = capture; this.passive = passive; + this.isNodeStyleListener = isNodeStyleListener; this.callback = typeof listener === 'function' ? @@ -185,13 +191,17 @@ class Listener { } } +function initEventTarget(self) { + self[kEvents] = new Map(); +} + class EventTarget { // Used in checking whether an object is an EventTarget. This is a well-known // symbol as EventTarget may be used cross-realm. See discussion in #33661. static [kIsEventTarget] = true; constructor() { - this[kEvents] = new Map(); + initEventTarget(this); } [kNewListener](size, type, listener, once, capture, passive) {} @@ -206,7 +216,8 @@ class EventTarget { const { once, capture, - passive + passive, + isNodeStyleListener } = validateEventListenerOptions(options); if (!shouldAddListener(listener)) { @@ -228,7 +239,7 @@ class EventTarget { if (root === undefined) { root = { size: 1, next: undefined }; // This is the first handler in our linked list. - new Listener(root, listener, once, capture, passive); + new Listener(root, listener, once, capture, passive, isNodeStyleListener); this[kNewListener](root.size, type, listener, once, capture, passive); this[kEvents].set(type, root); return; @@ -247,7 +258,8 @@ class EventTarget { return; } - new Listener(previous, listener, once, capture, passive); + new Listener(previous, listener, once, capture, passive, + isNodeStyleListener); root.size++; this[kNewListener](root.size, type, listener, once, capture, passive); } @@ -290,39 +302,61 @@ class EventTarget { if (event[kTarget] !== null) throw new ERR_EVENT_RECURSION(event.type); - const root = this[kEvents].get(event.type); + this[kHybridDispatch](event, event.type, event); + + return event.defaultPrevented !== true; + } + + [kHybridDispatch](nodeValue, type, event) { + const createEvent = () => { + if (event === undefined) { + event = this[kCreateEvent](nodeValue, type); + event[kTarget] = this; + } + return event; + }; + + const root = this[kEvents].get(type); if (root === undefined || root.next === undefined) return true; - event[kTarget] = this; + if (event !== undefined) + event[kTarget] = this; let handler = root.next; let next; while (handler !== undefined && - (handler.passive || event[kStop] !== true)) { + (handler.passive || event?.[kStop] !== true)) { // Cache the next item in case this iteration removes the current one next = handler.next; if (handler.once) { handler.remove(); root.size--; + const { listener, capture } = handler; + this[kRemoveListener](root.size, type, listener, capture); } try { - const result = handler.callback.call(this, event); + let arg; + if (handler.isNodeStyleListener) { + arg = nodeValue; + } else { + arg = createEvent(); + } + const result = handler.callback.call(this, arg); if (result !== undefined && result !== null) - addCatch(this, result, event); + addCatch(this, result, createEvent()); } catch (err) { - emitUnhandledRejectionOrErr(this, err, event); + emitUnhandledRejectionOrErr(this, err, createEvent()); } handler = next; } - event[kTarget] = undefined; - - return event.defaultPrevented !== true; + if (event !== undefined) + event[kTarget] = undefined; } [customInspectSymbol](depth, options) { @@ -350,15 +384,19 @@ Object.defineProperty(EventTarget.prototype, SymbolToStringTag, { value: 'EventTarget', }); -const kMaxListeners = Symbol('maxListeners'); -const kMaxListenersWarned = Symbol('maxListenersWarned'); +function initNodeEventTarget(self) { + initEventTarget(self); + // eslint-disable-next-line no-use-before-define + self[kMaxListeners] = NodeEventTarget.defaultMaxListeners; + self[kMaxListenersWarned] = false; +} + class NodeEventTarget extends EventTarget { static defaultMaxListeners = 10; constructor() { super(); - this[kMaxListeners] = NodeEventTarget.defaultMaxListeners; - this[kMaxListenersWarned] = false; + initNodeEventTarget(this); } [kNewListener](size, type, listener, once, capture, passive) { @@ -410,17 +448,18 @@ class NodeEventTarget extends EventTarget { } on(type, listener) { - this.addEventListener(type, listener); + this.addEventListener(type, listener, { [kIsNodeStyleListener]: true }); return this; } addListener(type, listener) { - this.addEventListener(type, listener); + this.addEventListener(type, listener, { [kIsNodeStyleListener]: true }); return this; } once(type, listener) { - this.addEventListener(type, listener, { once: true }); + this.addEventListener(type, listener, + { once: true, [kIsNodeStyleListener]: true }); return this; } @@ -470,6 +509,7 @@ function validateEventListenerOptions(options) { once: Boolean(options.once), capture: Boolean(options.capture), passive: Boolean(options.passive), + isNodeStyleListener: Boolean(options[kIsNodeStyleListener]) }; } @@ -520,4 +560,9 @@ module.exports = { EventTarget, NodeEventTarget, defineEventHandler, + initEventTarget, + initNodeEventTarget, + kCreateEvent, + kNewListener, + kRemoveListener, }; diff --git a/lib/internal/per_context/messageport.js b/lib/internal/per_context/messageport.js index ee08a596122a51..43587319b040de 100644 --- a/lib/internal/per_context/messageport.js +++ b/lib/internal/per_context/messageport.js @@ -1,12 +1,31 @@ 'use strict'; +const { + SymbolFor, +} = primordials; + class MessageEvent { - constructor(data, target) { + constructor(data, target, type) { this.data = data; this.target = target; + this.type = type; } } -exports.emitMessage = function(data) { - if (typeof this.onmessage === 'function') - this.onmessage(new MessageEvent(data, this)); +const kHybridDispatch = SymbolFor('nodejs.internal.kHybridDispatch'); + +exports.emitMessage = function(data, type) { + if (typeof this[kHybridDispatch] === 'function') { + this[kHybridDispatch](data, type, undefined); + return; + } + + const event = new MessageEvent(data, this, type); + if (type === 'message') { + if (typeof this.onmessage === 'function') + this.onmessage(event); + } else { + // eslint-disable-next-line no-lonely-if + if (typeof this.onmessageerror === 'function') + this.onmessageerror(event); + } }; diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js index 60dd8cd67d6186..5b5118a1d70b21 100644 --- a/lib/internal/worker/io.js +++ b/lib/internal/worker/io.js @@ -24,20 +24,23 @@ const { stopMessagePort } = internalBinding('messaging'); const { - threadId, getEnvMessagePort } = internalBinding('worker'); const { Readable, Writable } = require('stream'); -const EventEmitter = require('events'); +const { + Event, + NodeEventTarget, + defineEventHandler, + initNodeEventTarget, + kCreateEvent, + kNewListener, + kRemoveListener, +} = require('internal/event_target'); const { inspect } = require('internal/util/inspect'); -let debug = require('internal/util/debuglog').debuglog('worker', (fn) => { - debug = fn; -}); const kIncrementsPortRef = Symbol('kIncrementsPortRef'); const kName = Symbol('kName'); -const kOnMessageListener = Symbol('kOnMessageListener'); const kPort = Symbol('kPort'); const kWaitingStreams = Symbol('kWaitingStreams'); const kWritableCallbacks = Symbol('kWritableCallbacks'); @@ -54,7 +57,7 @@ const messageTypes = { }; // We have to mess with the MessagePort prototype a bit, so that a) we can make -// it inherit from EventEmitter, even though it is a C++ class, and b) we do +// it inherit from NodeEventTarget, even though it is a C++ class, and b) we do // not provide methods that are not present in the Browser and not documented // on our side (e.g. hasRef). // Save a copy of the original set of methods as a shallow clone. @@ -62,47 +65,39 @@ const MessagePortPrototype = ObjectCreate( ObjectGetPrototypeOf(MessagePort.prototype), ObjectGetOwnPropertyDescriptors(MessagePort.prototype)); // Set up the new inheritance chain. -ObjectSetPrototypeOf(MessagePort, EventEmitter); -ObjectSetPrototypeOf(MessagePort.prototype, EventEmitter.prototype); +ObjectSetPrototypeOf(MessagePort, NodeEventTarget); +ObjectSetPrototypeOf(MessagePort.prototype, NodeEventTarget.prototype); // Copy methods that are inherited from HandleWrap, because // changing the prototype of MessagePort.prototype implicitly removed them. MessagePort.prototype.ref = MessagePortPrototype.ref; MessagePort.prototype.unref = MessagePortPrototype.unref; -// A communication channel consisting of a handle (that wraps around an -// uv_async_t) which can receive information from other threads and emits -// .onmessage events, and a function used for sending data to a MessagePort -// in some other thread. -MessagePort.prototype[kOnMessageListener] = function onmessage(event) { - if (event.data && event.data.type !== messageTypes.STDIO_WANTS_MORE_DATA) - debug(`[${threadId}] received message`, event); - // Emit the deserialized object to userland. - this.emit('message', event.data); -}; - -// This is for compatibility with the Web's MessagePort API. It makes sense to -// provide it as an `EventEmitter` in Node.js, but if somebody overrides -// `onmessage`, we'll switch over to the Web API model. -ObjectDefineProperty(MessagePort.prototype, 'onmessage', { - enumerable: true, - configurable: true, - get() { - return this[kOnMessageListener]; - }, - set(value) { - this[kOnMessageListener] = value; - if (typeof value === 'function') { - this.ref(); - MessagePortPrototype.start.call(this); - } else { - this.unref(); - stopMessagePort(this); - } +class MessageEvent extends Event { + constructor(data, target, type) { + super(type); + this.data = data; } -}); +} + +ObjectDefineProperty( + MessagePort.prototype, + kCreateEvent, + { + value: function(data, type) { + return new MessageEvent(data, this, type); + }, + configurable: false, + writable: false, + enumerable: false, + }); // This is called from inside the `MessagePort` constructor. function oninit() { + initNodeEventTarget(this); + // TODO(addaleax): This should be on MessagePort.prototype, but + // defineEventHandler() does not support that. + defineEventHandler(this, 'message'); + defineEventHandler(this, 'messageerror'); setupPortReferencing(this, this, 'message'); } @@ -112,9 +107,15 @@ ObjectDefineProperty(MessagePort.prototype, onInitSymbol, { value: oninit }); +class MessagePortCloseEvent extends Event { + constructor() { + super('close'); + } +} + // This is called after the underlying `uv_async_t` has been closed. function onclose() { - this.emit('close'); + this.dispatchEvent(new MessagePortCloseEvent()); } ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, { @@ -156,18 +157,36 @@ function setupPortReferencing(port, eventEmitter, eventName) { // If there are none or all are removed, unref() the channel so the worker // can shutdown gracefully. port.unref(); - eventEmitter.on('newListener', (name) => { - if (name === eventName && eventEmitter.listenerCount(eventName) === 0) { + eventEmitter.on('newListener', function(name) { + if (name === eventName) newListener(eventEmitter.listenerCount(name)); + }); + eventEmitter.on('removeListener', function(name) { + if (name === eventName) removeListener(eventEmitter.listenerCount(name)); + }); + const origNewListener = eventEmitter[kNewListener]; + eventEmitter[kNewListener] = function(size, type, ...args) { + if (type === eventName) newListener(size - 1); + return origNewListener.call(this, size, type, ...args); + }; + const origRemoveListener = eventEmitter[kRemoveListener]; + eventEmitter[kRemoveListener] = function(size, type, ...args) { + if (type === eventName) removeListener(size); + return origRemoveListener.call(this, size, type, ...args); + }; + + function newListener(size) { + if (size === 0) { port.ref(); MessagePortPrototype.start.call(port); } - }); - eventEmitter.on('removeListener', (name) => { - if (name === eventName && eventEmitter.listenerCount(eventName) === 0) { + } + + function removeListener(size) { + if (size === 0) { stopMessagePort(port); port.unref(); } - }); + } } diff --git a/src/node_messaging.cc b/src/node_messaging.cc index add851e0836f3a..c615293570475c 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -746,6 +746,8 @@ void MessagePort::OnMessage() { Local payload; Local message_error; + Local argv[2]; + { // Catch any exceptions from parsing the message itself (not from // emitting it) as 'messageeror' events. @@ -764,16 +766,15 @@ void MessagePort::OnMessage() { continue; } - if (MakeCallback(emit_message, 1, &payload).IsEmpty()) { + argv[0] = payload; + argv[1] = env()->message_string(); + + if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) { reschedule: if (!message_error.IsEmpty()) { - // This should become a `messageerror` event in the sense of the - // EventTarget API at some point. - Local argv[] = { - env()->messageerror_string(), - message_error - }; - USE(MakeCallback(env()->emit_string(), arraysize(argv), argv)); + argv[0] = message_error; + argv[1] = env()->messageerror_string(); + USE(MakeCallback(emit_message, arraysize(argv), argv)); } // Re-schedule OnMessage() execution in case of failure. diff --git a/test/parallel/test-async-wrap-missing-method.js b/test/parallel/test-async-wrap-missing-method.js deleted file mode 100644 index 038a77fba65964..00000000000000 --- a/test/parallel/test-async-wrap-missing-method.js +++ /dev/null @@ -1,48 +0,0 @@ -'use strict'; -const common = require('../common'); -const assert = require('assert'); - -const { MessageChannel } = require('worker_threads'); - -{ - const { port1, port2 } = new MessageChannel(); - - // Returning a non-function in the getter should not crash. - Object.defineProperty(port1, 'onmessage', { - get() { - port1.unref(); - return 42; - } - }); - - port2.postMessage({ foo: 'bar' }); - - // We need to start the port manually because .onmessage assignment tracking - // has been overridden. - port1.start(); - port1.ref(); -} - -{ - const err = new Error('eyecatcher'); - process.on('uncaughtException', common.mustCall((exception) => { - port1.unref(); - assert.strictEqual(exception, err); - })); - - const { port1, port2 } = new MessageChannel(); - - // Throwing in the getter should not crash. - Object.defineProperty(port1, 'onmessage', { - get() { - throw err; - } - }); - - port2.postMessage({ foo: 'bar' }); - - // We need to start the port manually because .onmessage assignment tracking - // has been overridden. - port1.start(); - port1.ref(); -} diff --git a/test/parallel/test-crypto-key-objects-messageport.js b/test/parallel/test-crypto-key-objects-messageport.js index 1b910b571f77fc..f38e20da420aae 100644 --- a/test/parallel/test-crypto-key-objects-messageport.js +++ b/test/parallel/test-crypto-key-objects-messageport.js @@ -75,9 +75,9 @@ for (const [key, repr] of keys) { // TODO(addaleax): Switch this to a 'messageerror' event once MessagePort // implements EventTarget fully and in a cross-context manner. - port2moved.emit = common.mustCall((name, err) => { - assert.strictEqual(name, 'messageerror'); - assert.strictEqual(err.code, 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE'); + port2moved.onmessageerror = common.mustCall((event) => { + assert.strictEqual(event.data.code, + 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE'); }); port2moved.start(); diff --git a/test/parallel/test-worker-message-port-inspect-during-init-hook.js b/test/parallel/test-worker-message-port-inspect-during-init-hook.js index 30b90710a604f1..8f9678de1e970e 100644 --- a/test/parallel/test-worker-message-port-inspect-during-init-hook.js +++ b/test/parallel/test-worker-message-port-inspect-during-init-hook.js @@ -10,8 +10,9 @@ const { MessageChannel } = require('worker_threads'); async_hooks.createHook({ init: common.mustCall((id, type, triggerId, resource) => { - assert.strictEqual(util.inspect(resource), - 'MessagePort { active: true, refed: false }'); + assert.strictEqual( + util.inspect(resource), + 'MessagePort [EventTarget] { active: true, refed: false }'); }, 2) }).enable(); diff --git a/test/parallel/test-worker-message-port-transfer-filehandle.js b/test/parallel/test-worker-message-port-transfer-filehandle.js index e19e8c11a5ce78..2184ad575fa4a4 100644 --- a/test/parallel/test-worker-message-port-transfer-filehandle.js +++ b/test/parallel/test-worker-message-port-transfer-filehandle.js @@ -56,9 +56,9 @@ const { once } = require('events'); }); // TODO(addaleax): Switch this to a 'messageerror' event once MessagePort // implements EventTarget fully and in a cross-context manner. - port2moved.emit = common.mustCall((name, err) => { - assert.strictEqual(name, 'messageerror'); - assert.strictEqual(err.code, 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE'); + port2moved.onmessageerror = common.mustCall((event) => { + assert.strictEqual(event.data.code, + 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE'); }); port2moved.start(); diff --git a/test/parallel/test-worker-message-port.js b/test/parallel/test-worker-message-port.js index d128dc7edb25fd..4f4863c45ed516 100644 --- a/test/parallel/test-worker-message-port.js +++ b/test/parallel/test-worker-message-port.js @@ -154,7 +154,9 @@ const { MessageChannel, MessagePort } = require('worker_threads'); assert.deepStrictEqual( Object.getOwnPropertyNames(MessagePort.prototype).sort(), [ - 'close', 'constructor', 'onmessage', 'postMessage', 'ref', 'start', + // TODO(addaleax): This should include onmessage (and eventually + // onmessageerror). + 'close', 'constructor', 'postMessage', 'ref', 'start', 'unref' ]); } diff --git a/test/parallel/test-worker-terminate-source-map.js b/test/parallel/test-worker-terminate-source-map.js index d88a4c68366865..8cd40a6607422c 100644 --- a/test/parallel/test-worker-terminate-source-map.js +++ b/test/parallel/test-worker-terminate-source-map.js @@ -32,9 +32,11 @@ Map.prototype.entries = increaseCallCount; Object.keys = increaseCallCount; Object.create = increaseCallCount; Object.hasOwnProperty = increaseCallCount; -Object.defineProperty(Object.prototype, 'value', { - get: increaseCallCount, - set: increaseCallCount -}); +for (const property of ['_cache', 'lineLengths', 'url']) { + Object.defineProperty(Object.prototype, property, { + get: increaseCallCount, + set: increaseCallCount + }); +} parentPort.postMessage('done');