From fe216079016585bd45bbfa98153ca36b5e392a69 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 22 Dec 2021 16:56:57 -0800 Subject: [PATCH] events: add EventEmitterAsyncResource to core MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signd-off-by: James M Snell PR-URL: https://github.com/nodejs/node/pull/41246 Reviewed-By: Gerhard Stöbich Reviewed-By: Anna Henningsen Reviewed-By: Antoine du Hamel --- doc/api/events.md | 85 +++++++++++ lib/events.js | 130 +++++++++++++++++ .../test-eventemitter-asyncresource.js | 132 ++++++++++++++++++ 3 files changed, 347 insertions(+) create mode 100644 test/parallel/test-eventemitter-asyncresource.js diff --git a/doc/api/events.md b/doc/api/events.md index f3ef831a4843d0..71846ea4527edf 100644 --- a/doc/api/events.md +++ b/doc/api/events.md @@ -1166,6 +1166,89 @@ const emitter = new EventEmitter(); setMaxListeners(5, target, emitter); ``` +## Class: `events.EventEmitterAsyncResource extends EventEmitter` + + + +Integrates `EventEmitter` with {AsyncResource} for `EventEmitter`s that +require manual async tracking. Specifically, all events emitted by instances +of `events.EventEmitterAsyncResource` will run within its [async context][]. + +```js +const { EventEmitterAsyncResource } = require('events'); +const { notStrictEqual, strictEqual } = require('assert'); +const { executionAsyncId } = require('async_hooks'); + +// Async tracking tooling will identify this as 'Q'. +const ee1 = new EventEmitterAsyncResource({ name: 'Q' }); + +// 'foo' listeners will run in the EventEmitters async context. +ee1.on('foo', () => { + strictEqual(executionAsyncId(), ee1.asyncId); + strictEqual(triggerAsyncId(), ee1.triggerAsyncId); +}); + +const ee2 = new EventEmitter(); + +// 'foo' listeners on ordinary EventEmitters that do not track async +// context, however, run in the same async context as the emit(). +ee2.on('foo', () => { + notStrictEqual(executionAsyncId(), ee2.asyncId); + notStrictEqual(triggerAsyncId(), ee2.triggerAsyncId); +}); + +Promise.resolve().then(() => { + ee1.emit('foo'); + ee2.emit('foo'); +}); +``` + +The `EventEmitterAsyncResource` class has the same methods and takes the +same options as `EventEmitter` and `AsyncResource` themselves. + +### `new events.EventEmitterAsyncResource(options)` + +* `options` {Object} + * `captureRejections` {boolean} It enables + [automatic capturing of promise rejection][capturerejections]. + **Default:** `false`. + * `name` {string} The type of async event. **Default::** + [`new.target.name`][]. + * `triggerAsyncId` {number} The ID of the execution context that created this + async event. **Default:** `executionAsyncId()`. + * `requireManualDestroy` {boolean} If set to `true`, disables `emitDestroy` + when the object is garbage collected. This usually does not need to be set + (even if `emitDestroy` is called manually), unless the resource's `asyncId` + is retrieved and the sensitive API's `emitDestroy` is called with it. + When set to `false`, the `emitDestroy` call on garbage collection + will only take place if there is at least one active `destroy` hook. + **Default:** `false`. + +### `eventemitterasyncresource.asyncId` + +* Type: {number} The unique `asyncId` assigned to the resource. + +### `eventemitterasyncresource.asyncResource` + +* Type: The underlying {AsyncResource}. + +The returned `AsyncResource` object has an additional `eventEmitter` property +that provides a reference to this `EventEmitterAsyncResource`. + +### `eventemitterasyncresource.emitDestroy()` + +Call all `destroy` hooks. This should only ever be called once. An error will +be thrown if it is called more than once. This **must** be manually called. If +the resource is left to be collected by the GC then the `destroy` hooks will +never be called. + +### `eventemitterasyncresource.triggerAsyncId` + +* Type: {number} The same `triggerAsyncId` that is passed to the + `AsyncResource` constructor. + ## `EventTarget` and `Event` API @@ -1706,7 +1789,9 @@ to the `EventTarget`. [`events.defaultMaxListeners`]: #eventsdefaultmaxlisteners [`fs.ReadStream`]: fs.md#class-fsreadstream [`net.Server`]: net.md#class-netserver +[`new.target.name`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/new.target [`process.on('warning')`]: process.md#event-warning +[async context]: async_context.md [capturerejections]: #capture-rejections-of-promises [error]: #error-events [rejection]: #emittersymbolfornodejsrejectionerr-eventname-args diff --git a/lib/events.js b/lib/events.js index 6f5a499083027a..48a080d3460eed 100644 --- a/lib/events.js +++ b/lib/events.js @@ -27,6 +27,7 @@ const { ArrayPrototypeShift, ArrayPrototypeSlice, ArrayPrototypeSplice, + ArrayPrototypeUnshift, Boolean, Error, ErrorCaptureStackTrace, @@ -42,6 +43,7 @@ const { Promise, PromiseReject, PromiseResolve, + ReflectApply, ReflectOwnKeys, String, StringPrototypeSplit, @@ -59,6 +61,7 @@ const { kEnhanceStackBeforeInspector, codes: { ERR_INVALID_ARG_TYPE, + ERR_INVALID_THIS, ERR_OUT_OF_RANGE, ERR_UNHANDLED_ERROR }, @@ -68,6 +71,7 @@ const { validateAbortSignal, validateBoolean, validateFunction, + validateString, } = require('internal/validators'); const kCapture = Symbol('kCapture'); @@ -76,6 +80,125 @@ const kMaxEventTargetListeners = Symbol('events.maxEventTargetListeners'); const kMaxEventTargetListenersWarned = Symbol('events.maxEventTargetListenersWarned'); +let EventEmitterAsyncResource; +// The EventEmitterAsyncResource has to be initialized lazily because event.js +// is loaded so early in the bootstrap process, before async_hooks is available. +// +// This implementation was adapted straight from addaleax's +// eventemitter-asyncresource MIT-licensed userland module. +// https://github.com/addaleax/eventemitter-asyncresource +function lazyEventEmitterAsyncResource() { + if (EventEmitterAsyncResource === undefined) { + const { + AsyncResource + } = require('async_hooks'); + + const kEventEmitter = Symbol('kEventEmitter'); + const kAsyncResource = Symbol('kAsyncResource'); + class EventEmitterReferencingAsyncResource extends AsyncResource { + /** + * @param {EventEmitter} ee + * @param {string} [type] + * @param {{ + * triggerAsyncId?: number, + * requireManualDestroy?: boolean, + * }} [options] + */ + constructor(ee, type, options) { + super(type, options); + this[kEventEmitter] = ee; + } + + /** + * @type {EventEmitter} + */ + get eventEmitter() { + if (this[kEventEmitter] === undefined) + throw new ERR_INVALID_THIS('EventEmitterReferencingAsyncResource'); + return this[kEventEmitter]; + } + } + + EventEmitterAsyncResource = + class EventEmitterAsyncResource extends EventEmitter { + /** + * @param {{ + * name?: string, + * triggerAsyncId?: number, + * requireManualDestroy?: boolean, + * }} [options] + */ + constructor(options = undefined) { + let name; + if (typeof options === 'string') { + name = options; + options = undefined; + } else { + if (new.target === EventEmitterAsyncResource) { + validateString(options?.name, 'options.name'); + } + name = options?.name || new.target.name; + } + super(options); + + this[kAsyncResource] = + new EventEmitterReferencingAsyncResource(this, name, options); + } + + /** + * @param {symbol,string} event + * @param {...any} args + * @returns {boolean} + */ + emit(event, ...args) { + if (this[kAsyncResource] === undefined) + throw new ERR_INVALID_THIS('EventEmitterAsyncResource'); + const { asyncResource } = this; + ArrayPrototypeUnshift(args, super.emit, this, event); + return ReflectApply(asyncResource.runInAsyncScope, asyncResource, + args); + } + + /** + * @returns {void} + */ + emitDestroy() { + if (this[kAsyncResource] === undefined) + throw new ERR_INVALID_THIS('EventEmitterAsyncResource'); + this.asyncResource.emitDestroy(); + } + + /** + * @type {number} + */ + get asyncId() { + if (this[kAsyncResource] === undefined) + throw new ERR_INVALID_THIS('EventEmitterAsyncResource'); + return this.asyncResource.asyncId(); + } + + /** + * @type {number} + */ + get triggerAsyncId() { + if (this[kAsyncResource] === undefined) + throw new ERR_INVALID_THIS('EventEmitterAsyncResource'); + return this.asyncResource.triggerAsyncId(); + } + + /** + * @type {EventEmitterReferencingAsyncResource} + */ + get asyncResource() { + if (this[kAsyncResource] === undefined) + throw new ERR_INVALID_THIS('EventEmitterAsyncResource'); + return this[kAsyncResource]; + } + }; + } + return EventEmitterAsyncResource; +} + /** * Creates a new `EventEmitter` instance. * @param {{ captureRejections?: boolean; }} [opts] @@ -106,6 +229,13 @@ ObjectDefineProperty(EventEmitter, 'captureRejections', { enumerable: true }); +ObjectDefineProperty(EventEmitter, 'EventEmitterAsyncResource', { + enumerable: true, + get: lazyEventEmitterAsyncResource, + set: undefined, + configurable: true, +}); + EventEmitter.errorMonitor = kErrorMonitor; // The default for captureRejections is false diff --git a/test/parallel/test-eventemitter-asyncresource.js b/test/parallel/test-eventemitter-asyncresource.js new file mode 100644 index 00000000000000..ae86f3608b7ffd --- /dev/null +++ b/test/parallel/test-eventemitter-asyncresource.js @@ -0,0 +1,132 @@ +'use strict'; + +const common = require('../common'); +const { EventEmitterAsyncResource } = require('events'); +const { + createHook, + executionAsyncId, +} = require('async_hooks'); + +const { + deepStrictEqual, + strictEqual, +} = require('assert'); + +const { + setImmediate: tick, +} = require('timers/promises'); + +function makeHook(trackedTypes) { + const eventMap = new Map(); + + function log(asyncId, name) { + const entry = eventMap.get(asyncId); + if (entry !== undefined) entry.push({ name }); + } + + const hook = createHook({ + init(asyncId, type, triggerAsyncId, resource) { + if (trackedTypes.includes(type)) { + eventMap.set(asyncId, [ + { + name: 'init', + type, + triggerAsyncId, + resource, + }, + ]); + } + }, + + before(asyncId) { log(asyncId, 'before'); }, + after(asyncId) { log(asyncId, 'after'); }, + destroy(asyncId) { log(asyncId, 'destroy'); } + }).enable(); + + return { + done() { + hook.disable(); + return new Set(eventMap.values()); + }, + ids() { + return new Set(eventMap.keys()); + } + }; +} + +// Tracks emit() calls correctly using async_hooks +(async () => { + const tracer = makeHook(['Foo']); + + class Foo extends EventEmitterAsyncResource {} + + const origExecutionAsyncId = executionAsyncId(); + const foo = new Foo(); + + foo.on('someEvent', common.mustCall()); + foo.emit('someEvent'); + + deepStrictEqual([foo.asyncId], [...tracer.ids()]); + strictEqual(foo.triggerAsyncId, origExecutionAsyncId); + strictEqual(foo.asyncResource.eventEmitter, foo); + + foo.emitDestroy(); + + await tick(); + + deepStrictEqual(tracer.done(), new Set([ + [ + { + name: 'init', + type: 'Foo', + triggerAsyncId: origExecutionAsyncId, + resource: foo.asyncResource, + }, + { name: 'before' }, + { name: 'after' }, + { name: 'destroy' }, + ], + ])); +})().then(common.mustCall()); + +// Can explicitly specify name as positional arg +(async () => { + const tracer = makeHook(['ResourceName']); + + const origExecutionAsyncId = executionAsyncId(); + class Foo extends EventEmitterAsyncResource {} + + const foo = new Foo('ResourceName'); + + deepStrictEqual(tracer.done(), new Set([ + [ + { + name: 'init', + type: 'ResourceName', + triggerAsyncId: origExecutionAsyncId, + resource: foo.asyncResource, + }, + ], + ])); +})().then(common.mustCall()); + +// Can explicitly specify name as option +(async () => { + const tracer = makeHook(['ResourceName']); + + const origExecutionAsyncId = executionAsyncId(); + class Foo extends EventEmitterAsyncResource {} + + const foo = new Foo({ name: 'ResourceName' }); + + deepStrictEqual(tracer.done(), new Set([ + [ + { + name: 'init', + type: 'ResourceName', + triggerAsyncId: origExecutionAsyncId, + resource: foo.asyncResource, + }, + ], + ])); +})().then(common.mustCall());