Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: make stream.destroy with callback API public #32021

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 7 additions & 2 deletions doc/api/stream.md
Expand Up @@ -369,7 +369,7 @@ to be processed. However, use of `writable.cork()` without implementing

See also: [`writable.uncork()`][], [`writable._writev()`][stream-_writev].

##### `writable.destroy([error])`
##### `writable.destroy([error][, callback])`
<!-- YAML
added: v8.0.0
-->
Expand All @@ -389,6 +389,11 @@ the `'drain'` event before destroying the stream.
Once `destroy()` has been called any further calls will be a noop and no
further errors except from `_destroy` may be emitted as `'error'`.

If passed `callback`; it will be invoked once the stream destrution has
completed. If an error has occured it will be passed as the first argument to
the callback and no `uncaughtException` error will occur even if no `'error'`
listener has been registered on the stream.

Implementors should not override this method,
but instead implement [`writable._destroy()`][writable-_destroy].

Expand Down Expand Up @@ -2936,6 +2941,6 @@ contain multi-byte characters.
[stream-write]: #stream_writable_write_chunk_encoding_callback
[Stream Three States]: #stream_three_states
[writable-_destroy]: #stream_writable_destroy_err_callback
[writable-destroy]: #stream_writable_destroy_error
[writable-destroy]: #stream_writable_destroy_error_callback
[writable-new]: #stream_constructor_new_stream_writable_options
[zlib]: zlib.html
4 changes: 3 additions & 1 deletion lib/_stream_writable.js
Expand Up @@ -464,9 +464,11 @@ function afterWrite(stream, state, count, cb) {
stream.emit('drain');
}

const err = state.destroyed ? new ERR_STREAM_DESTROYED('write') : undefined;
ronag marked this conversation as resolved.
Show resolved Hide resolved

while (count-- > 0) {
state.pendingcb--;
cb();
cb(err);
}

if (state.destroyed) {
Expand Down
26 changes: 14 additions & 12 deletions lib/internal/streams/destroy.js
@@ -1,18 +1,24 @@
'use strict';

// Undocumented cb() API, needed for core, not for public API.
// The cb() will be invoked synchronously if _destroy is synchronous.
// If cb is passed no 'error' event will be emitted.
let eos;

function destroy(err, cb) {
const r = this._readableState;
const w = this._writableState;

if ((w && w.destroyed) || (r && r.destroyed)) {
if (typeof cb === 'function') {
// TODO(ronag): Invoke with `'close'`/`'error'`.
cb();
}
if (typeof err === 'function') {
cb = err;
err = null;
}

if (typeof cb === 'function') {
if (!eos) eos = require('internal/streams/end-of-stream');
eos(this, (err) => {
cb(err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE' ? err : undefined);
});
}

if ((w && w.destroyed) || (r && r.destroyed)) {
return this;
}

Expand Down Expand Up @@ -52,10 +58,6 @@ function destroy(err, cb) {
r.closed = true;
}

if (typeof cb === 'function') {
cb(err);
}

if (err) {
process.nextTick(emitErrorCloseNT, this, err);
} else {
Expand Down
24 changes: 24 additions & 0 deletions test/parallel/test-stream-writable-destroy.js
Expand Up @@ -248,9 +248,33 @@ const assert = require('assert');

const expected = new Error('kaboom');

let ticked = false;
write.destroy(expected, common.mustCall((err) => {
assert.strictEqual(err, undefined);
assert.strictEqual(ticked, true);
let ticked2 = false;
write.destroy(expected, common.mustCall((err) => {
assert.strictEqual(err, undefined);
assert.strictEqual(ticked2, true);
}));
ticked2 = true;
}));
ticked = true;

// Destroy already destroyed.

ticked = false;
write.destroy(expected, common.mustCall((err) => {
assert.strictEqual(err, undefined);
assert.strictEqual(ticked, true);
let ticked2 = false;
write.destroy(expected, common.mustCall((err) => {
assert.strictEqual(err, undefined);
assert.strictEqual(ticked2, true);
}));
ticked2 = true;
}));
ticked = true;
}

{
Expand Down
11 changes: 6 additions & 5 deletions test/parallel/test-tls-writewrap-leak.js
Expand Up @@ -12,13 +12,14 @@ const server = net.createServer(common.mustCall((c) => {
c.destroy();
})).listen(0, common.mustCall(() => {
const c = tls.connect({ port: server.address().port });
c.on('error', () => {
// Otherwise `.write()` callback won't be invoked.
c._undestroy();
});

c.on('error', common.mustCall((err) => {
assert.strictEqual(err.code, 'ECONNRESET');
server.close();
}));

c.write('hello', common.mustCall((err) => {
assert.strictEqual(err.code, 'ECANCELED');
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
server.close();
}));
}));