From 4d51ce200dc852149646705790e61d9d9e177711 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 29 Feb 2020 10:09:21 +0100 Subject: [PATCH 1/7] stream: make stream.destroy with callback API public This makes the `stream.destroy(err, callback)` API public. Additionally it makes some changes for easier use: - The callback is always invoked with the same behavior as eos. - The error is assumed to be handled and uncaughException is supressed. - The callback timing is the same regardless whether destroy has already been called or not. - The callback is always invoked asynchronously. - The callback used be invoked before emitting 'error' and/or 'close'. --- doc/api/stream.md | 7 ++++- lib/internal/streams/destroy.js | 26 ++++++++++--------- test/parallel/test-stream-writable-destroy.js | 24 +++++++++++++++++ 3 files changed, 44 insertions(+), 13 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 7bc1874c803cae..8317cdab02652a 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -369,7 +369,7 @@ to be processed. However, use of `writable.cork()` without implementing See also: [`writable.uncork()`][], [`writable._writev()`][stream-_writev]. -##### `writable.destroy([error])` +##### `writable.destroy([error, callback])` @@ -389,6 +389,11 @@ the `'drain'` event before destroying the stream. Once `destroy()` has been called any further calls will be a noop and no further errors except from `_destroy` may be emitted as `'error'`. +If passed `callback`; it will be invoked once the stream destrution has +completed. If an error has occured it will be passed as the first argument to +the callback and no `uncaughtException` error will occur even if no `'error'` +listener has been registered on the stream. + Implementors should not override this method, but instead implement [`writable._destroy()`][writable-_destroy]. diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 3a953afd445649..0d34621b08af23 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -1,18 +1,24 @@ 'use strict'; -// Undocumented cb() API, needed for core, not for public API. -// The cb() will be invoked synchronously if _destroy is synchronous. -// If cb is passed no 'error' event will be emitted. +let eos; + function destroy(err, cb) { const r = this._readableState; const w = this._writableState; - if ((w && w.destroyed) || (r && r.destroyed)) { - if (typeof cb === 'function') { - // TODO(ronag): Invoke with `'close'`/`'error'`. - cb(); - } + if (typeof err === 'function') { + cb = err; + err = null; + } + if (typeof cb === 'function') { + if (!eos) eos = require('internal/streams/end-of-stream'); + eos(this, (err) => { + cb(err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE' ? err : undefined); + }); + } + + if ((w && w.destroyed) || (r && r.destroyed)) { return this; } @@ -52,10 +58,6 @@ function destroy(err, cb) { r.closed = true; } - if (typeof cb === 'function') { - cb(err); - } - if (err) { process.nextTick(emitErrorCloseNT, this, err); } else { diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index 706847a8582d0c..c4a2d62261ae8e 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -248,9 +248,33 @@ const assert = require('assert'); const expected = new Error('kaboom'); + let ticked = false; + write.destroy(expected, common.mustCall((err) => { + assert.strictEqual(err, undefined); + assert.strictEqual(ticked, true); + let ticked2 = false; + write.destroy(expected, common.mustCall((err) => { + assert.strictEqual(err, undefined); + assert.strictEqual(ticked2, true); + })); + ticked2 = true; + })); + ticked = true; + + // Destroy already destroyed. + + ticked = false; write.destroy(expected, common.mustCall((err) => { assert.strictEqual(err, undefined); + assert.strictEqual(ticked, true); + let ticked2 = false; + write.destroy(expected, common.mustCall((err) => { + assert.strictEqual(err, undefined); + assert.strictEqual(ticked2, true); + })); + ticked2 = true; })); + ticked = true; } { From 54f62fcff0d3b7c65beb560f68aaaf62f5142a7c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 29 Feb 2020 10:42:00 +0100 Subject: [PATCH 2/7] fixup: docs --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 8317cdab02652a..c6d93c1e7e1a29 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2941,6 +2941,6 @@ contain multi-byte characters. [stream-write]: #stream_writable_write_chunk_encoding_callback [Stream Three States]: #stream_three_states [writable-_destroy]: #stream_writable_destroy_err_callback -[writable-destroy]: #stream_writable_destroy_error +[writable-destroy]: #stream_writable_destroy_error_callback [writable-new]: #stream_constructor_new_stream_writable_options [zlib]: zlib.html From c0203240817edf2a201b3aaf1c88ae87901b3805 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 29 Feb 2020 10:43:38 +0100 Subject: [PATCH 3/7] fixup: docs --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index c6d93c1e7e1a29..517a807745c126 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -369,7 +369,7 @@ to be processed. However, use of `writable.cork()` without implementing See also: [`writable.uncork()`][], [`writable._writev()`][stream-_writev]. -##### `writable.destroy([error, callback])` +##### `writable.destroy([error][, callback])` From 80cf7f68d517a4bd4d54ba6a27c376a37892acd6 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 29 Feb 2020 11:08:48 +0100 Subject: [PATCH 4/7] fixup: tls test + writable write complete err --- lib/_stream_writable.js | 4 +++- test/parallel/test-tls-writewrap-leak.js | 11 ++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index af199956062d67..88f7fa8565a807 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -464,9 +464,11 @@ function afterWrite(stream, state, count, cb) { stream.emit('drain'); } + const err = state.destroyed ? new ERR_STREAM_DESTROYED('write') : undefined; + while (count-- > 0) { state.pendingcb--; - cb(); + cb(err); } if (state.destroyed) { diff --git a/test/parallel/test-tls-writewrap-leak.js b/test/parallel/test-tls-writewrap-leak.js index 7035764cba5394..4c0858c741703f 100644 --- a/test/parallel/test-tls-writewrap-leak.js +++ b/test/parallel/test-tls-writewrap-leak.js @@ -12,13 +12,14 @@ const server = net.createServer(common.mustCall((c) => { c.destroy(); })).listen(0, common.mustCall(() => { const c = tls.connect({ port: server.address().port }); - c.on('error', () => { - // Otherwise `.write()` callback won't be invoked. - c._undestroy(); - }); + + c.on('error', common.mustCall((err) => { + assert.strictEqual(err.code, 'ECONNRESET'); + server.close(); + })); c.write('hello', common.mustCall((err) => { - assert.strictEqual(err.code, 'ECANCELED'); + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); server.close(); })); })); From 43472c4b04a8950780c3683cf6c04333fa4df976 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 29 Feb 2020 11:22:04 +0100 Subject: [PATCH 5/7] fixup: test --- lib/_stream_writable.js | 4 +--- test/parallel/test-tls-writewrap-leak.js | 3 ++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 88f7fa8565a807..af199956062d67 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -464,11 +464,9 @@ function afterWrite(stream, state, count, cb) { stream.emit('drain'); } - const err = state.destroyed ? new ERR_STREAM_DESTROYED('write') : undefined; - while (count-- > 0) { state.pendingcb--; - cb(err); + cb(); } if (state.destroyed) { diff --git a/test/parallel/test-tls-writewrap-leak.js b/test/parallel/test-tls-writewrap-leak.js index 4c0858c741703f..26e3c81dd4e291 100644 --- a/test/parallel/test-tls-writewrap-leak.js +++ b/test/parallel/test-tls-writewrap-leak.js @@ -19,7 +19,8 @@ const server = net.createServer(common.mustCall((c) => { })); c.write('hello', common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); + // TODO + // assert.strictEqual(err.code, 'ECANCELED'); server.close(); })); })); From b40a103eab79f98a240ff615dc3066cae9fbe849 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 29 Feb 2020 11:33:27 +0100 Subject: [PATCH 6/7] fixup: reduce changes --- lib/internal/streams/destroy.js | 23 ++++++++++++++++------- test/parallel/test-tls-writewrap-leak.js | 12 +++++------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 0d34621b08af23..f03336d380e0ba 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -11,14 +11,13 @@ function destroy(err, cb) { err = null; } - if (typeof cb === 'function') { - if (!eos) eos = require('internal/streams/end-of-stream'); - eos(this, (err) => { - cb(err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE' ? err : undefined); - }); - } - if ((w && w.destroyed) || (r && r.destroyed)) { + if (typeof cb === 'function') { + if (!eos) eos = require('internal/streams/end-of-stream'); + eos(this, (err) => { + cb(err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE' ? err : undefined); + }); + } return this; } @@ -58,6 +57,16 @@ function destroy(err, cb) { r.closed = true; } + // TODO(ronag): Remove this and always use eos + // in order to ensure the same order relative + // to events regardless whether this is the first + // call to destroy(cb) or not. + // Revisit once https://github.com/nodejs/node/pull/29179 + // is closed. + if (typeof cb === 'function') { + cb(err); + } + if (err) { process.nextTick(emitErrorCloseNT, this, err); } else { diff --git a/test/parallel/test-tls-writewrap-leak.js b/test/parallel/test-tls-writewrap-leak.js index 26e3c81dd4e291..7035764cba5394 100644 --- a/test/parallel/test-tls-writewrap-leak.js +++ b/test/parallel/test-tls-writewrap-leak.js @@ -12,15 +12,13 @@ const server = net.createServer(common.mustCall((c) => { c.destroy(); })).listen(0, common.mustCall(() => { const c = tls.connect({ port: server.address().port }); - - c.on('error', common.mustCall((err) => { - assert.strictEqual(err.code, 'ECONNRESET'); - server.close(); - })); + c.on('error', () => { + // Otherwise `.write()` callback won't be invoked. + c._undestroy(); + }); c.write('hello', common.mustCall((err) => { - // TODO - // assert.strictEqual(err.code, 'ECANCELED'); + assert.strictEqual(err.code, 'ECANCELED'); server.close(); })); })); From 81de706be6d4a9c0aa1d8804668e4107417b0aee Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 29 Feb 2020 11:55:58 +0100 Subject: [PATCH 7/7] fixup: always register eos --- lib/internal/streams/destroy.js | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index f03336d380e0ba..2cb077f5d81eef 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -1,5 +1,6 @@ 'use strict'; +const { once } = require('internal/util'); let eos; function destroy(err, cb) { @@ -11,13 +12,17 @@ function destroy(err, cb) { err = null; } + if (typeof cb === 'function') { + // TODO(ronag): Remove once cb is invoked only through eos. + cb = once(cb); + + if (!eos) eos = require('internal/streams/end-of-stream'); + eos(this, (err) => { + cb(err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE' ? err : undefined); + }); + } + if ((w && w.destroyed) || (r && r.destroyed)) { - if (typeof cb === 'function') { - if (!eos) eos = require('internal/streams/end-of-stream'); - eos(this, (err) => { - cb(err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE' ? err : undefined); - }); - } return this; }