diff --git a/doc/api/events.md b/doc/api/events.md index a2f9fe05ef119e..0239e5abb5f28e 100644 --- a/doc/api/events.md +++ b/doc/api/events.md @@ -155,9 +155,66 @@ myEmitter.emit('error', new Error('whoops!')); // Prints: whoops! there was an error ``` +## Capture Rejections of Promises + +> Stability: 1 - captureRejections is experimental. + +Using `async` functions with event handlers is problematic, because it +can lead to an unhandled rejection in case of a thrown exception: + +```js +const ee = new EventEmitter(); +ee.on('something', async (value) => { + throw new Error('kaboom'); +}); +``` + +The `captureRejections` option in the `EventEmitter` constructor or the global +setting change this behavior, installing a `.then(undefined, handler)` +handler on the `Promise`. This handler routes the exception +asynchronously to the [`Symbol.for('nodejs.rejection')`][rejection] method +if there is one, or to [`'error'`][error] event handler if there is none. + +```js +const ee1 = new EventEmitter({ captureRejections: true }); +ee1.on('something', async (value) => { + throw new Error('kaboom'); +}); + +ee1.on('error', console.log); + +const ee2 = new EventEmitter({ captureRejections: true }); +ee2.on('something', async (value) => { + throw new Error('kaboom'); +}); + +ee2[Symbol.for('nodejs.rejection')] = console.log; +``` + +Setting `EventEmitter.captureRejections = true` will change the default for all +new instances of `EventEmitter`. + +```js +EventEmitter.captureRejections = true; +const ee1 = new EventEmitter(); +ee1.on('something', async (value) => { + throw new Error('kaboom'); +}); + +ee1.on('error', console.log); +``` + +The `'error'` events that are generated by the `captureRejections` behavior +do not have a catch handler to avoid infinite error loops: the +recommendation is to **not use `async` functions as `'error'` event handlers**. + ## Class: `EventEmitter` The `EventEmitter` class is defined and exposed by the `events` module: @@ -169,6 +226,12 @@ const EventEmitter = require('events'); All `EventEmitter`s emit the event `'newListener'` when new listeners are added and `'removeListener'` when existing listeners are removed. +It supports the following option: + +* `captureRejections` {boolean} It enables + [automatic capturing of promise rejection][capturerejections]. + Default: `false`. + ### Event: 'newListener' + +> Stability: 1 - captureRejections is experimental. + +* `err` Error +* `eventName` {string|symbol} +* `...args` {any} + +The `Symbol.for('nodejs.rejection')` method is called in case a +promise rejection happens when emitting an event and +[`captureRejections`][capturerejections] is enabled on the emitter. +It is possible to use [`events.captureRejectionSymbol`][rejectionsymbol] in +place of `Symbol.for('nodejs.rejection')`. + +```js +const { EventEmitter, captureRejectionSymbol } = require('events'); + +class MyClass extends EventEmitter { + constructor() { + super({ captureRejections: true }); + } + + [captureRejectionSymbol](err, event, ...args) { + console.log('rejection happened for', event, 'with', err, ...args); + this.destroy(err); + } + + destroy(err) { + // Tear the resource down here. + } +} +``` + ## `events.once(emitter, name)` + +> Stability: 1 - captureRejections is experimental. + +Value: {boolean} + +Change the default `captureRejections` option on all new `EventEmitter` objects. + +## events.captureRejectionSymbol + + +> Stability: 1 - captureRejections is experimental. + +Value: `Symbol.for('nodejs.rejection')` + +See how to write a custom [rejection handler][rejection]. + [WHATWG-EventTarget]: https://dom.spec.whatwg.org/#interface-eventtarget [`--trace-warnings`]: cli.html#cli_trace_warnings [`EventEmitter.defaultMaxListeners`]: #events_eventemitter_defaultmaxlisteners @@ -751,3 +872,7 @@ run(); [`net.Server`]: net.html#net_class_net_server [`process.on('warning')`]: process.html#process_event_warning [stream]: stream.html +[capturerejections]: #events_capture_rejections_of_promises +[rejection]: #events_emitter_symbol_for_nodejs_rejection_err_eventname_args +[rejectionsymbol]: #events_events_capturerejectionsymbol +[error]: #events_error_events diff --git a/lib/events.js b/lib/events.js index 8cd42371bdeee2..1e4d17beced571 100644 --- a/lib/events.js +++ b/lib/events.js @@ -23,6 +23,7 @@ const { Array, + Boolean, Error, MathMin, NumberIsNaN, @@ -33,7 +34,10 @@ const { Promise, ReflectApply, ReflectOwnKeys, + Symbol, + SymbolFor, } = primordials; +const kRejection = SymbolFor('nodejs.rejection'); let spliceOne; @@ -51,8 +55,10 @@ const { inspect } = require('internal/util/inspect'); -function EventEmitter() { - EventEmitter.init.call(this); +const kCapture = Symbol('kCapture'); + +function EventEmitter(opts) { + EventEmitter.init.call(this, opts); } module.exports = EventEmitter; module.exports.once = once; @@ -62,6 +68,29 @@ EventEmitter.EventEmitter = EventEmitter; EventEmitter.usingDomains = false; +EventEmitter.captureRejectionSymbol = kRejection; +ObjectDefineProperty(EventEmitter, 'captureRejections', { + get() { + return EventEmitter.prototype[kCapture]; + }, + set(value) { + if (typeof value !== 'boolean') { + throw new ERR_INVALID_ARG_TYPE('EventEmitter.captureRejections', + 'boolean', value); + } + + EventEmitter.prototype[kCapture] = value; + }, + enumerable: true +}); + +// The default for captureRejections is false +ObjectDefineProperty(EventEmitter.prototype, kCapture, { + value: false, + writable: true, + enumerable: false +}); + EventEmitter.prototype._events = undefined; EventEmitter.prototype._eventsCount = 0; EventEmitter.prototype._maxListeners = undefined; @@ -91,7 +120,7 @@ ObjectDefineProperty(EventEmitter, 'defaultMaxListeners', { } }); -EventEmitter.init = function() { +EventEmitter.init = function(opts) { if (this._events === undefined || this._events === ObjectGetPrototypeOf(this)._events) { @@ -100,8 +129,64 @@ EventEmitter.init = function() { } this._maxListeners = this._maxListeners || undefined; + + + if (opts && opts.captureRejections) { + if (typeof opts.captureRejections !== 'boolean') { + throw new ERR_INVALID_ARG_TYPE('options.captureRejections', + 'boolean', opts.captureRejections); + } + this[kCapture] = Boolean(opts.captureRejections); + } else { + // Assigning it directly a prototype lookup, as it slighly expensive + // and it sits in a very sensitive hot path. + this[kCapture] = EventEmitter.prototype[kCapture]; + } }; +function addCatch(that, promise, type, args) { + if (!that[kCapture]) { + return; + } + + // Handle Promises/A+ spec, then could be a getter + // that throws on second use. + try { + const then = promise.then; + + if (typeof then === 'function') { + then.call(promise, undefined, function(err) { + // The callback is called with nextTick to avoid a follow-up + // rejection from this promise. + process.nextTick(emitUnhandledRejectionOrErr, that, err, type, args); + }); + } + } catch (err) { + that.emit('error', err); + } +} + +function emitUnhandledRejectionOrErr(ee, err, type, args) { + if (typeof ee[kRejection] === 'function') { + ee[kRejection](err, type, ...args); + } else { + // We have to disable the capture rejections mechanism, otherwise + // we might end up in an infinite loop. + const prev = ee[kCapture]; + + // If the error handler throws, it is not catcheable and it + // will end up in 'uncaughtException'. We restore the previous + // value of kCapture in case the uncaughtException is present + // and the exception is handled. + try { + ee[kCapture] = false; + ee.emit('error', err); + } finally { + ee[kCapture] = prev; + } + } +} + // Obviously not all Emitters should be limited to 10. This function allows // that to be increased. Set to zero for unlimited. EventEmitter.prototype.setMaxListeners = function setMaxListeners(n) { @@ -218,12 +303,29 @@ EventEmitter.prototype.emit = function emit(type, ...args) { return false; if (typeof handler === 'function') { - ReflectApply(handler, this, args); + const result = ReflectApply(handler, this, args); + + // We check if result is undefined first because that + // is the most common case so we do not pay any perf + // penalty + if (result !== undefined && result !== null) { + addCatch(this, result, type, args); + } } else { const len = handler.length; const listeners = arrayClone(handler, len); - for (let i = 0; i < len; ++i) - ReflectApply(listeners[i], this, args); + for (var i = 0; i < len; ++i) { + const result = ReflectApply(listeners[i], this, args); + + // We check if result is undefined first because that + // is the most common case so we do not pay any perf + // penalty. + // This code is duplicated because extracting it away + // would make it non-inlineable. + if (result !== undefined && result !== null) { + addCatch(this, result, type, args); + } + } } return true; diff --git a/test/parallel/test-event-capture-rejections.js b/test/parallel/test-event-capture-rejections.js new file mode 100644 index 00000000000000..83da3184a67116 --- /dev/null +++ b/test/parallel/test-event-capture-rejections.js @@ -0,0 +1,297 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { EventEmitter, captureRejectionSymbol } = require('events'); +const { inherits } = require('util'); + +// Inherits from EE without a call to the +// parent constructor. +function NoConstructor() { +} + +inherits(NoConstructor, EventEmitter); + +function captureRejections() { + const ee = new EventEmitter({ captureRejections: true }); + const _err = new Error('kaboom'); + ee.on('something', common.mustCall(async (value) => { + throw _err; + })); + + ee.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + process.nextTick(captureRejectionsTwoHandlers); + })); + + ee.emit('something'); +} + +function captureRejectionsTwoHandlers() { + const ee = new EventEmitter({ captureRejections: true }); + const _err = new Error('kaboom'); + + ee.on('something', common.mustCall(async (value) => { + throw _err; + })); + + // throw twice + ee.on('something', common.mustCall(async (value) => { + throw _err; + })); + + let count = 0; + + ee.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + if (++count === 2) { + process.nextTick(defaultValue); + } + }, 2)); + + ee.emit('something'); +} + +function defaultValue() { + const ee = new EventEmitter(); + const _err = new Error('kaboom'); + ee.on('something', common.mustCall(async (value) => { + throw _err; + })); + + process.removeAllListeners('unhandledRejection'); + + process.once('unhandledRejection', common.mustCall((err) => { + // restore default + process.on('unhandledRejection', (err) => { throw err; }); + + assert.strictEqual(err, _err); + process.nextTick(globalSetting); + })); + + ee.emit('something'); +} + +function globalSetting() { + assert.strictEqual(EventEmitter.captureRejections, false); + EventEmitter.captureRejections = true; + const ee = new EventEmitter(); + const _err = new Error('kaboom'); + ee.on('something', common.mustCall(async (value) => { + throw _err; + })); + + ee.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + + // restore default + EventEmitter.captureRejections = false; + process.nextTick(configurable); + })); + + ee.emit('something'); +} + +// We need to be able to configure this for streams, as we would +// like to call destro(err) there. +function configurable() { + const ee = new EventEmitter({ captureRejections: true }); + const _err = new Error('kaboom'); + ee.on('something', common.mustCall(async (...args) => { + assert.deepStrictEqual(args, [42, 'foobar']); + throw _err; + })); + + assert.strictEqual(captureRejectionSymbol, Symbol.for('nodejs.rejection')); + + ee[captureRejectionSymbol] = common.mustCall((err, type, ...args) => { + assert.strictEqual(err, _err); + assert.strictEqual(type, 'something'); + assert.deepStrictEqual(args, [42, 'foobar']); + process.nextTick(globalSettingNoConstructor); + }); + + ee.emit('something', 42, 'foobar'); +} + +function globalSettingNoConstructor() { + assert.strictEqual(EventEmitter.captureRejections, false); + EventEmitter.captureRejections = true; + const ee = new NoConstructor(); + const _err = new Error('kaboom'); + ee.on('something', common.mustCall(async (value) => { + throw _err; + })); + + ee.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + + // restore default + EventEmitter.captureRejections = false; + process.nextTick(thenable); + })); + + ee.emit('something'); +} + +function thenable() { + const ee = new EventEmitter({ captureRejections: true }); + const _err = new Error('kaboom'); + ee.on('something', common.mustCall((value) => { + const obj = {}; + + Object.defineProperty(obj, 'then', { + get: common.mustCall(() => { + return common.mustCall((resolved, rejected) => { + assert.strictEqual(resolved, undefined); + rejected(_err); + }); + }, 1) // Only 1 call for Promises/A+ compat. + }); + + return obj; + })); + + ee.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + process.nextTick(avoidLoopOnRejection); + })); + + ee.emit('something'); +} + +function avoidLoopOnRejection() { + const ee = new EventEmitter({ captureRejections: true }); + const _err1 = new Error('kaboom'); + const _err2 = new Error('kaboom2'); + ee.on('something', common.mustCall(async (value) => { + throw _err1; + })); + + ee[captureRejectionSymbol] = common.mustCall(async (err) => { + assert.strictEqual(err, _err1); + throw _err2; + }); + + process.removeAllListeners('unhandledRejection'); + + process.once('unhandledRejection', common.mustCall((err) => { + // restore default + process.on('unhandledRejection', (err) => { throw err; }); + + assert.strictEqual(err, _err2); + process.nextTick(avoidLoopOnError); + })); + + ee.emit('something'); +} + +function avoidLoopOnError() { + const ee = new EventEmitter({ captureRejections: true }); + const _err1 = new Error('kaboom'); + const _err2 = new Error('kaboom2'); + ee.on('something', common.mustCall(async (value) => { + throw _err1; + })); + + ee.on('error', common.mustCall(async (err) => { + assert.strictEqual(err, _err1); + throw _err2; + })); + + process.removeAllListeners('unhandledRejection'); + + process.once('unhandledRejection', common.mustCall((err) => { + // restore default + process.on('unhandledRejection', (err) => { throw err; }); + + assert.strictEqual(err, _err2); + process.nextTick(thenableThatThrows); + })); + + ee.emit('something'); +} + +function thenableThatThrows() { + const ee = new EventEmitter({ captureRejections: true }); + const _err = new Error('kaboom'); + ee.on('something', common.mustCall((value) => { + const obj = {}; + + Object.defineProperty(obj, 'then', { + get: common.mustCall(() => { + throw _err; + }, 1) // Only 1 call for Promises/A+ compat. + }); + + return obj; + })); + + ee.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + process.nextTick(resetCaptureOnThrowInError); + })); + + ee.emit('something'); +} + +function resetCaptureOnThrowInError() { + const ee = new EventEmitter({ captureRejections: true }); + ee.on('something', common.mustCall(async (value) => { + throw new Error('kaboom'); + })); + + ee.once('error', common.mustCall((err) => { + throw err; + })); + + process.removeAllListeners('uncaughtException'); + + process.once('uncaughtException', common.mustCall((err) => { + process.nextTick(next); + })); + + ee.emit('something'); + + function next() { + process.on('uncaughtException', common.mustNotCall()); + + const _err = new Error('kaboom2'); + ee.on('something2', common.mustCall(async (value) => { + throw _err; + })); + + ee.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + + process.removeAllListeners('uncaughtException'); + + // restore default + process.on('uncaughtException', (err) => { throw err; }); + + process.nextTick(argValidation); + })); + + ee.emit('something2'); + } +} + +function argValidation() { + + function testType(obj) { + common.expectsError(() => new EventEmitter({ captureRejections: obj }), { + code: 'ERR_INVALID_ARG_TYPE', + type: TypeError + }); + + common.expectsError(() => EventEmitter.captureRejections = obj, { + code: 'ERR_INVALID_ARG_TYPE', + type: TypeError + }); + } + + testType([]); + testType({ hello: 42 }); + testType(42); +} + +captureRejections();