From 200543bca39fad94dcf6cc5ec4fe9fbe34b3291f Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Sun, 19 May 2019 13:55:18 +0200 Subject: [PATCH 1/7] events: add captureRejection option --- doc/api/events.md | 125 ++++++++ lib/events.js | 112 ++++++- .../parallel/test-event-capture-rejections.js | 297 ++++++++++++++++++ 3 files changed, 528 insertions(+), 6 deletions(-) create mode 100644 test/parallel/test-event-capture-rejections.js diff --git a/doc/api/events.md b/doc/api/events.md index ec6f608609073f..3a558ec233f06b 100644 --- a/doc/api/events.md +++ b/doc/api/events.md @@ -155,9 +155,66 @@ myEmitter.emit('error', new Error('whoops!')); // Prints: whoops! there was an error ``` +## Capture Rejections of Promises + +> Stability: 1 - captureRejections is experimental. + +Using `async` functions with event handlers is problematic, because it +can lead to an unhandled rejection in case of a thrown exception: + +```js +const ee = new EventEmitter(); +ee.on('something', async (value) => { + throw new Error('kaboom'); +}); +``` + +The `captureRejections` option in the `EventEmitter` constructor or the global +setting change this behavior, installing a `.then(undefined, handler)` +handler on the `Promise`. This handler routes the exception +asynchronously to the [`Symbol.for('nodejs.rejection')`][rejection] method +if there is one, or to [`'error'`][error] event handler if there is none. + +```js +const ee1 = new EventEmitter({ captureRejections: true }); +ee1.on('something', async (value) => { + throw new Error('kaboom'); +}); + +ee1.on('error', console.log); + +const ee2 = new EventEmitter({ captureRejections: true }); +ee2.on('something', async (value) => { + throw new Error('kaboom'); +}); + +ee2[Symbol.for('nodejs.rejection')] = console.log; +``` + +Setting `EventEmitter.captureRejections = true` will change the default for all +new instances of `EventEmitter`. + +```js +EventEmitter.captureRejections = true; +const ee1 = new EventEmitter(); +ee1.on('something', async (value) => { + throw new Error('kaboom'); +}); + +ee1.on('error', console.log); +``` + +The `'error'` events that are generated by the `captureRejections` behavior +do not have a catch handler to avoid infinite error loops: the +recommendation is to **not use `async` functions as `'error'` event handlers**. + ## Class: EventEmitter The `EventEmitter` class is defined and exposed by the `events` module: @@ -169,6 +226,12 @@ const EventEmitter = require('events'); All `EventEmitter`s emit the event `'newListener'` when new listeners are added and `'removeListener'` when existing listeners are removed. +It supports the following option: + +* `captureRejections` {boolean} It enables + [automatic capturing of promise rejection][capturerejections]. + Default: `false`. + ### Event: 'newListener' + +> Stability: 1 - captureRejections is experimental. + +* `err` Error +* `eventName` {string|symbol} +* `...args` {any} + +The `Symbol.for('nodejs.rejection')` method is called in case a +promise rejection happens when emitting an event and +[`captureRejections`][capturerejections] is enabled on the emitter. +It is possible to use [`events.captureRejectionSymbol`][rejectionsymbol] in +place of `Symbol.for('nodejs.rejection')`. + +```js +const { EventEmitter, captureRejectionSymbol } = require('events'); + +class MyClass extends EventEmitter { + constructor() { + super({ captureRejections: true }); + } + + [captureRejectionSymbol](err, event, ...args) { + console.log('rejection happened for', event, 'with', err, ...args); + this.destroy(err); + } + + destroy(err) { + // Tear the resource down here. + } +} +``` + ## events.once(emitter, name) + +> Stability: 1 - captureRejections is experimental. + +Value: {boolean} + +Change the default `captureRejections` option on all new `EventEmitter` objects. + +## events.captureRejectionSymbol + + +> Stability: 1 - captureRejections is experimental. + +Value: `Symbol.for('nodejs.rejection')` + +See how to write a custom [rejection handler][rejection]. + [WHATWG-EventTarget]: https://dom.spec.whatwg.org/#interface-eventtarget [`--trace-warnings`]: cli.html#cli_trace_warnings [`EventEmitter.defaultMaxListeners`]: #events_eventemitter_defaultmaxlisteners @@ -751,3 +872,7 @@ run(); [`net.Server`]: net.html#net_class_net_server [`process.on('warning')`]: process.html#process_event_warning [stream]: stream.html +[capturerejections]: #events_capture_rejections_of_promises +[rejection]: #events_emitter_symbol_for_nodejs_rejection_err_eventname_args +[rejectionsymbol]: #events_events_capturerejectionsymbol +[error]: #events_error_events diff --git a/lib/events.js b/lib/events.js index a09333af443534..ce93cd9f9bfd3e 100644 --- a/lib/events.js +++ b/lib/events.js @@ -23,6 +23,7 @@ const { Array, + Boolean, MathMin, NumberIsNaN, ObjectCreate, @@ -32,6 +33,7 @@ const { ReflectApply, ReflectOwnKeys, } = primordials; +const kRejection = Symbol.for('nodejs.rejection'); let spliceOne; @@ -49,8 +51,10 @@ const { inspect } = require('internal/util/inspect'); -function EventEmitter() { - EventEmitter.init.call(this); +const kCapture = Symbol('kCapture'); + +function EventEmitter(opts) { + EventEmitter.init.call(this, opts); } module.exports = EventEmitter; module.exports.once = once; @@ -60,6 +64,29 @@ EventEmitter.EventEmitter = EventEmitter; EventEmitter.usingDomains = false; +EventEmitter.captureRejectionSymbol = kRejection; +ObjectDefineProperty(EventEmitter, 'captureRejections', { + get() { + return EventEmitter.prototype[kCapture]; + }, + set(value) { + if (typeof value !== 'boolean') { + throw new ERR_INVALID_ARG_TYPE('EventEmitter.captureRejections', + 'boolean', value); + } + + EventEmitter.prototype[kCapture] = value; + }, + enumerable: true +}); + +// The default for captureRejections is false +ObjectDefineProperty(EventEmitter.prototype, kCapture, { + value: false, + writable: true, + enumerable: false +}); + EventEmitter.prototype._events = undefined; EventEmitter.prototype._eventsCount = 0; EventEmitter.prototype._maxListeners = undefined; @@ -89,7 +116,7 @@ ObjectDefineProperty(EventEmitter, 'defaultMaxListeners', { } }); -EventEmitter.init = function() { +EventEmitter.init = function(opts) { if (this._events === undefined || this._events === ObjectGetPrototypeOf(this)._events) { @@ -98,8 +125,64 @@ EventEmitter.init = function() { } this._maxListeners = this._maxListeners || undefined; + + + if (opts && opts.captureRejections) { + if (typeof opts.captureRejections !== 'boolean') { + throw new ERR_INVALID_ARG_TYPE('options.captureRejections', + 'boolean', opts.captureRejections); + } + this[kCapture] = Boolean(opts.captureRejections); + } else { + // Assigning it directly a prototype lookup, as it slighly expensive + // and it sits in a very sensitive hot path. + this[kCapture] = EventEmitter.prototype[kCapture]; + } }; +function addCatch(that, promise, type, args) { + if (!that[kCapture]) { + return; + } + + // Handle Promises/A+ spec, then could be a getter + // that throws on second use. + try { + const then = promise.then; + + if (typeof then === 'function') { + then.call(promise, undefined, function(err) { + // The callback is called with nextTick to avoid a follow-up + // rejection from this promise. + process.nextTick(emitUnhandledRejectionOrErr, that, err, type, args); + }); + } + } catch (err) { + that.emit('error', err); + } +} + +function emitUnhandledRejectionOrErr(ee, err, type, args) { + if (typeof ee[kRejection] === 'function') { + ee[kRejection](err, type, ...args); + } else { + // We have to disable the capture rejections mechanism, otherwise + // we might end up in an infinite loop. + const prev = ee[kCapture]; + + // If the error handler throws, it is not catcheable and it + // will end up in 'uncaughtException'. We restore the previous + // value of kCapture in case the uncaughtException is present + // and the exception is handled. + try { + ee[kCapture] = false; + ee.emit('error', err); + } finally { + ee[kCapture] = prev; + } + } +} + // Obviously not all Emitters should be limited to 10. This function allows // that to be increased. Set to zero for unlimited. EventEmitter.prototype.setMaxListeners = function setMaxListeners(n) { @@ -216,12 +299,29 @@ EventEmitter.prototype.emit = function emit(type, ...args) { return false; if (typeof handler === 'function') { - ReflectApply(handler, this, args); + const result = ReflectApply(handler, this, args); + + // We check if result is undefined first because that + // is the most common case so we do not pay any perf + // penalty + if (result !== undefined && result !== null) { + addCatch(this, result, type, args); + } } else { const len = handler.length; const listeners = arrayClone(handler, len); - for (let i = 0; i < len; ++i) - ReflectApply(listeners[i], this, args); + for (var i = 0; i < len; ++i) { + const result = ReflectApply(listeners[i], this, args); + + // We check if result is undefined first because that + // is the most common case so we do not pay any perf + // penalty. + // This code is duplicated because extracting it away + // would make it non-inlineable. + if (result !== undefined && result !== null) { + addCatch(this, result, type, args); + } + } } return true; diff --git a/test/parallel/test-event-capture-rejections.js b/test/parallel/test-event-capture-rejections.js new file mode 100644 index 00000000000000..83da3184a67116 --- /dev/null +++ b/test/parallel/test-event-capture-rejections.js @@ -0,0 +1,297 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { EventEmitter, captureRejectionSymbol } = require('events'); +const { inherits } = require('util'); + +// Inherits from EE without a call to the +// parent constructor. +function NoConstructor() { +} + +inherits(NoConstructor, EventEmitter); + +function captureRejections() { + const ee = new EventEmitter({ captureRejections: true }); + const _err = new Error('kaboom'); + ee.on('something', common.mustCall(async (value) => { + throw _err; + })); + + ee.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + process.nextTick(captureRejectionsTwoHandlers); + })); + + ee.emit('something'); +} + +function captureRejectionsTwoHandlers() { + const ee = new EventEmitter({ captureRejections: true }); + const _err = new Error('kaboom'); + + ee.on('something', common.mustCall(async (value) => { + throw _err; + })); + + // throw twice + ee.on('something', common.mustCall(async (value) => { + throw _err; + })); + + let count = 0; + + ee.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + if (++count === 2) { + process.nextTick(defaultValue); + } + }, 2)); + + ee.emit('something'); +} + +function defaultValue() { + const ee = new EventEmitter(); + const _err = new Error('kaboom'); + ee.on('something', common.mustCall(async (value) => { + throw _err; + })); + + process.removeAllListeners('unhandledRejection'); + + process.once('unhandledRejection', common.mustCall((err) => { + // restore default + process.on('unhandledRejection', (err) => { throw err; }); + + assert.strictEqual(err, _err); + process.nextTick(globalSetting); + })); + + ee.emit('something'); +} + +function globalSetting() { + assert.strictEqual(EventEmitter.captureRejections, false); + EventEmitter.captureRejections = true; + const ee = new EventEmitter(); + const _err = new Error('kaboom'); + ee.on('something', common.mustCall(async (value) => { + throw _err; + })); + + ee.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + + // restore default + EventEmitter.captureRejections = false; + process.nextTick(configurable); + })); + + ee.emit('something'); +} + +// We need to be able to configure this for streams, as we would +// like to call destro(err) there. +function configurable() { + const ee = new EventEmitter({ captureRejections: true }); + const _err = new Error('kaboom'); + ee.on('something', common.mustCall(async (...args) => { + assert.deepStrictEqual(args, [42, 'foobar']); + throw _err; + })); + + assert.strictEqual(captureRejectionSymbol, Symbol.for('nodejs.rejection')); + + ee[captureRejectionSymbol] = common.mustCall((err, type, ...args) => { + assert.strictEqual(err, _err); + assert.strictEqual(type, 'something'); + assert.deepStrictEqual(args, [42, 'foobar']); + process.nextTick(globalSettingNoConstructor); + }); + + ee.emit('something', 42, 'foobar'); +} + +function globalSettingNoConstructor() { + assert.strictEqual(EventEmitter.captureRejections, false); + EventEmitter.captureRejections = true; + const ee = new NoConstructor(); + const _err = new Error('kaboom'); + ee.on('something', common.mustCall(async (value) => { + throw _err; + })); + + ee.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + + // restore default + EventEmitter.captureRejections = false; + process.nextTick(thenable); + })); + + ee.emit('something'); +} + +function thenable() { + const ee = new EventEmitter({ captureRejections: true }); + const _err = new Error('kaboom'); + ee.on('something', common.mustCall((value) => { + const obj = {}; + + Object.defineProperty(obj, 'then', { + get: common.mustCall(() => { + return common.mustCall((resolved, rejected) => { + assert.strictEqual(resolved, undefined); + rejected(_err); + }); + }, 1) // Only 1 call for Promises/A+ compat. + }); + + return obj; + })); + + ee.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + process.nextTick(avoidLoopOnRejection); + })); + + ee.emit('something'); +} + +function avoidLoopOnRejection() { + const ee = new EventEmitter({ captureRejections: true }); + const _err1 = new Error('kaboom'); + const _err2 = new Error('kaboom2'); + ee.on('something', common.mustCall(async (value) => { + throw _err1; + })); + + ee[captureRejectionSymbol] = common.mustCall(async (err) => { + assert.strictEqual(err, _err1); + throw _err2; + }); + + process.removeAllListeners('unhandledRejection'); + + process.once('unhandledRejection', common.mustCall((err) => { + // restore default + process.on('unhandledRejection', (err) => { throw err; }); + + assert.strictEqual(err, _err2); + process.nextTick(avoidLoopOnError); + })); + + ee.emit('something'); +} + +function avoidLoopOnError() { + const ee = new EventEmitter({ captureRejections: true }); + const _err1 = new Error('kaboom'); + const _err2 = new Error('kaboom2'); + ee.on('something', common.mustCall(async (value) => { + throw _err1; + })); + + ee.on('error', common.mustCall(async (err) => { + assert.strictEqual(err, _err1); + throw _err2; + })); + + process.removeAllListeners('unhandledRejection'); + + process.once('unhandledRejection', common.mustCall((err) => { + // restore default + process.on('unhandledRejection', (err) => { throw err; }); + + assert.strictEqual(err, _err2); + process.nextTick(thenableThatThrows); + })); + + ee.emit('something'); +} + +function thenableThatThrows() { + const ee = new EventEmitter({ captureRejections: true }); + const _err = new Error('kaboom'); + ee.on('something', common.mustCall((value) => { + const obj = {}; + + Object.defineProperty(obj, 'then', { + get: common.mustCall(() => { + throw _err; + }, 1) // Only 1 call for Promises/A+ compat. + }); + + return obj; + })); + + ee.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + process.nextTick(resetCaptureOnThrowInError); + })); + + ee.emit('something'); +} + +function resetCaptureOnThrowInError() { + const ee = new EventEmitter({ captureRejections: true }); + ee.on('something', common.mustCall(async (value) => { + throw new Error('kaboom'); + })); + + ee.once('error', common.mustCall((err) => { + throw err; + })); + + process.removeAllListeners('uncaughtException'); + + process.once('uncaughtException', common.mustCall((err) => { + process.nextTick(next); + })); + + ee.emit('something'); + + function next() { + process.on('uncaughtException', common.mustNotCall()); + + const _err = new Error('kaboom2'); + ee.on('something2', common.mustCall(async (value) => { + throw _err; + })); + + ee.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + + process.removeAllListeners('uncaughtException'); + + // restore default + process.on('uncaughtException', (err) => { throw err; }); + + process.nextTick(argValidation); + })); + + ee.emit('something2'); + } +} + +function argValidation() { + + function testType(obj) { + common.expectsError(() => new EventEmitter({ captureRejections: obj }), { + code: 'ERR_INVALID_ARG_TYPE', + type: TypeError + }); + + common.expectsError(() => EventEmitter.captureRejections = obj, { + code: 'ERR_INVALID_ARG_TYPE', + type: TypeError + }); + } + + testType([]); + testType({ hello: 42 }); + testType(42); +} + +captureRejections(); From 25b32e59fb74d41c081c2438be2c68b5dad21949 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Mon, 19 Aug 2019 17:45:30 +0200 Subject: [PATCH 2/7] stream: add support for captureRejection option --- lib/_stream_readable.js | 10 +++- lib/_stream_writable.js | 7 ++- lib/internal/streams/legacy.js | 4 +- test/parallel/test-stream-catch-rejections.js | 52 +++++++++++++++++++ 4 files changed, 69 insertions(+), 4 deletions(-) create mode 100644 test/parallel/test-stream-catch-rejections.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 0d83ada205b835..8f5f66b391293c 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -190,7 +190,7 @@ function Readable(options) { this._destroy = options.destroy; } - Stream.call(this); + Stream.call(this, options); } ObjectDefineProperty(Readable.prototype, 'destroyed', { @@ -233,6 +233,14 @@ Readable.prototype._destroy = function(err, cb) { cb(err); }; +Readable.prototype[EE.captureRejectionSymbol] = function(err) { + // TODO(mcollina): remove the destroyed if once errorEmitted lands in + // Readable. + if (!this.destroyed) { + this.destroy(err); + } +}; + // Manually shove something into the read() buffer. // This returns true if the highWaterMark has not been hit yet, // similar to how Writable.write() returns true if you should diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index cf498e7d93cc6b..92b9ffb5f63ad8 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -35,6 +35,7 @@ module.exports = Writable; Writable.WritableState = WritableState; const internalUtil = require('internal/util'); +const EE = require('events'); const Stream = require('stream'); const { Buffer } = require('buffer'); const destroyImpl = require('internal/streams/destroy'); @@ -250,7 +251,7 @@ function Writable(options) { this._final = options.final; } - Stream.call(this); + Stream.call(this, options); } // Otherwise people can pipe Writable streams, which is just wrong. @@ -808,3 +809,7 @@ Writable.prototype._undestroy = destroyImpl.undestroy; Writable.prototype._destroy = function(err, cb) { cb(err); }; + +Writable.prototype[EE.captureRejectionSymbol] = function(err) { + this.destroy(err); +}; diff --git a/lib/internal/streams/legacy.js b/lib/internal/streams/legacy.js index 702e3c56ba6376..2bc7a86aa050b6 100644 --- a/lib/internal/streams/legacy.js +++ b/lib/internal/streams/legacy.js @@ -6,8 +6,8 @@ const { const EE = require('events'); -function Stream() { - EE.call(this); +function Stream(opts) { + EE.call(this, opts); } ObjectSetPrototypeOf(Stream.prototype, EE.prototype); ObjectSetPrototypeOf(Stream, EE); diff --git a/test/parallel/test-stream-catch-rejections.js b/test/parallel/test-stream-catch-rejections.js new file mode 100644 index 00000000000000..fb5f1fccc18bd2 --- /dev/null +++ b/test/parallel/test-stream-catch-rejections.js @@ -0,0 +1,52 @@ +'use strict'; + +const common = require('../common'); +const stream = require('stream'); +const assert = require('assert'); + +{ + const r = new stream.Readable({ + captureRejections: true, + read() { + this.push('hello'); + this.push('world'); + this.push(null); + } + }); + + const err = new Error('kaboom'); + + r.on('error', common.mustCall((_err) => { + assert.strictEqual(err, _err); + assert.strictEqual(r.destroyed, true); + })); + + r.on('data', async () => { + throw err; + }); +} + +{ + const w = new stream.Writable({ + captureRejections: true, + highWaterMark: 1, + write(chunk, enc, cb) { + cb(); + } + }); + + const err = new Error('kaboom'); + + w.write('hello', () => { + w.write('world'); + }); + + w.on('error', common.mustCall((_err) => { + assert.strictEqual(err, _err); + assert.strictEqual(w.destroyed, true); + })); + + w.on('drain', common.mustCall(async () => { + throw err; + }, 2)); +} From 89ee802141d13fc68f63804be0301e25e8ae793e Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 3 Sep 2019 09:41:15 +0200 Subject: [PATCH 3/7] http: implement capture rejections for 'request' event --- lib/_http_server.js | 21 ++++ .../test-http-server-capture-rejections.js | 108 ++++++++++++++++++ 2 files changed, 129 insertions(+) create mode 100644 test/parallel/test-http-server-capture-rejections.js diff --git a/lib/_http_server.js b/lib/_http_server.js index 3d114d87112e1b..24a9c8594f5ddc 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -27,6 +27,7 @@ const { } = primordials; const net = require('net'); +const EE = require('events'); const assert = require('internal/assert'); const { parsers, @@ -351,6 +352,26 @@ Server.prototype.setTimeout = function setTimeout(msecs, callback) { return this; }; +Server.prototype[EE.captureRejectionSymbol] = function( + err, event, req, res) { + + switch (event) { + case 'request': + if (!res.headersSent && !res.writableEnded) { + // Don't leak headers. + for (const name of res.getHeaderNames()) { + res.removeHeader(name); + } + res.statusCode = 500; + res.end(STATUS_CODES[500]); + } else { + res.destroy(); + } + break; + default: + this.emit('error', err); + } +}; function connectionListener(socket) { defaultTriggerAsyncIdScope( diff --git a/test/parallel/test-http-server-capture-rejections.js b/test/parallel/test-http-server-capture-rejections.js new file mode 100644 index 00000000000000..b437e27b589dfc --- /dev/null +++ b/test/parallel/test-http-server-capture-rejections.js @@ -0,0 +1,108 @@ +'use strict'; + +const common = require('../common'); +const events = require('events'); +const { createServer, request } = require('http'); +const assert = require('assert'); + +events.captureRejections = true; + +{ + const server = createServer(common.mustCall(async (req, res) => { + // We will test that this header is cleaned up before forwarding. + res.setHeader('content-type', 'application/json'); + throw new Error('kaboom'); + })); + + server.listen(0, common.mustCall(() => { + const req = request({ + method: 'GET', + host: server.address().host, + port: server.address().port + }); + + req.end(); + + req.on('response', common.mustCall((res) => { + assert.strictEqual(res.statusCode, 500); + assert.strictEqual(res.headers.hasOwnProperty('content-type'), false); + let data = ''; + res.setEncoding('utf8'); + res.on('data', common.mustCall((chunk) => { + data += chunk; + })); + res.on('end', common.mustCall(() => { + assert.strictEqual(data, 'Internal Server Error'); + server.close(); + })); + })); + })); +} + +{ + let resolve; + const latch = new Promise((_resolve) => { + resolve = _resolve; + }); + const server = createServer(common.mustCall(async (req, res) => { + server.close(); + + // We will test that this header is cleaned up before forwarding. + res.setHeader('content-type', 'application/json'); + res.write('{'); + req.resume(); + + // Wait so the data is on the wire + await latch; + + throw new Error('kaboom'); + })); + + server.listen(0, common.mustCall(() => { + const req = request({ + method: 'GET', + host: server.address().host, + port: server.address().port + }); + + req.end(); + + req.on('response', common.mustCall((res) => { + assert.strictEqual(res.statusCode, 200); + assert.strictEqual(res.headers['content-type'], 'application/json'); + resolve(); + + let data = ''; + res.setEncoding('utf8'); + res.on('data', common.mustCall((chunk) => { + data += chunk; + })); + + req.on('close', common.mustCall(() => { + assert.strictEqual(data, '{'); + })); + })); + })); +} + +{ + const server = createServer(common.mustCall(async (req, res) => { + // We will test that this header is cleaned up before forwarding. + res.writeHead(200); + throw new Error('kaboom'); + })); + + server.listen(0, common.mustCall(() => { + const req = request({ + method: 'GET', + host: server.address().host, + port: server.address().port + }); + + req.end(); + req.on('error', common.mustCall((err) => { + assert.strictEqual(err.code, 'ECONNRESET'); + server.close(); + })); + })); +} From 8f6475626089993c962989bfe171f54470fa8b18 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Fri, 20 Sep 2019 18:19:35 +0200 Subject: [PATCH 4/7] http: add captureRejection support to OutgoingMessage --- lib/_http_outgoing.js | 6 ++ ...http-outgoing-message-capture-rejection.js | 89 +++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 test/parallel/test-http-outgoing-message-capture-rejection.js diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index e331d073b57067..c3bfbd09bfca0c 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -32,6 +32,7 @@ const { const { getDefaultHighWaterMark } = require('internal/streams/state'); const assert = require('internal/assert'); +const EE = require('events'); const Stream = require('stream'); const internalUtil = require('internal/util'); const { kOutHeaders, utcDate, kNeedDrain } = require('internal/http'); @@ -884,6 +885,11 @@ OutgoingMessage.prototype.pipe = function pipe() { this.emit('error', new ERR_STREAM_CANNOT_PIPE()); }; +OutgoingMessage.prototype[EE.captureRejectionSymbol] = +function(err, event) { + this.destroy(err); +}; + module.exports = { OutgoingMessage }; diff --git a/test/parallel/test-http-outgoing-message-capture-rejection.js b/test/parallel/test-http-outgoing-message-capture-rejection.js new file mode 100644 index 00000000000000..5f667ea17ea156 --- /dev/null +++ b/test/parallel/test-http-outgoing-message-capture-rejection.js @@ -0,0 +1,89 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const events = require('events'); +const { createServer, request } = require('http'); + +events.captureRejections = true; + +{ + const server = createServer(common.mustCall((req, res) => { + const _err = new Error('kaboom'); + res.on('drain', common.mustCall(async () => { + throw _err; + })); + + res.socket.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + })); + + // Write until there is space in the buffer + while (res.write('hello')) {} + })); + + server.listen(0, common.mustCall(() => { + const req = request({ + method: 'GET', + host: server.address().host, + port: server.address().port + }); + + req.end(); + + req.on('response', common.mustCall((res) => { + res.on('aborted', common.mustCall()); + res.resume(); + server.close(); + })); + })); +} + +{ + let _res; + let shouldEnd = false; + // Not using mustCall here, because it is OS-dependant. + const server = createServer((req, res) => { + // So that we cleanly stop + _res = res; + + if (shouldEnd) { + res.end(); + } + }); + + server.listen(0, common.mustCall(() => { + const _err = new Error('kaboom'); + + const req = request({ + method: 'POST', + host: server.address().host, + port: server.address().port + }); + + req.on('response', common.mustNotCall((res) => { + // So that we cleanly stop + res.resume(); + server.close(); + })); + + req.on('error', common.mustCall((err) => { + server.close(); + // On some variants of Windows, this can happen before + // the server has received the request. + if (_res) { + _res.end(); + } else { + shouldEnd = true; + } + assert.strictEqual(err, _err); + })); + + req.on('drain', common.mustCall(async () => { + throw _err; + })); + + // Write until there is space in the buffer + while (req.write('hello')) {} + })); +} From 59388771e58d5b8f3be8cb1091565f56df380562 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 1 Oct 2019 17:30:39 +0200 Subject: [PATCH 5/7] net: implement capture rejections for 'connection' event --- lib/_http_server.js | 6 +++-- lib/net.js | 13 +++++++++ .../test-net-server-capture-rejection.js | 27 +++++++++++++++++++ 3 files changed, 44 insertions(+), 2 deletions(-) create mode 100644 test/parallel/test-net-server-capture-rejection.js diff --git a/lib/_http_server.js b/lib/_http_server.js index 24a9c8594f5ddc..2daddd511cc8f4 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -353,10 +353,11 @@ Server.prototype.setTimeout = function setTimeout(msecs, callback) { }; Server.prototype[EE.captureRejectionSymbol] = function( - err, event, req, res) { + err, event, ...args) { switch (event) { case 'request': + const [ , res] = args; if (!res.headersSent && !res.writableEnded) { // Don't leak headers. for (const name of res.getHeaderNames()) { @@ -369,7 +370,8 @@ Server.prototype[EE.captureRejectionSymbol] = function( } break; default: - this.emit('error', err); + net.Server.prototype[Symbol.for('nodejs.rejection')] + .call(this, err, event, ...args); } }; diff --git a/lib/net.js b/lib/net.js index 02fd18748036a3..7e5d0ea621dedc 100644 --- a/lib/net.js +++ b/lib/net.js @@ -1653,6 +1653,19 @@ function emitCloseNT(self) { } +Server.prototype[EventEmitter.captureRejectionSymbol] = function( + err, event, sock) { + + switch (event) { + case 'connection': + sock.destroy(err); + break; + default: + this.emit('error', err); + } +}; + + // Legacy alias on the C++ wrapper object. This is not public API, so we may // want to runtime-deprecate it at some point. There's no hurry, though. ObjectDefineProperty(TCP.prototype, 'owner', { diff --git a/test/parallel/test-net-server-capture-rejection.js b/test/parallel/test-net-server-capture-rejection.js new file mode 100644 index 00000000000000..b1564ec26874af --- /dev/null +++ b/test/parallel/test-net-server-capture-rejection.js @@ -0,0 +1,27 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const events = require('events'); +const { createServer, connect } = require('net'); + +events.captureRejections = true; + +const server = createServer(common.mustCall(async (sock) => { + server.close(); + + const _err = new Error('kaboom'); + sock.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + })); + throw _err; +})); + +server.listen(0, common.mustCall(() => { + const sock = connect( + server.address().port, + server.address().host + ); + + sock.on('close', common.mustCall()); +})); From f283ec6b19cc28821d9d2cb475a873a314968f55 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Tue, 1 Oct 2019 18:06:22 +0200 Subject: [PATCH 6/7] tls: implement capture rejections for 'secureConnection' event --- lib/_tls_wrap.js | 14 ++++++++ .../test-tls-server-capture-rejection.js | 34 +++++++++++++++++++ 2 files changed, 48 insertions(+) create mode 100644 test/parallel/test-tls-server-capture-rejection.js diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js index 6f08f91c43dd5d..40122a82d0b9f6 100644 --- a/lib/_tls_wrap.js +++ b/lib/_tls_wrap.js @@ -37,6 +37,7 @@ assertCrypto(); const { setImmediate } = require('timers'); const assert = require('internal/assert'); const crypto = require('crypto'); +const EE = require('events'); const net = require('net'); const tls = require('tls'); const common = require('_tls_common'); @@ -1282,6 +1283,19 @@ Server.prototype.addContext = function(servername, context) { this._contexts.push([re, tls.createSecureContext(context).context]); }; +Server.prototype[EE.captureRejectionSymbol] = function( + err, event, sock) { + + switch (event) { + case 'secureConnection': + sock.destroy(err); + break; + default: + net.Server.prototype[Symbol.for('nodejs.rejection')] + .call(this, err, event, sock); + } +}; + function SNICallback(servername, callback) { const contexts = this.server._contexts; diff --git a/test/parallel/test-tls-server-capture-rejection.js b/test/parallel/test-tls-server-capture-rejection.js new file mode 100644 index 00000000000000..f9bd3320e101f0 --- /dev/null +++ b/test/parallel/test-tls-server-capture-rejection.js @@ -0,0 +1,34 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); + +const assert = require('assert'); +const events = require('events'); +const fixtures = require('../common/fixtures'); +const { createServer, connect } = require('tls'); +const cert = fixtures.readKey('rsa_cert.crt'); +const key = fixtures.readKey('rsa_private.pem'); + +events.captureRejections = true; + +const server = createServer({ cert, key }, common.mustCall(async (sock) => { + server.close(); + + const _err = new Error('kaboom'); + sock.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + })); + throw _err; +})); + +server.listen(0, common.mustCall(() => { + const sock = connect({ + port: server.address().port, + host: server.address().host, + rejectUnauthorized: false + }); + + sock.on('close', common.mustCall()); +})); From d1a78bea12ef4644d2b1e4d6f0d49ba2c738c5be Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Wed, 2 Oct 2019 13:02:37 +0200 Subject: [PATCH 7/7] http2: implement capture rection for 'request' and 'stream' events --- lib/internal/http2/core.js | 45 ++++++ test/parallel/test-http2-capture-rejection.js | 152 ++++++++++++++++++ 2 files changed, 197 insertions(+) create mode 100644 test/parallel/test-http2-capture-rejection.js diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index c4d12f9205151c..167044f8408fee 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1388,6 +1388,17 @@ class Http2Session extends EventEmitter { this[kMaybeDestroy](); } + [EventEmitter.captureRejectionSymbol](err, event, ...args) { + switch (event) { + case 'stream': + const [stream] = args; + stream.destroy(err); + break; + default: + this.destroy(err); + } + } + // Destroy the session if: // * error is not undefined/null // * session is closed and there are no more pending or open streams @@ -2875,6 +2886,40 @@ class Http2Server extends NETServer { } } +Http2Server.prototype[EventEmitter.captureRejectionSymbol] = function( + err, event, ...args) { + + switch (event) { + case 'stream': + // TODO(mcollina): we might want to match this with what we do on + // the compat side. + const [stream] = args; + if (stream.sentHeaders) { + stream.destroy(err); + } else { + stream.respond({ [HTTP2_HEADER_STATUS]: 500 }); + stream.end(); + } + break; + case 'request': + const [, res] = args; + if (!res.headersSent && !res.finished) { + // Don't leak headers. + for (const name of res.getHeaderNames()) { + res.removeHeader(name); + } + res.statusCode = 500; + res.end(http.STATUS_CODES[500]); + } else { + res.destroy(); + } + break; + default: + net.Server.prototype[EventEmitter.captureRejectionSymbol] + .call(this, err, event, ...args); + } +}; + function setupCompat(ev) { if (ev === 'request') { this.removeListener('newListener', setupCompat); diff --git a/test/parallel/test-http2-capture-rejection.js b/test/parallel/test-http2-capture-rejection.js new file mode 100644 index 00000000000000..58f43581eb6bd3 --- /dev/null +++ b/test/parallel/test-http2-capture-rejection.js @@ -0,0 +1,152 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); + +const assert = require('assert'); +const events = require('events'); +const { createServer, connect } = require('http2'); + +events.captureRejections = true; + +{ + // Test error thrown in the server 'stream' event, + // after a respond() + + const server = createServer(); + server.on('stream', common.mustCall(async (stream) => { + server.close(); + + stream.respond({ ':status': 200 }); + + const _err = new Error('kaboom'); + stream.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + })); + throw _err; + })); + + server.listen(0, common.mustCall(() => { + const { port } = server.address(); + const session = connect(`http://localhost:${port}`); + + const req = session.request(); + + req.on('error', common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_HTTP2_STREAM_ERROR'); + })); + + req.on('close', common.mustCall(() => { + session.close(); + })); + })); +} + +{ + // Test error thrown in the server 'stream' event, + // before a respond(). + + const server = createServer(); + server.on('stream', common.mustCall(async (stream) => { + server.close(); + + stream.on('error', common.mustNotCall()); + + throw new Error('kaboom'); + })); + + server.listen(0, common.mustCall(() => { + const { port } = server.address(); + const session = connect(`http://localhost:${port}`); + + const req = session.request(); + + req.on('response', common.mustCall((headers) => { + assert.strictEqual(headers[':status'], 500); + })); + + req.on('close', common.mustCall(() => { + session.close(); + })); + })); +} + + +{ + // Test error thrown in 'request' event + + const server = createServer(common.mustCall(async (req, res) => { + server.close(); + res.setHeader('content-type', 'application/json'); + const _err = new Error('kaboom'); + throw _err; + })); + + server.listen(0, common.mustCall(() => { + const { port } = server.address(); + const session = connect(`http://localhost:${port}`); + + const req = session.request(); + + req.on('response', common.mustCall((headers) => { + assert.strictEqual(headers[':status'], 500); + assert.strictEqual(Object.hasOwnProperty.call(headers, 'content-type'), + false); + })); + + req.on('close', common.mustCall(() => { + session.close(); + })); + + req.resume(); + })); +} + +{ + // Test error thrown in the client 'stream' event + + const server = createServer(); + server.on('stream', common.mustCall(async (stream) => { + const { port } = server.address(); + + server.close(); + + stream.pushStream({ + ':scheme': 'http', + ':path': '/foobar', + ':authority': `localhost:${port}`, + }, common.mustCall((err, push) => { + push.respond({ + 'content-type': 'text/html', + ':status': 200 + }); + push.end('pushed by the server'); + + stream.end('test'); + })); + + stream.respond({ + ':status': 200 + }); + })); + + server.listen(0, common.mustCall(() => { + const { port } = server.address(); + const session = connect(`http://localhost:${port}`); + + const req = session.request(); + + session.on('stream', common.mustCall(async (stream) => { + session.close(); + + const _err = new Error('kaboom'); + stream.on('error', common.mustCall((err) => { + assert.strictEqual(err, _err); + })); + throw _err; + })); + + req.end(); + })); +}