diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 564cdf0e82a986..2fdd0d721b63be 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -107,8 +107,10 @@ function Transform(options) { } function final(cb) { + let called = false; if (typeof this._flush === 'function' && !this.destroyed) { - this._flush((er, data) => { + const result = this._flush((er, data) => { + called = true; if (er) { if (cb) { cb(er); @@ -126,6 +128,33 @@ function final(cb) { cb(); } }); + if (result !== undefined && result !== null) { + try { + const then = result.then; + if (typeof then === 'function') { + then.call( + result, + (data) => { + if (called) + return; + if (data != null) + this.push(data); + this.push(null); + if (cb) + process.nextTick(cb); + }, + (err) => { + if (cb) { + process.nextTick(cb, err); + } else { + process.nextTick(() => this.destroy(err)); + } + }); + } + } catch (err) { + process.nextTick(() => this.destroy(err)); + } + } } else { this.push(null); if (cb) { @@ -151,7 +180,9 @@ Transform.prototype._write = function(chunk, encoding, callback) { const wState = this._writableState; const length = rState.length; - this._transform(chunk, encoding, (err, val) => { + let called = false; + const result = this._transform(chunk, encoding, (err, val) => { + called = true; if (err) { callback(err); return; @@ -172,6 +203,38 @@ Transform.prototype._write = function(chunk, encoding, callback) { this[kCallback] = callback; } }); + if (result !== undefined && result != null) { + try { + const then = result.then; + if (typeof then === 'function') { + then.call( + result, + (val) => { + if (called) + return; + + if (val != null) { + this.push(val); + } + + if ( + wState.ended || + length === rState.length || + rState.length < rState.highWaterMark || + rState.length === 0) { + process.nextTick(callback); + } else { + this[kCallback] = callback; + } + }, + (err) => { + process.nextTick(callback, err); + }); + } + } catch (err) { + process.nextTick(callback, err); + } + } }; Transform.prototype._read = function() { diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 1232742eb900ec..22aee3236c4ea7 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -647,7 +647,7 @@ function needFinish(state) { function callFinal(stream, state) { state.sync = true; state.pendingcb++; - stream._final((err) => { + const result = stream._final((err) => { state.pendingcb--; if (err) { for (const callback of state[kOnFinished].splice(0)) { @@ -664,6 +664,31 @@ function callFinal(stream, state) { process.nextTick(finish, stream, state); } }); + if (result !== undefined && result !== null) { + try { + const then = result.then; + if (typeof then === 'function') { + then.call( + result, + function() { + if (state.prefinished) + return; + state.prefinish = true; + process.nextTick(() => stream.emit('prefinish')); + state.pendingcb++; + process.nextTick(finish, stream, state); + }, + function(err) { + for (const callback of state[kOnFinished].splice(0)) { + process.nextTick(callback, err); + } + process.nextTick(errorOrDestroy, stream, err, state.sync); + }); + } + } catch (err) { + process.nextTick(errorOrDestroy, stream, err, state.sync); + } + } state.sync = false; } diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 3bccd46b7e65da..510f7a40c8ddd6 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -59,10 +59,13 @@ function destroy(err, cb) { } function _destroy(self, err, cb) { - self._destroy(err || null, (err) => { + let called = false; + const result = self._destroy(err || null, (err) => { const r = self._readableState; const w = self._writableState; + called = true; + if (err) { // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 err.stack; @@ -92,6 +95,64 @@ function _destroy(self, err, cb) { process.nextTick(emitCloseNT, self); } }); + if (result !== undefined && result !== null) { + try { + const then = result.then; + if (typeof then === 'function') { + then.call( + result, + function() { + if (called) + return; + + const r = self._readableState; + const w = self._writableState; + + if (w) { + w.closed = true; + } + if (r) { + r.closed = true; + } + + if (typeof cb === 'function') { + process.nextTick(cb); + } + + process.nextTick(emitCloseNT, self); + }, + function(err) { + const r = self._readableState; + const w = self._writableState; + err.stack; + + called = true; + + if (w && !w.errored) { + w.errored = err; + } + if (r && !r.errored) { + r.errored = err; + } + + if (w) { + w.closed = true; + } + if (r) { + r.closed = true; + } + + if (typeof cb === 'function') { + process.nextTick(cb, err); + } + + process.nextTick(emitErrorCloseNT, self, err); + }); + } + } catch (err) { + process.nextTick(emitErrorNT, self, err); + } + } } function emitErrorCloseNT(self, err) { @@ -230,7 +291,7 @@ function constructNT(stream) { const s = w || r; let called = false; - stream._construct((err) => { + const result = stream._construct((err) => { if (r) { r.constructed = true; } @@ -252,6 +313,47 @@ function constructNT(stream) { process.nextTick(emitConstructNT, stream); } }); + if (result !== undefined && result !== null) { + try { + const then = result.then; + if (typeof then === 'function') { + then.call( + result, + function() { + // If the callback was invoked, do nothing further. + if (called) + return; + if (r) { + r.constructed = true; + } + if (w) { + w.constructed = true; + } + if (s.destroyed) { + process.nextTick(() => stream.emit(kDestroy)); + } else { + process.nextTick(emitConstructNT, stream); + } + }, + function(err) { + if (r) { + r.constructed = true; + } + if (w) { + w.constructed = true; + } + called = true; + if (s.destroyed) { + process.nextTick(() => stream.emit(kDestroy, err)); + } else { + process.nextTick(errorOrDestroy, stream, err); + } + }); + } + } catch (err) { + process.nextTick(emitErrorNT, stream, err); + } + } } function emitConstructNT(stream) { diff --git a/test/parallel/test-stream-construct-async-error.js b/test/parallel/test-stream-construct-async-error.js new file mode 100644 index 00000000000000..34e450c853a850 --- /dev/null +++ b/test/parallel/test-stream-construct-async-error.js @@ -0,0 +1,258 @@ +'use strict'; + +const common = require('../common'); +const { + Duplex, + Writable, + Transform, +} = require('stream'); +const { setTimeout } = require('timers/promises'); +const assert = require('assert'); + +{ + class Foo extends Duplex { + async _construct(cb) { + // eslint-disable-next-line no-restricted-syntax + await setTimeout(common.platformTimeout(1)); + cb(); + throw new Error('boom'); + } + } + + const foo = new Foo(); + foo.on('error', common.expectsError({ + message: 'boom' + })); + foo.on('close', common.mustCall(() => { + assert(foo._writableState.constructed); + assert(foo._readableState.constructed); + })); +} + +{ + class Foo extends Duplex { + async _destroy(err, cb) { + // eslint-disable-next-line no-restricted-syntax + await setTimeout(common.platformTimeout(1)); + throw new Error('boom'); + } + } + + const foo = new Foo(); + foo.destroy(); + foo.on('error', common.expectsError({ + message: 'boom' + })); + foo.on('close', common.mustCall(() => { + assert(foo.destroyed); + })); +} + +{ + class Foo extends Duplex { + async _destroy(err, cb) { + // eslint-disable-next-line no-restricted-syntax + await setTimeout(common.platformTimeout(1)); + } + } + + const foo = new Foo(); + foo.destroy(); + foo.on('close', common.mustCall(() => { + assert(foo.destroyed); + })); +} + +{ + class Foo extends Duplex { + async _construct() { + // eslint-disable-next-line no-restricted-syntax + await setTimeout(common.platformTimeout(1)); + } + + _write = common.mustCall((chunk, encoding, cb) => { + cb(); + }) + + _read() {} + } + + const foo = new Foo(); + foo.write('test', common.mustCall()); +} + +{ + class Foo extends Duplex { + async _construct(callback) { + // eslint-disable-next-line no-restricted-syntax + await setTimeout(common.platformTimeout(1)); + callback(); + } + + _write = common.mustCall((chunk, encoding, cb) => { + cb(); + }) + + _read() {} + } + + const foo = new Foo(); + foo.write('test', common.mustCall()); +} + +{ + class Foo extends Writable { + _write = common.mustCall((chunk, encoding, cb) => { + cb(); + }) + + async _final() { + // eslint-disable-next-line no-restricted-syntax + await setTimeout(common.platformTimeout(1)); + } + } + + const foo = new Foo(); + foo.end('hello'); + foo.on('finish', common.mustCall()); +} + +{ + class Foo extends Writable { + _write = common.mustCall((chunk, encoding, cb) => { + cb(); + }) + + async _final(callback) { + // eslint-disable-next-line no-restricted-syntax + await setTimeout(common.platformTimeout(1)); + callback(); + } + } + + const foo = new Foo(); + foo.end('hello'); + foo.on('finish', common.mustCall()); +} + +{ + class Foo extends Writable { + _write = common.mustCall((chunk, encoding, cb) => { + cb(); + }) + + async _final() { + // eslint-disable-next-line no-restricted-syntax + await setTimeout(common.platformTimeout(1)); + throw new Error('boom'); + } + } + + const foo = new Foo(); + foo.end('hello'); + foo.on('error', common.expectsError({ + message: 'boom' + })); + foo.on('close', common.mustCall()); +} + +{ + const expected = ['hello', 'world']; + class Foo extends Transform { + async _flush() { + return 'world'; + } + + _transform(chunk, encoding, callback) { + callback(null, chunk); + } + } + + const foo = new Foo(); + foo.end('hello'); + foo.on('data', common.mustCall((chunk) => { + assert.strictEqual(chunk.toString(), expected.shift()); + }, 2)); +} + +{ + const expected = ['hello', 'world']; + class Foo extends Transform { + async _flush(callback) { + callback(null, 'world'); + } + + _transform(chunk, encoding, callback) { + callback(null, chunk); + } + } + + const foo = new Foo(); + foo.end('hello'); + foo.on('data', common.mustCall((chunk) => { + assert.strictEqual(chunk.toString(), expected.shift()); + }, 2)); +} + +{ + class Foo extends Transform { + async _flush(callback) { + throw new Error('boom'); + } + + _transform(chunk, encoding, callback) { + callback(null, chunk); + } + } + + const foo = new Foo(); + foo.end('hello'); + foo.on('data', common.mustCall()); + foo.on('error', common.expectsError({ + message: 'boom' + })); + foo.on('close', common.mustCall()); +} + +{ + class Foo extends Transform { + async _transform(chunk) { + return chunk.toString().toUpperCase(); + } + } + + const foo = new Foo(); + foo.end('hello'); + foo.on('data', common.mustCall((chunk) => { + assert.strictEqual(chunk.toString(), 'HELLO'); + })); +} + +{ + class Foo extends Transform { + async _transform(chunk, _, callback) { + callback(null, chunk.toString().toUpperCase()); + } + } + + const foo = new Foo(); + foo.end('hello'); + foo.on('data', common.mustCall((chunk) => { + assert.strictEqual(chunk.toString(), 'HELLO'); + })); +} + +{ + class Foo extends Transform { + async _transform() { + throw new Error('boom'); + } + } + + const foo = new Foo(); + foo.end('hello'); + foo.on('error', common.expectsError({ + message: 'boom' + })); + foo.on('close', common.mustCall()); +}