Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

events: add EventEmitterAsyncResource to core #41246

Merged
merged 1 commit into from Dec 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
85 changes: 85 additions & 0 deletions doc/api/events.md
Expand Up @@ -1166,6 +1166,89 @@ const emitter = new EventEmitter();
setMaxListeners(5, target, emitter);
```

## Class: `events.EventEmitterAsyncResource extends EventEmitter`
jasnell marked this conversation as resolved.
Show resolved Hide resolved

<!-- YAML
added: REPLACEME
-->

Integrates `EventEmitter` with {AsyncResource} for `EventEmitter`s that
Flarna marked this conversation as resolved.
Show resolved Hide resolved
require manual async tracking. Specifically, all events emitted by instances
of `events.EventEmitterAsyncResource` will run within its [async context][].

```js
const { EventEmitterAsyncResource } = require('events');
const { notStrictEqual, strictEqual } = require('assert');
const { executionAsyncId } = require('async_hooks');

// Async tracking tooling will identify this as 'Q'.
const ee1 = new EventEmitterAsyncResource({ name: 'Q' });

// 'foo' listeners will run in the EventEmitters async context.
ee1.on('foo', () => {
strictEqual(executionAsyncId(), ee1.asyncId);
strictEqual(triggerAsyncId(), ee1.triggerAsyncId);
});

const ee2 = new EventEmitter();

// 'foo' listeners on ordinary EventEmitters that do not track async
// context, however, run in the same async context as the emit().
ee2.on('foo', () => {
notStrictEqual(executionAsyncId(), ee2.asyncId);
notStrictEqual(triggerAsyncId(), ee2.triggerAsyncId);
});

Promise.resolve().then(() => {
ee1.emit('foo');
ee2.emit('foo');
});
```

The `EventEmitterAsyncResource` class has the same methods and takes the
same options as `EventEmitter` and `AsyncResource` themselves.

### `new events.EventEmitterAsyncResource(options)`

* `options` {Object}
* `captureRejections` {boolean} It enables
[automatic capturing of promise rejection][capturerejections].
**Default:** `false`.
* `name` {string} The type of async event. **Default::**
[`new.target.name`][].
* `triggerAsyncId` {number} The ID of the execution context that created this
async event. **Default:** `executionAsyncId()`.
* `requireManualDestroy` {boolean} If set to `true`, disables `emitDestroy`
when the object is garbage collected. This usually does not need to be set
(even if `emitDestroy` is called manually), unless the resource's `asyncId`
is retrieved and the sensitive API's `emitDestroy` is called with it.
When set to `false`, the `emitDestroy` call on garbage collection
will only take place if there is at least one active `destroy` hook.
**Default:** `false`.

### `eventemitterasyncresource.asyncId`

* Type: {number} The unique `asyncId` assigned to the resource.

### `eventemitterasyncresource.asyncResource`

* Type: The underlying {AsyncResource}.

The returned `AsyncResource` object has an additional `eventEmitter` property
that provides a reference to this `EventEmitterAsyncResource`.

### `eventemitterasyncresource.emitDestroy()`

Call all `destroy` hooks. This should only ever be called once. An error will
be thrown if it is called more than once. This **must** be manually called. If
the resource is left to be collected by the GC then the `destroy` hooks will
never be called.

### `eventemitterasyncresource.triggerAsyncId`

* Type: {number} The same `triggerAsyncId` that is passed to the
`AsyncResource` constructor.

<a id="event-target-and-event-api"></a>

## `EventTarget` and `Event` API
Expand Down Expand Up @@ -1706,7 +1789,9 @@ to the `EventTarget`.
[`events.defaultMaxListeners`]: #eventsdefaultmaxlisteners
[`fs.ReadStream`]: fs.md#class-fsreadstream
[`net.Server`]: net.md#class-netserver
[`new.target.name`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/new.target
[`process.on('warning')`]: process.md#event-warning
[async context]: async_context.md
[capturerejections]: #capture-rejections-of-promises
[error]: #error-events
[rejection]: #emittersymbolfornodejsrejectionerr-eventname-args
Expand Down
130 changes: 130 additions & 0 deletions lib/events.js
Expand Up @@ -27,6 +27,7 @@ const {
ArrayPrototypeShift,
ArrayPrototypeSlice,
ArrayPrototypeSplice,
ArrayPrototypeUnshift,
Boolean,
Error,
ErrorCaptureStackTrace,
Expand All @@ -42,6 +43,7 @@ const {
Promise,
PromiseReject,
PromiseResolve,
ReflectApply,
ReflectOwnKeys,
String,
StringPrototypeSplit,
Expand All @@ -59,6 +61,7 @@ const {
kEnhanceStackBeforeInspector,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_THIS,
ERR_OUT_OF_RANGE,
ERR_UNHANDLED_ERROR
},
Expand All @@ -68,6 +71,7 @@ const {
validateAbortSignal,
validateBoolean,
validateFunction,
validateString,
} = require('internal/validators');

const kCapture = Symbol('kCapture');
Expand All @@ -76,6 +80,125 @@ const kMaxEventTargetListeners = Symbol('events.maxEventTargetListeners');
const kMaxEventTargetListenersWarned =
Symbol('events.maxEventTargetListenersWarned');

let EventEmitterAsyncResource;
// The EventEmitterAsyncResource has to be initialized lazily because event.js
// is loaded so early in the bootstrap process, before async_hooks is available.
//
// This implementation was adapted straight from addaleax's
// eventemitter-asyncresource MIT-licensed userland module.
// https://github.com/addaleax/eventemitter-asyncresource
function lazyEventEmitterAsyncResource() {
if (EventEmitterAsyncResource === undefined) {
const {
AsyncResource
} = require('async_hooks');

const kEventEmitter = Symbol('kEventEmitter');
const kAsyncResource = Symbol('kAsyncResource');
class EventEmitterReferencingAsyncResource extends AsyncResource {
/**
* @param {EventEmitter} ee
* @param {string} [type]
* @param {{
* triggerAsyncId?: number,
* requireManualDestroy?: boolean,
* }} [options]
*/
constructor(ee, type, options) {
super(type, options);
this[kEventEmitter] = ee;
}

/**
* @type {EventEmitter}
*/
get eventEmitter() {
if (this[kEventEmitter] === undefined)
throw new ERR_INVALID_THIS('EventEmitterReferencingAsyncResource');
return this[kEventEmitter];
}
}

EventEmitterAsyncResource =
class EventEmitterAsyncResource extends EventEmitter {
/**
* @param {{
* name?: string,
* triggerAsyncId?: number,
* requireManualDestroy?: boolean,
* }} [options]
*/
constructor(options = undefined) {
Flarna marked this conversation as resolved.
Show resolved Hide resolved
let name;
if (typeof options === 'string') {
name = options;
options = undefined;
} else {
if (new.target === EventEmitterAsyncResource) {
validateString(options?.name, 'options.name');
}
name = options?.name || new.target.name;
Flarna marked this conversation as resolved.
Show resolved Hide resolved
}
super(options);

this[kAsyncResource] =
new EventEmitterReferencingAsyncResource(this, name, options);
}

/**
* @param {symbol,string} event
* @param {...any} args
* @returns {boolean}
*/
emit(event, ...args) {
if (this[kAsyncResource] === undefined)
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
const { asyncResource } = this;
ArrayPrototypeUnshift(args, super.emit, this, event);
return ReflectApply(asyncResource.runInAsyncScope, asyncResource,
args);
}

/**
* @returns {void}
*/
emitDestroy() {
if (this[kAsyncResource] === undefined)
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
this.asyncResource.emitDestroy();
}

/**
* @type {number}
*/
get asyncId() {
if (this[kAsyncResource] === undefined)
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
return this.asyncResource.asyncId();
}

/**
* @type {number}
*/
get triggerAsyncId() {
if (this[kAsyncResource] === undefined)
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
return this.asyncResource.triggerAsyncId();
}

/**
* @type {EventEmitterReferencingAsyncResource}
*/
get asyncResource() {
jasnell marked this conversation as resolved.
Show resolved Hide resolved
if (this[kAsyncResource] === undefined)
throw new ERR_INVALID_THIS('EventEmitterAsyncResource');
return this[kAsyncResource];
}
};
}
return EventEmitterAsyncResource;
}

/**
* Creates a new `EventEmitter` instance.
* @param {{ captureRejections?: boolean; }} [opts]
Expand Down Expand Up @@ -106,6 +229,13 @@ ObjectDefineProperty(EventEmitter, 'captureRejections', {
enumerable: true
});

ObjectDefineProperty(EventEmitter, 'EventEmitterAsyncResource', {
enumerable: true,
get: lazyEventEmitterAsyncResource,
jasnell marked this conversation as resolved.
Show resolved Hide resolved
set: undefined,
configurable: true,
});

EventEmitter.errorMonitor = kErrorMonitor;

// The default for captureRejections is false
Expand Down