From a6d98bc47947e1bf46e48f72b150173c7b26267a Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 6 Jan 2020 15:03:33 +0100 Subject: [PATCH 01/24] stream: support passing generator functions into pipeline --- doc/api/stream.md | 57 +++++- lib/internal/streams/pipeline.js | 180 +++++++++++++++-- test/parallel/test-stream-pipeline.js | 275 ++++++++++++++++++++++++++ tools/doc/type-parser.js | 2 + 4 files changed, 488 insertions(+), 26 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 4f9f8241d38084..35a384bc4b5b93 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1555,17 +1555,26 @@ const cleanup = finished(rs, (err) => { }); ``` -### `stream.pipeline(...streams, callback)` +### `stream.pipeline(source, ...streams, callback)` -* `...streams` {Stream} Two or more streams to pipe between. +* `source` {Stream|Iterable|AsyncIterable|Function} + * Returns: {Stream|Iterable|AsyncIterable} +* `...streams` {Stream|Function} + * `source` {AsyncIterable} + * Returns: {Stream|AsyncIterable|Promise} * `callback` {Function} Called when the pipeline is fully done. * `err` {Error} +* Returns: {Stream} -A module method to pipe between streams forwarding errors and properly cleaning -up and provide a callback when the pipeline is complete. +A module method to pipe between streams and generators forwarding errors and +properly cleaning up and provide a callback when the pipeline is complete. 
```js const { pipeline } = require('stream'); @@ -1608,6 +1617,41 @@ async function run() { run().catch(console.error); ``` +```js +const pipeline = util.promisify(stream.pipeline); +const fs = require('fs').promises; + +async function run() { + await pipeline( + async function*() { + const fd = await fs.open('archive.tar'); + try { + const chunk = Buffer.alloc(1024); + const { bytesRead } = await fd.read(chunk, 0, chunk.length, null); + if (bytesRead === 0) return; + yield chunk.slice(0, bytesRead); + } finally { + await fd.close(); + } + }, + zlib.createGzip(), + async function(source) { + const fd = await fs.open('archive.tar.gz', 'w'); + try { + for await (const chunk of source) { + await fd.write(chunk); + } + } finally { + await fd.close(); + } + } + ); + console.log('Pipeline succeeded.'); +} + +run().catch(console.error); +``` + `stream.pipeline()` will call `stream.destroy(err)` on all streams except: * `Readable` streams which have emitted `'end'` or `'close'`. * `Writable` streams which have emitted `'finish'` or `'close'`. @@ -2700,8 +2744,7 @@ const pipeline = util.promisify(stream.pipeline); const writable = fs.createWriteStream('./file'); (async function() { - const readable = Readable.from(iterable); - await pipeline(readable, writable); + await pipeline(iterator, writable); })(); ``` @@ -2836,7 +2879,7 @@ contain multi-byte characters. 
[`stream.cork()`]: #stream_writable_cork [`stream.finished()`]: #stream_stream_finished_stream_options_callback [`stream.pipe()`]: #stream_readable_pipe_destination_options -[`stream.pipeline()`]: #stream_stream_pipeline_streams_callback +[`stream.pipeline()`]: #stream_stream_pipeline_source_streams_callback [`stream.uncork()`]: #stream_writable_uncork [`stream.unpipe()`]: #stream_readable_unpipe_destination [`stream.wrap()`]: #stream_readable_wrap_stream diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 92a91c30171af1..3e46de6346f851 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -5,17 +5,24 @@ const { ArrayIsArray, + SymbolAsyncIterator, + SymbolIterator } = primordials; let eos; const { once } = require('internal/util'); const { + ERR_INVALID_ARG_TYPE, + ERR_INVALID_RETURN_VALUE, ERR_INVALID_CALLBACK, ERR_MISSING_ARGS, ERR_STREAM_DESTROYED } = require('internal/errors').codes; +let EE; +let Readable; + function isRequest(stream) { return stream && stream.setHeader && typeof stream.abort === 'function'; } @@ -50,10 +57,6 @@ function destroyer(stream, reading, writing, callback) { }; } -function pipe(from, to) { - return from.pipe(to); -} - function popCallback(streams) { // Streams should never be an empty array. It should always contain at least // a single stream. 
Therefore optimize for the average case instead of @@ -63,8 +66,31 @@ function popCallback(streams) { return streams.pop(); } +function isPromise(obj) { + return !!(obj && typeof obj.then === 'function'); +} + +function isReadable(obj) { + return !!(obj && typeof obj.pipe === 'function'); +} + +function isWritable(obj) { + return !!(obj && typeof obj.write === 'function'); +} + +function isStream(obj) { + return isReadable(obj) || isWritable(obj); +} + +function isIterable(obj, isAsync) { + if (!obj) return false; + if (isAsync === true) return !!obj[SymbolAsyncIterator]; + if (isAsync === false) return !!obj[SymbolIterator]; + return !!(obj[SymbolAsyncIterator] || obj[SymbolIterator]); +} + function pipeline(...streams) { - const callback = popCallback(streams); + const callback = once(popCallback(streams)); if (ArrayIsArray(streams[0])) streams = streams[0]; @@ -73,25 +99,141 @@ function pipeline(...streams) { } let error; - const destroys = streams.map(function(stream, i) { + const destroys = []; + + function finish(err, val, final) { + if (!error && err) { + error = err; + } + + if (error || final) { + for (const destroy of destroys) { + destroy(error); + } + } + + if (final) { + callback(error, val); + } + } + + async function pump(iterable, writable) { + if (!EE) { + EE = require('events'); + } + try { + for await (const chunk of iterable) { + if (!writable.write(chunk)) { + if (writable.destroyed) return; + await EE.once(writable, 'drain'); + } + } + writable.end(); + } catch (err) { + finish(err); + } + } + + function wrap(stream, reading, writing, final) { + destroys.push(destroyer(stream, reading, writing, (err) => { + finish(err, null, final); + })); + } + + function makeAsyncIterable(val, name) { + if (isIterable(val)) { + return val; + } else if (isReadable(val)) { + return _fromReadable(val); + } else if (isPromise(val)) { + return _fromPromise(val); + } else { + throw new ERR_INVALID_RETURN_VALUE( + 'AsyncIterable, Readable or Promise', name, val); + 
} + } + + async function* _fromPromise(val) { + yield await val; + } + + async function* _fromReadable(val) { + if (isReadable(val)) { + // Legacy streams are not Iterable. + + if (!Readable) { + Readable = require('_stream_readable'); + } + + wrap(val, true, false, false); + + const it = Readable.prototype[SymbolAsyncIterator].call(val); + while (true) { + const { value, done } = await it.next(); + if (done) return; + yield value; + } + } + } + + let ret; + for (let i = 0; i < streams.length; i++) { + const stream = streams[i]; const reading = i < streams.length - 1; const writing = i > 0; - return destroyer(stream, reading, writing, function(err) { - if (!error) error = err; - if (err) { - for (const destroy of destroys) { - destroy(err); - } + + if (isStream(stream)) { + wrap(stream, reading, writing, !reading); + } + + if (i === 0) { + if (typeof stream === 'function') { + ret = stream(); + } else if (isIterable(stream) || isReadable(stream)) { + ret = stream; + } else { + throw new ERR_INVALID_ARG_TYPE( + `streams[${i}]`, ['Stream', 'Iterable', 'AsyncIterable', 'Function'], + stream); } - if (reading) return; - for (const destroy of destroys) { - destroy(); + } else if (typeof stream === 'function') { + ret = stream(makeAsyncIterable(ret, `streams[${i - 1}]`)); + } else if (isStream(stream)) { + if (isReadable(ret)) { + ret.pipe(stream); + } else { + pump(makeAsyncIterable(ret, `streams[${i - 1}]`), stream); } - callback(error); - }); - }); + ret = stream; + } else { + throw new ERR_INVALID_ARG_TYPE( + `streams[${i}]`, ['Stream', 'Function'], ret); + } + } + + if (!isStream(ret)) { + if (!Readable) { + Readable = require('_stream_readable'); + } + + if (isPromise(ret)) { + // Finish eagerly + ret + .then((val) => { + finish(null, val, true); + }) + .catch((err) => { + finish(err, null, true); + }); + } + + ret = Readable + .from(makeAsyncIterable(ret, `streams[${streams.length - 1}]`)); + + wrap(ret, true, false, true); + } - return streams.reduce(pipe); + 
return ret; } module.exports = pipeline; diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index f6ee97ba43d053..8a20f4df072266 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -516,3 +516,278 @@ const { promisify } = require('util'); }).on('error', common.mustNotCall()); }); } + +{ + let res = ''; + const w = new Writable({ + write(chunk, encoding, callback) { + res += chunk; + callback(); + } + }); + pipeline(function*() { + yield 'hello'; + yield 'world'; + }(), w, common.mustCall((err) => { + assert.ok(!err); + assert.strictEqual(res, 'helloworld'); + })); +} + +{ + let res = ''; + const w = new Writable({ + write(chunk, encoding, callback) { + res += chunk; + callback(); + } + }); + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }(), w, common.mustCall((err) => { + assert.ok(!err); + assert.strictEqual(res, 'helloworld'); + })); +} + +{ + let res = ''; + const w = new Writable({ + write(chunk, encoding, callback) { + res += chunk; + callback(); + } + }); + pipeline(function*() { + yield 'hello'; + yield 'world'; + }, w, common.mustCall((err) => { + assert.ok(!err); + assert.strictEqual(res, 'helloworld'); + })); +} + +{ + let res = ''; + const w = new Writable({ + write(chunk, encoding, callback) { + res += chunk; + callback(); + } + }); + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }, w, common.mustCall((err) => { + assert.ok(!err); + assert.strictEqual(res, 'helloworld'); + })); +} + +{ + let res = ''; + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }, async function*(source) { + for await (const chunk of source) { + yield chunk.toUpperCase(); + } + }, async function(source) { + for await (const chunk of source) { + res += chunk; + } + }, common.mustCall((err) => { + assert.ok(!err); + assert.strictEqual(res, 'HELLOWORLD'); + })); +} 
+ +{ + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }, async function*(source) { + const ret = []; + for await (const chunk of source) { + ret.push(chunk.toUpperCase()); + } + yield ret; + }, async function(source) { + let ret = ''; + for await (const chunk of source) { + ret += chunk; + } + return ret; + }, common.mustCall((err, val) => { + assert.ok(!err); + assert.strictEqual(val, 'HELLOWORLD'); + })); +} + +{ + // AsyncIterable destination is returned and finalizes. + + const ret = pipeline(async function() { + await Promise.resolve(); + return ['hello']; + }, async function*(source) { + for await (const chunk of source) { + chunk; + } + }, common.mustCall((err) => { + assert.strictEqual(err, undefined); + })); + ret.resume(); + assert.strictEqual(typeof ret.pipe, 'function'); +} + +{ + // AsyncFunction destination is not returned and error is + // propagated. + + const ret = pipeline(async function() { + await Promise.resolve(); + throw new Error('kaboom'); + }, async function*(source) { + for await (const chunk of source) { + chunk; + } + }, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); + ret.resume(); + assert.strictEqual(typeof ret.pipe, 'function'); +} + +{ + const s = new PassThrough(); + pipeline(async function() { + throw new Error('kaboom'); + }, s, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(s.destroyed, true); + })); +} + +{ + const s = new PassThrough(); + pipeline(async function*() { + throw new Error('kaboom'); + }(), s, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(s.destroyed, true); + })); +} + +{ + const s = new PassThrough(); + pipeline(function*() { + throw new Error('kaboom'); + }, s, common.mustCall((err, val) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(s.destroyed, true); + })); +} + +{ + const s = new PassThrough(); + pipeline(function*() 
{ + throw new Error('kaboom'); + }(), s, common.mustCall((err, val) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(s.destroyed, true); + })); +} + +{ + const s = new PassThrough(); + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }, s, async function(source) { + for await (const chunk of source) { + chunk; + throw new Error('kaboom'); + } + }, common.mustCall((err, val) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(s.destroyed, true); + })); +} + +{ + const s = new PassThrough(); + const ret = pipeline(function() { + return ['hello', 'world']; + }, s, async function*(source) { + for await (const chunk of source) { + chunk; + throw new Error('kaboom'); + } + }, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(s.destroyed, true); + })); + ret.resume(); + assert.strictEqual(typeof ret.pipe, 'function'); +} + +{ + const s = new PassThrough(); + s.push('asd'); + s.push(null); + s[Symbol.asyncIterator] = null; + let ret = ''; + pipeline(s, async function(source) { + for await (const chunk of source) { + ret += chunk; + } + }, common.mustCall((err) => { + assert.strictEqual(err, undefined); + assert.strictEqual(ret, 'asd'); + assert.strictEqual(s.destroyed, true); + })); +} + +{ + const s = new PassThrough(); + assert.throws(() => { + pipeline(function(source) { + }, s, () => {}); + }, (err) => { + assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); + assert.strictEqual(s.destroyed, false); + return true; + }); +} + +{ + const s = new PassThrough(); + assert.throws(() => { + pipeline(s, function(source) { + }, s, () => {}); + }, (err) => { + assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); + assert.strictEqual(s.destroyed, false); + return true; + }); +} + +{ + const s = new PassThrough(); + assert.throws(() => { + pipeline(s, function(source) { + }, () => {}); + }, (err) => { + assert.strictEqual(err.code, 
'ERR_INVALID_RETURN_VALUE'); + assert.strictEqual(s.destroyed, false); + return true; + }); +} diff --git a/tools/doc/type-parser.js b/tools/doc/type-parser.js index ef4499e50ff35a..5573c364da43e2 100644 --- a/tools/doc/type-parser.js +++ b/tools/doc/type-parser.js @@ -28,6 +28,8 @@ const customTypesMap = { 'AsyncIterator': 'https://tc39.github.io/ecma262/#sec-asynciterator-interface', + 'AsyncIterable': 'https://tc39.github.io/ecma262/#sec-asynciterable-interface', + 'bigint': `${jsDocPrefix}Reference/Global_Objects/BigInt`, 'Iterable': From b316f720eb434e29d5aef1a46e0cb023977196ff Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 01:27:17 +0100 Subject: [PATCH 02/24] fixup: move out closures --- lib/internal/streams/pipeline.js | 112 ++++++++++++++++--------------- 1 file changed, 58 insertions(+), 54 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 3e46de6346f851..bfff85cd6f3cde 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -89,6 +89,63 @@ function isIterable(obj, isAsync) { return !!(obj[SymbolAsyncIterator] || obj[SymbolIterator]); } + +function makeAsyncIterable(val, name) { + if (isIterable(val)) { + return val; + } else if (isReadable(val)) { + return _fromReadable(val); + } else if (isPromise(val)) { + return _fromPromise(val); + } else { + throw new ERR_INVALID_RETURN_VALUE( + 'AsyncIterable, Readable or Promise', name, val); + } +} + +async function* _fromPromise(val) { + yield await val; +} + +async function* _fromReadable(val) { + if (isReadable(val)) { + // Legacy streams are not Iterable. 
+ + if (!Readable) { + Readable = require('_stream_readable'); + } + + try { + const it = Readable.prototype[SymbolAsyncIterator].call(val); + while (true) { + const { value, done } = await it.next(); + if (done) return; + yield value; + } + } finally { + val.destroy(); + } + } +} + +async function pump(iterable, writable, finish) { + if (!EE) { + EE = require('events'); + } + try { + for await (const chunk of iterable) { + if (!writable.write(chunk)) { + if (writable.destroyed) return; + await EE.once(writable, 'drain'); + } + } + writable.end(); + } catch (err) { + finish(err); + } +} + + function pipeline(...streams) { const callback = once(popCallback(streams)); @@ -117,65 +174,12 @@ function pipeline(...streams) { } } - async function pump(iterable, writable) { - if (!EE) { - EE = require('events'); - } - try { - for await (const chunk of iterable) { - if (!writable.write(chunk)) { - if (writable.destroyed) return; - await EE.once(writable, 'drain'); - } - } - writable.end(); - } catch (err) { - finish(err); - } - } - function wrap(stream, reading, writing, final) { destroys.push(destroyer(stream, reading, writing, (err) => { finish(err, null, final); })); } - function makeAsyncIterable(val, name) { - if (isIterable(val)) { - return val; - } else if (isReadable(val)) { - return _fromReadable(val); - } else if (isPromise(val)) { - return _fromPromise(val); - } else { - throw new ERR_INVALID_RETURN_VALUE( - 'AsyncIterable, Readable or Promise', name, val); - } - } - - async function* _fromPromise(val) { - yield await val; - } - - async function* _fromReadable(val) { - if (isReadable(val)) { - // Legacy streams are not Iterable. 
- - if (!Readable) { - Readable = require('_stream_readable'); - } - - wrap(val, true, false, false); - - const it = Readable.prototype[SymbolAsyncIterator].call(val); - while (true) { - const { value, done } = await it.next(); - if (done) return; - yield value; - } - } - } - let ret; for (let i = 0; i < streams.length; i++) { const stream = streams[i]; @@ -202,7 +206,7 @@ function pipeline(...streams) { if (isReadable(ret)) { ret.pipe(stream); } else { - pump(makeAsyncIterable(ret, `streams[${i - 1}]`), stream); + pump(makeAsyncIterable(ret, `streams[${i - 1}]`), stream, finish); } ret = stream; } else { From c8f065b8d50232a1303a9169cf6d8c59e5b69da0 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 01:27:52 +0100 Subject: [PATCH 03/24] fixup: Transform tests --- test/parallel/test-stream-pipeline.js | 61 +++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 8a20f4df072266..ddd8b967c8c966 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -791,3 +791,64 @@ const { promisify } = require('util'); return true; }); } + +{ + let res = ''; + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }, new Transform({ + transform(chunk, encoding, cb) { + cb(new Error('kaboom')); + } + }), async function(source) { + for await (const chunk of source) { + res += chunk; + } + }, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(res, ''); + })); +} + +{ + let res = ''; + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }, new Transform({ + transform(chunk, encoding, cb) { + process.nextTick(cb, new Error('kaboom')); + } + }), async function(source) { + for await (const chunk of source) { + res += chunk; + } + }, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + 
assert.strictEqual(res, ''); + })); +} + +{ + let res = ''; + pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }, new Transform({ + decodeStrings: false, + transform(chunk, encoding, cb) { + cb(null, chunk.toUpperCase()); + } + }), async function(source) { + for await (const chunk of source) { + res += chunk; + } + }, common.mustCall((err) => { + assert.ok(!err); + assert.strictEqual(res, 'HELLOWORLD'); + })); +} From 16e8ba1d2ea7171bf86a6c6a83b63f9d7db02dd6 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 11:49:11 +0100 Subject: [PATCH 04/24] fixup: whitespace --- lib/internal/streams/pipeline.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index bfff85cd6f3cde..abe9863b9566c6 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -89,7 +89,6 @@ function isIterable(obj, isAsync) { return !!(obj[SymbolAsyncIterator] || obj[SymbolIterator]); } - function makeAsyncIterable(val, name) { if (isIterable(val)) { return val; @@ -145,7 +144,6 @@ async function pump(iterable, writable, finish) { } } - function pipeline(...streams) { const callback = once(popCallback(streams)); From d730f88725638122063eec9a2157c7890db39456 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 15:03:11 +0100 Subject: [PATCH 05/24] fixup: unhandled rejection --- lib/internal/streams/pipeline.js | 6 ++++++ test/parallel/test-stream-pipeline.js | 21 +++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index abe9863b9566c6..21098cefb62b73 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -23,6 +23,8 @@ const { let EE; let Readable; +function nop() {} + function isRequest(stream) { return stream && stream.setHeader && typeof stream.abort === 'function'; } @@ -95,6 +97,10 @@ function 
makeAsyncIterable(val, name) { } else if (isReadable(val)) { return _fromReadable(val); } else if (isPromise(val)) { + // Generator will subscribe to the promise in a lazy fashion + // while the promise executes eagerly. Thus we need to register + // a rejection handler to avoid unhandled rejection errors. + val.catch(nop); return _fromPromise(val); } else { throw new ERR_INVALID_RETURN_VALUE( diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index ddd8b967c8c966..230b35511bb0d9 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -852,3 +852,24 @@ const { promisify } = require('util'); assert.strictEqual(res, 'HELLOWORLD'); })); } + +{ + // Ensure no unhandled rejection from async function. + + let res = ''; + const ret = pipeline(async function*() { + await Promise.resolve(); + yield 'hello'; + yield 'world'; + }, async function() { + throw new Error('kaboom'); + }, async function*(source) { + for await (const chunk of source) { + res += chunk; + } + }, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(res, ''); + })); + ret.resume(); +} From b1562124511ae1c48ea952a75e2d4385c9120485 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 15:06:29 +0100 Subject: [PATCH 06/24] fixup: use Passthrough for return --- lib/internal/streams/pipeline.js | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 21098cefb62b73..0acc1cf5d1da3a 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -22,6 +22,7 @@ const { let EE; let Readable; +let Passthrough; function nop() {} @@ -220,24 +221,27 @@ function pipeline(...streams) { } if (!isStream(ret)) { - if (!Readable) { - Readable = require('_stream_readable'); + if (!Passthrough) { + Passthrough = require('_stream_passthrough'); } + const pt = new 
Passthrough(); + if (isPromise(ret)) { // Finish eagerly ret .then((val) => { + pt.end(val); finish(null, val, true); }) .catch((err) => { finish(err, null, true); }); + } else { + pump(makeAsyncIterable(ret, `streams[${streams.length - 1}]`), pt, finish); } - ret = Readable - .from(makeAsyncIterable(ret, `streams[${streams.length - 1}]`)); - + ret = pt; wrap(ret, true, false, true); } From 2f27c8672100fe820106aa101c849d00c0283ff8 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 15:19:09 +0100 Subject: [PATCH 07/24] fixup: only allow destination to return Promise --- doc/api/stream.md | 3 +++ lib/internal/streams/pipeline.js | 8 +------- test/parallel/test-stream-pipeline.js | 10 +++++----- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 35a384bc4b5b93..af1bb41a13b219 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1567,6 +1567,9 @@ changes: * `source` {Stream|Iterable|AsyncIterable|Function} * Returns: {Stream|Iterable|AsyncIterable} * `...streams` {Stream|Function} + * `source` {AsyncIterable} + * Returns: {Stream|AsyncIterable} +* `destination` {Stream|Function} * `source` {AsyncIterable} * Returns: {Stream|AsyncIterable|Promise} * `callback` {Function} Called when the pipeline is fully done. diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 0acc1cf5d1da3a..c49433d5a558f7 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -97,15 +97,9 @@ function makeAsyncIterable(val, name) { return val; } else if (isReadable(val)) { return _fromReadable(val); - } else if (isPromise(val)) { - // Generator will subscribe to the promise in a lazy fashion - // while the promise executes eagerly. Thus we need to register - // a rejection handler to avoid unhandled rejection errors. 
- val.catch(nop); - return _fromPromise(val); } else { throw new ERR_INVALID_RETURN_VALUE( - 'AsyncIterable, Readable or Promise', name, val); + 'AsyncIterable, Readable', name, val); } } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 230b35511bb0d9..5d70ec868c9777 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -633,9 +633,9 @@ const { promisify } = require('util'); { // AsyncIterable destination is returned and finalizes. - const ret = pipeline(async function() { + const ret = pipeline(async function*() { await Promise.resolve(); - return ['hello']; + yield 'hello'; }, async function*(source) { for await (const chunk of source) { chunk; @@ -651,7 +651,7 @@ const { promisify } = require('util'); // AsyncFunction destination is not returned and error is // propagated. - const ret = pipeline(async function() { + const ret = pipeline(async function*() { await Promise.resolve(); throw new Error('kaboom'); }, async function*(source) { @@ -667,7 +667,7 @@ const { promisify } = require('util'); { const s = new PassThrough(); - pipeline(async function() { + pipeline(async function*() { throw new Error('kaboom'); }, s, common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); @@ -861,7 +861,7 @@ const { promisify } = require('util'); await Promise.resolve(); yield 'hello'; yield 'world'; - }, async function() { + }, async function*() { throw new Error('kaboom'); }, async function*(source) { for await (const chunk of source) { From 86f83df2812834a4903265eb83bada7350688a21 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 17:02:58 +0100 Subject: [PATCH 08/24] fixup: refactor --- lib/internal/streams/pipeline.js | 48 +++++++++++++++----------------- 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index c49433d5a558f7..14584758032f09 100644 --- 
a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -21,8 +21,8 @@ const { } = require('internal/errors').codes; let EE; -let Readable; -let Passthrough; +let PassThrough; +let createReadableStreamAsyncIterator; function nop() {} @@ -96,6 +96,7 @@ function makeAsyncIterable(val, name) { if (isIterable(val)) { return val; } else if (isReadable(val)) { + // Legacy streams are not Iterable. return _fromReadable(val); } else { throw new ERR_INVALID_RETURN_VALUE( @@ -103,28 +104,21 @@ function makeAsyncIterable(val, name) { } } -async function* _fromPromise(val) { - yield await val; -} - async function* _fromReadable(val) { - if (isReadable(val)) { - // Legacy streams are not Iterable. - - if (!Readable) { - Readable = require('_stream_readable'); - } + if (!createReadableStreamAsyncIterator) { + createReadableStreamAsyncIterator = + require('internal/streams/async_iterator'); + } - try { - const it = Readable.prototype[SymbolAsyncIterator].call(val); - while (true) { - const { value, done } = await it.next(); - if (done) return; - yield value; - } - } finally { - val.destroy(); + try { + const it = createReadableStreamAsyncIterator(val); + while (true) { + const { value, done } = await it.next(); + if (done) return; + yield value; } + } finally { + val.destroy(); } } @@ -200,12 +194,14 @@ function pipeline(...streams) { stream); } } else if (typeof stream === 'function') { - ret = stream(makeAsyncIterable(ret, `streams[${i - 1}]`)); + ret = makeAsyncIterable(ret, `streams[${i - 1}]`); + ret = stream(ret); } else if (isStream(stream)) { if (isReadable(ret)) { ret.pipe(stream); } else { - pump(makeAsyncIterable(ret, `streams[${i - 1}]`), stream, finish); + ret = makeAsyncIterable(ret, `streams[${i - 1}]`); + pump(ret, stream, finish); } ret = stream; } else { @@ -215,11 +211,11 @@ function pipeline(...streams) { } if (!isStream(ret)) { - if (!Passthrough) { - Passthrough = require('_stream_passthrough'); + if (!PassThrough) { + PassThrough = 
require('_stream_passthrough'); } - const pt = new Passthrough(); + const pt = new PassThrough(); if (isPromise(ret)) { // Finish eagerly From 7ec44fe72865b6408447a445e59fa0ad0fc3b826 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 17:36:04 +0100 Subject: [PATCH 09/24] fixup: refactor --- doc/api/stream.md | 6 +-- lib/internal/streams/pipeline.js | 73 ++++++++++++++++++-------------- 2 files changed, 45 insertions(+), 34 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index af1bb41a13b219..d7ef4bba5bfec4 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1565,13 +1565,13 @@ changes: --> * `source` {Stream|Iterable|AsyncIterable|Function} - * Returns: {Stream|Iterable|AsyncIterable} + * Returns: {Iterable|AsyncIterable} * `...streams` {Stream|Function} * `source` {AsyncIterable} - * Returns: {Stream|AsyncIterable} + * Returns: {AsyncIterable} * `destination` {Stream|Function} * `source` {AsyncIterable} - * Returns: {Stream|AsyncIterable|Promise} + * Returns: {AsyncIterable|Promise} * `callback` {Function} Called when the pipeline is fully done. * `err` {Error} * Returns: {Stream} diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 14584758032f09..a65a6e4cedfb86 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -99,8 +99,8 @@ function makeAsyncIterable(val, name) { // Legacy streams are not Iterable. 
return _fromReadable(val); } else { - throw new ERR_INVALID_RETURN_VALUE( - 'AsyncIterable, Readable', name, val); + throw new ERR_INVALID_ARG_TYPE( + 'val', ['Readable', 'Iterable', 'AsyncIterable'], val); } } @@ -186,53 +186,64 @@ function pipeline(...streams) { if (i === 0) { if (typeof stream === 'function') { ret = stream(); + if (!isIterable(ret)) { + throw new ERR_INVALID_RETURN_VALUE( + 'Iterable, AsyncIterable, Stream', 'source', ret); + } } else if (isIterable(stream) || isReadable(stream)) { ret = stream; } else { throw new ERR_INVALID_ARG_TYPE( - `streams[${i}]`, ['Stream', 'Iterable', 'AsyncIterable', 'Function'], + `source`, ['Stream', 'Iterable', 'AsyncIterable', 'Function'], stream); } } else if (typeof stream === 'function') { - ret = makeAsyncIterable(ret, `streams[${i - 1}]`); + ret = makeAsyncIterable(ret); ret = stream(ret); + + if (reading) { + if (!isIterable(ret)) { + throw new ERR_INVALID_RETURN_VALUE( + 'AsyncIterable', `transform[${i - 1}]`, ret); + } + } else if (!isStream(ret)) { + if (!PassThrough) { + PassThrough = require('_stream_passthrough'); + } + + const pt = new PassThrough(); + if (isPromise(ret)) { + ret + .then((val) => { + pt.end(val); + finish(null, val, true); + }) + .catch((err) => { + finish(err, null, true); + }); + } else if (isIterable(ret, true)) { + pump(ret, pt, finish); + } else { + throw new ERR_INVALID_RETURN_VALUE( + 'AsyncIterable, Promise', `destination`, ret); + } + + ret = pt; + wrap(ret, true, false, true); + } } else if (isStream(stream)) { if (isReadable(ret)) { ret.pipe(stream); } else { - ret = makeAsyncIterable(ret, `streams[${i - 1}]`); + ret = makeAsyncIterable(ret); pump(ret, stream, finish); } ret = stream; } else { + const name = reading ? 
`transform[${i - 1}]` : 'destination'; throw new ERR_INVALID_ARG_TYPE( - `streams[${i}]`, ['Stream', 'Function'], ret); - } - } - - if (!isStream(ret)) { - if (!PassThrough) { - PassThrough = require('_stream_passthrough'); + name, ['Stream', 'Function'], ret); } - - const pt = new PassThrough(); - - if (isPromise(ret)) { - // Finish eagerly - ret - .then((val) => { - pt.end(val); - finish(null, val, true); - }) - .catch((err) => { - finish(err, null, true); - }); - } else { - pump(makeAsyncIterable(ret, `streams[${streams.length - 1}]`), pt, finish); - } - - ret = pt; - wrap(ret, true, false, true); } return ret; From 07220bd426a30787dc09b2e995c236a5ef88f6d4 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 17:39:51 +0100 Subject: [PATCH 10/24] fixup: yield* --- lib/internal/streams/pipeline.js | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index a65a6e4cedfb86..ea34b4fadda983 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -111,12 +111,7 @@ async function* _fromReadable(val) { } try { - const it = createReadableStreamAsyncIterator(val); - while (true) { - const { value, done } = await it.next(); - if (done) return; - yield value; - } + yield* createReadableStreamAsyncIterator(val); } finally { val.destroy(); } From 6c2ae33ccd07ba55e6fd1ff24a8ff7e68110fc7f Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 17:40:09 +0100 Subject: [PATCH 11/24] fixup: don't allow return of Stream --- lib/internal/streams/pipeline.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index ea34b4fadda983..3d5acf14519c13 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -201,7 +201,7 @@ function pipeline(...streams) { throw new ERR_INVALID_RETURN_VALUE( 'AsyncIterable', `transform[${i - 1}]`, ret); } - } 
else if (!isStream(ret)) { + } else { if (!PassThrough) { PassThrough = require('_stream_passthrough'); } From 8f54c8e0d2691dc56a2f7cd95137a758fbe2f00d Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 17:42:10 +0100 Subject: [PATCH 12/24] fixup: typeof function --- lib/internal/streams/pipeline.js | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 3d5acf14519c13..05741c19b92e3c 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -87,12 +87,13 @@ function isStream(obj) { function isIterable(obj, isAsync) { if (!obj) return false; - if (isAsync === true) return !!obj[SymbolAsyncIterator]; - if (isAsync === false) return !!obj[SymbolIterator]; - return !!(obj[SymbolAsyncIterator] || obj[SymbolIterator]); + if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function'; + if (isAsync === false) return typeof obj[SymbolIterator] === 'function'; + return typeof obj[SymbolAsyncIterator] === 'function' || + typeof obj[SymbolIterator] === 'function'; } -function makeAsyncIterable(val, name) { +function makeAsyncIterable(val) { if (isIterable(val)) { return val; } else if (isReadable(val)) { From ab7434d1c8cb316f0c3269cef48ae63970ecba5d Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 17:43:02 +0100 Subject: [PATCH 13/24] fixup: docs missing await --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index d7ef4bba5bfec4..2056dd41d9a897 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1642,7 +1642,7 @@ async function run() { const fd = await fs.open('archive.tar', 'w'); try { for await (const chunk of source) { - fs.write(fd, chunk); + await fs.write(fd, chunk); } } finally { await fs.close(fd); From ec7f64d4349c78c6c39d3726df18fa0f702484b7 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 18:00:08 
+0100 Subject: [PATCH 14/24] fixup: streams v1 support --- lib/internal/streams/pipeline.js | 15 +++++++++-- test/parallel/test-stream-pipeline.js | 39 +++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 05741c19b92e3c..d6c2045a8fb343 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -24,8 +24,6 @@ let EE; let PassThrough; let createReadableStreamAsyncIterator; -function nop() {} - function isRequest(stream) { return stream && stream.setHeader && typeof stream.abort === 'function'; } @@ -111,6 +109,19 @@ async function* _fromReadable(val) { require('internal/streams/async_iterator'); } + if (typeof val.read !== 'function') { + // createReadableStreamAsyncIterator does not support + // v1 streams. Convert it into a v2 stream. + + if (!PassThrough) { + PassThrough = require('_stream_passthrough'); + } + + val = val + .on('error', err => val.destroy(err)) + .pipe(new PassThrough()); + } + try { yield* createReadableStreamAsyncIterator(val); } finally { diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 5d70ec868c9777..25c9f3237053f0 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -740,6 +740,8 @@ const { promisify } = require('util'); } { + // Legacy streams without async iterator. + const s = new PassThrough(); s.push('asd'); s.push(null); @@ -756,6 +758,43 @@ const { promisify } = require('util'); })); } +{ + // v1 streams without read(). 
+ + const s = new Stream(); + process.nextTick(() => { + s.emit('data', 'asd'); + s.emit('end'); + }); + let ret = ''; + pipeline(s, async function(source) { + for await (const chunk of source) { + ret += chunk; + } + }, common.mustCall((err) => { + assert.strictEqual(err, undefined); + assert.strictEqual(ret, 'asd'); + assert.strictEqual(s.destroyed, true); + })); +} + +{ + // v1 error streams without read(). + + const s = new Stream(); + process.nextTick(() => { + s.emit('error', new Error('kaboom')); + }); + let ret = ''; + pipeline(s, async function(source) { + for await (const chunk of source) { + ret += chunk; + } + }, common.mustCall((err) => { + assert.strictEqual(err.message, 'kaboom'); + })); +} + { const s = new PassThrough(); assert.throws(() => { From f783c64c171eee81e476de5683bf8b33e37bbd14 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 18:02:59 +0100 Subject: [PATCH 15/24] fixup: linting : linting --- lib/internal/streams/pipeline.js | 8 ++++---- test/parallel/test-stream-pipeline.js | 4 ---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index d6c2045a8fb343..f0e5ebd9b79263 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -118,7 +118,7 @@ async function* _fromReadable(val) { } val = val - .on('error', err => val.destroy(err)) + .on('error', (err) => val.destroy(err)) .pipe(new PassThrough()); } @@ -195,13 +195,13 @@ function pipeline(...streams) { ret = stream(); if (!isIterable(ret)) { throw new ERR_INVALID_RETURN_VALUE( - 'Iterable, AsyncIterable, Stream', 'source', ret); + 'Iterable, AsyncIterable or Stream', 'source', ret); } } else if (isIterable(stream) || isReadable(stream)) { ret = stream; } else { throw new ERR_INVALID_ARG_TYPE( - `source`, ['Stream', 'Iterable', 'AsyncIterable', 'Function'], + 'source', ['Stream', 'Iterable', 'AsyncIterable', 'Function'], stream); } } else if (typeof stream === 
'function') { @@ -232,7 +232,7 @@ function pipeline(...streams) { pump(ret, pt, finish); } else { throw new ERR_INVALID_RETURN_VALUE( - 'AsyncIterable, Promise', `destination`, ret); + 'AsyncIterable or Promise', 'destination', ret); } ret = pt; diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 25c9f3237053f0..3b187751876232 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -785,11 +785,7 @@ const { promisify } = require('util'); process.nextTick(() => { s.emit('error', new Error('kaboom')); }); - let ret = ''; pipeline(s, async function(source) { - for await (const chunk of source) { - ret += chunk; - } }, common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); })); From b49fd4d8fe58fa58fdae67bba6b3585029484c76 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 18:17:05 +0100 Subject: [PATCH 16/24] fixup: test --- test/parallel/test-stream-pipeline.js | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 3b187751876232..0e6e19ae7212e2 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -891,20 +891,11 @@ const { promisify } = require('util'); { // Ensure no unhandled rejection from async function. 
- let res = ''; - const ret = pipeline(async function*() { - await Promise.resolve(); + pipeline(async function*() { yield 'hello'; - yield 'world'; - }, async function*() { + }, async function(source) { throw new Error('kaboom'); - }, async function*(source) { - for await (const chunk of source) { - res += chunk; - } }, common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); - assert.strictEqual(res, ''); })); - ret.resume(); } From 524366d71a4ba9512315e1d023dd50bd07683f7a Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 18:58:53 +0100 Subject: [PATCH 17/24] Update doc/api/stream.md Co-Authored-By: Benjamin Gruenbaum --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 2056dd41d9a897..548c440745de8b 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1561,7 +1561,7 @@ added: v10.0.0 changes: - version: REPLACEME pr-url: https://github.com/nodejs/node/pull/31223 - description: Add support for functions and generators. + description: Add support for functions generators and async generators. --> * `source` {Stream|Iterable|AsyncIterable|Function} From 4384f7a5dbe1474b0810e64a6339083520439b09 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 20:48:52 +0100 Subject: [PATCH 18/24] fixup: doc typo --- doc/api/stream.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 548c440745de8b..5c04e7abf12687 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1561,7 +1561,7 @@ added: v10.0.0 changes: - version: REPLACEME pr-url: https://github.com/nodejs/node/pull/31223 - description: Add support for functions generators and async generators. + description: Add support for async generators. 
--> * `source` {Stream|Iterable|AsyncIterable|Function} @@ -2747,7 +2747,7 @@ const pipeline = util.promisify(stream.pipeline); const writable = fs.createWriteStream('./file'); (async function() { - await pipeline(iterator, writable); + await pipeline(iterable, writable); })(); ``` From 01f0f41c715c5eeb31bfe50163400bf93e5b6f16 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 20:50:06 +0100 Subject: [PATCH 19/24] fixup: docs callback val --- doc/api/stream.md | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/api/stream.md b/doc/api/stream.md index 5c04e7abf12687..8d9efd64d97ee7 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1574,6 +1574,7 @@ changes: * Returns: {AsyncIterable|Promise} * `callback` {Function} Called when the pipeline is fully done. * `err` {Error} + * `val` Resolved value of `Promise` returned by `destination`. * Returns: {Stream} A module method to pipe between streams and generators forwarding errors and From c6483dcf975da8dd13cc20a8ee598f6e9e5a84ed Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 20:55:15 +0100 Subject: [PATCH 20/24] fixup: v1 stream cleanup --- lib/internal/streams/pipeline.js | 34 ++++++++++++++++----------- test/parallel/test-stream-pipeline.js | 2 ++ 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index f0e5ebd9b79263..eb1a6e2df67757 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -109,23 +109,29 @@ async function* _fromReadable(val) { require('internal/streams/async_iterator'); } - if (typeof val.read !== 'function') { - // createReadableStreamAsyncIterator does not support - // v1 streams. Convert it into a v2 stream. - - if (!PassThrough) { - PassThrough = require('_stream_passthrough'); - } + try { + if (typeof val.read !== 'function') { + // createReadableStreamAsyncIterator does not support + // v1 streams. Convert it into a v2 stream. 
- val = val - .on('error', (err) => val.destroy(err)) - .pipe(new PassThrough()); - } + if (!PassThrough) { + PassThrough = require('_stream_passthrough'); + } - try { - yield* createReadableStreamAsyncIterator(val); + const pt = new PassThrough(); + val + .on('error', (err) => pt.destroy(err)) + .pipe(pt); + yield* createReadableStreamAsyncIterator(pt); + } else { + yield* createReadableStreamAsyncIterator(val); + } } finally { - val.destroy(); + if (typeof val.destroy === 'function') { + val.destroy(); + } else if (typeof val.close === 'function') { + val.close(); + } } } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 0e6e19ae7212e2..988d23d5adea38 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -766,6 +766,7 @@ const { promisify } = require('util'); s.emit('data', 'asd'); s.emit('end'); }); + s.close = common.mustCall(); let ret = ''; pipeline(s, async function(source) { for await (const chunk of source) { @@ -785,6 +786,7 @@ const { promisify } = require('util'); process.nextTick(() => { s.emit('error', new Error('kaboom')); }); + s.destroy = common.mustCall(); pipeline(s, async function(source) { }, common.mustCall((err) => { assert.strictEqual(err.message, 'kaboom'); From 2a99cdbfdfd90eeb5e7b46c141c2b4736179d133 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 21:09:51 +0100 Subject: [PATCH 21/24] fixup: reuse destroy logic --- lib/internal/streams/pipeline.js | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index eb1a6e2df67757..b75ce7f156ec04 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -28,6 +28,14 @@ function isRequest(stream) { return stream && stream.setHeader && typeof stream.abort === 'function'; } +function destroyStream(stream, err) { + // request.destroy just do .end - .abort is what we 
want + if (isRequest(stream)) return stream.abort(); + if (isRequest(stream.req)) return stream.req.abort(); + if (typeof stream.destroy === 'function') return stream.destroy(err); + if (typeof stream.close === 'function') return stream.close(); +} + function destroyer(stream, reading, writing, callback) { callback = once(callback); @@ -49,10 +57,7 @@ function destroyer(stream, reading, writing, callback) { if (destroyed) return; destroyed = true; - // request.destroy just do .end - .abort is what we want - if (isRequest(stream)) return stream.abort(); - if (isRequest(stream.req)) return stream.req.abort(); - if (typeof stream.destroy === 'function') return stream.destroy(err); + destroyStream(stream, err); callback(err || new ERR_STREAM_DESTROYED('pipe')); }; @@ -127,11 +132,7 @@ async function* _fromReadable(val) { yield* createReadableStreamAsyncIterator(val); } } finally { - if (typeof val.destroy === 'function') { - val.destroy(); - } else if (typeof val.close === 'function') { - val.close(); - } + destroyStream(val); } } From c14ef26d7111019b19d46f759cc313e81956be42 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Jan 2020 21:18:56 +0100 Subject: [PATCH 22/24] fixup: doc --- doc/api/stream.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 8d9efd64d97ee7..219b3ad080a524 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1555,7 +1555,7 @@ const cleanup = finished(rs, (err) => { }); ``` -### `stream.pipeline(source, ...streams, callback)` +### `stream.pipeline(source, ...transforms, destination, callback)`