From 571e86741690caa217124f993052bfa1fe1bd30f Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 19 Mar 2020 22:38:30 +0100 Subject: [PATCH 01/13] stream: complete pipeline with stdio stdio (stderr & stdout) should for compatibility reasons not be closed/end():ed. However, this causes pipeline with a stdio destination to never finish. This commit fixes this issue at a performance cost. Refs: https://github.com/nodejs/node/issues/7606 Fixes: https://github.com/nodejs/node/issues/32363 --- lib/internal/streams/pipeline.js | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 51bd99b654f23d..6396d7455bc28e 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -6,7 +6,8 @@ const { ArrayIsArray, SymbolAsyncIterator, - SymbolIterator + SymbolIterator, + Promise } = primordials; let eos; @@ -147,13 +148,23 @@ async function pump(iterable, writable, finish) { } let error; try { - for await (const chunk of iterable) { - if (!writable.write(chunk)) { - if (writable.destroyed) return; - await EE.once(writable, 'drain'); + if (writable !== process.stdout && writable !== process.stderr) { + for await (const chunk of iterable) { + if (!writable.write(chunk)) { + if (writable.destroyed) return; + await EE.once(writable, 'drain'); + } + } + writable.end(); + } else { + for await (const chunk of iterable) { + await new Promise((resolve, reject) => { + writable.write(chunk, null, (err) => { + err ? reject(err) : resolve(); + }); + }); } } - writable.end(); } catch (err) { error = err; } finally { @@ -202,7 +213,9 @@ function pipeline(...streams) { const reading = i < streams.length - 1; const writing = i > 0; - if (isStream(stream)) { + if (stream === process.stdout || stream === process.stderr) { + // `pipe()` doesn't .end() stdout and stderr. + } else if (isStream(stream)) { finishCount++; destroys.push(destroyer(stream, reading, writing, !reading, finish)); } @@ -263,7 +276,10 @@ function pipeline(...streams) { destroys.push(destroyer(ret, false, true, true, finish)); } } else if (isStream(stream)) { - if (isReadable(ret)) { + if (isReadable(ret) && + stream !== process.stdout && + stream !== process.stderr + ) { ret.pipe(stream); } else { ret = makeAsyncIterable(ret); From ab76605ae22c685fc854150462350d93720c096f Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 19 Mar 2020 22:46:07 +0100 Subject: [PATCH 02/13] fixup: isStdio helper --- lib/internal/streams/pipeline.js | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 6396d7455bc28e..74147eb83eaff5 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -114,6 +114,10 @@ function isStream(obj) { return isReadable(obj) || isWritable(obj); } +function isStdio(obj) { + return obj === process.stdout || obj === process.stderr; +} + function isIterable(obj, isAsync) { if (!obj) return false; if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function'; @@ -148,7 +152,7 @@ async function pump(iterable, writable, finish) { } let error; try { - if (writable !== process.stdout && writable !== process.stderr) { + if (!isStdio(writable)) { for await (const chunk of iterable) { if (!writable.write(chunk)) { if (writable.destroyed) return; @@ -157,12 +161,18 @@ async function pump(iterable, writable, finish) { } writable.end(); } else { + const errorPromise = new Promise((resolve, reject) => { + writable.on('error', reject); + }); for await (const chunk of iterable) { - await new Promise((resolve, reject) => { - writable.write(chunk, null, (err) => { - err ? reject(err) : resolve(); - }); - }); + await Promise.race([ + errorPromise, + await new Promise((resolve, reject) => { + writable.write(chunk, null, (err) => { + err ? reject(err) : resolve(); + }); + }) + ]); } } } catch (err) { @@ -213,9 +223,7 @@ function pipeline(...streams) { const reading = i < streams.length - 1; const writing = i > 0; - if (stream === process.stdout || stream === process.stderr) { - // `pipe()` doesn't .end() stdout and stderr. - } else if (isStream(stream)) { + if (isStream(stream) && !isStdio(stream)) { finishCount++; destroys.push(destroyer(stream, reading, writing, !reading, finish)); } @@ -276,10 +284,7 @@ function pipeline(...streams) { destroys.push(destroyer(ret, false, true, true, finish)); } } else if (isStream(stream)) { - if (isReadable(ret) && - stream !== process.stdout && - stream !== process.stderr - ) { + if (isReadable(ret) && !isStdio(stream)) { ret.pipe(stream); } else { ret = makeAsyncIterable(ret); From 84cf2aa66f0722142bf37ff8ae63b78cc1c59728 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 19 Mar 2020 22:46:13 +0100 Subject: [PATCH 03/13] fixup: test --- test/parallel/test-stream-pipeline-process.js | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 test/parallel/test-stream-pipeline-process.js diff --git a/test/parallel/test-stream-pipeline-process.js b/test/parallel/test-stream-pipeline-process.js new file mode 100644 index 00000000000000..f6323f06f7b593 --- /dev/null +++ b/test/parallel/test-stream-pipeline-process.js @@ -0,0 +1,25 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); + +if (process.argv[2] === 'child') { + const { pipeline } = require('stream'); + pipeline( + process.stdin, + process.stdout, + common.mustCall() + ); +} else { + const cp = require('child_process'); + cp.exec([ + 'echo', + '"hello"', + '|', + `"${process.execPath}"`, + `"${__filename}"`, + 'child' + ].join(' '), common.mustCall((err) => { + assert.ifError(err); + })); +} From b2925d8a81bbf54f9bcb6682abe2ee367580e78e Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 19 Mar 2020 22:47:12 +0100 Subject: [PATCH 04/13] fixup: typo --- lib/internal/streams/pipeline.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 74147eb83eaff5..9b4741cb7e5d8b 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -167,7 +167,7 @@ async function pump(iterable, writable, finish) { for await (const chunk of iterable) { await Promise.race([ errorPromise, - await new Promise((resolve, reject) => { + new Promise((resolve, reject) => { writable.write(chunk, null, (err) => { err ? reject(err) : resolve(); }); From bb882a2e15ab755dd4cc3bfb68ee8cb30eb084f3 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 19 Mar 2020 22:48:34 +0100 Subject: [PATCH 05/13] fixup: test --- test/parallel/test-stream-pipeline-process.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/parallel/test-stream-pipeline-process.js b/test/parallel/test-stream-pipeline-process.js index f6323f06f7b593..bc04abbdf5f7d5 100644 --- a/test/parallel/test-stream-pipeline-process.js +++ b/test/parallel/test-stream-pipeline-process.js @@ -8,7 +8,9 @@ if (process.argv[2] === 'child') { pipeline( process.stdin, process.stdout, - common.mustCall() + common.mustCall((err) => { + assert.ifError(err); + }) ); } else { const cp = require('child_process'); From 1d0a7a5a6fc830aad6ce2cccfcc0f74bccac5026 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 19 Mar 2020 22:55:02 +0100 Subject: [PATCH 06/13] fixup: unhandledRejection --- lib/internal/streams/pipeline.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 9b4741cb7e5d8b..087107d5fcc8c5 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -164,6 +164,8 @@ async function pump(iterable, writable, finish) { const errorPromise = new Promise((resolve, reject) => { writable.on('error', reject); }); + // Don't propagate to unhandledRejection + errorPromise.catch(() => {}); for await (const chunk of iterable) { await Promise.race([ errorPromise, From 3444617bdbcd369f5c8b0579b2fc2b6145deefc3 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 20 Mar 2020 00:20:10 +0100 Subject: [PATCH 07/13] fixup: faster version --- lib/internal/streams/pipeline.js | 58 +++++++++++-------- test/parallel/test-stream-pipeline-process.js | 3 +- 2 files changed, 36 insertions(+), 25 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 087107d5fcc8c5..08e6f4d67bd42f 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -146,37 +146,43 @@ async function* fromReadable(val) { yield* createReadableStreamAsyncIterator(val); } +function pumpStdio(iterable, writable, finish) { + let iterator; + if (iterable[SymbolAsyncIterator]) + iterator = iterable[SymbolAsyncIterator](); + else if (iterable[SymbolIterator]) + iterator = iterable[SymbolIterator](); + + async function _next(err) { + if (err) { + return finish(err); + } + try { + const { value, done } = await iterator.next(); + if (done) { + return finish(); + } + writable.write(value, _next); + } catch (err) { + finish(err); + } + } + _next(); +} + async function pump(iterable, writable, finish) { if (!EE) { EE = require('events'); } let error; try { - if (!isStdio(writable)) { - for await (const chunk of iterable) { - if (!writable.write(chunk)) { - if (writable.destroyed) return; - await EE.once(writable, 'drain'); - } - } - writable.end(); - } else { - const errorPromise = new Promise((resolve, reject) => { - writable.on('error', reject); - }); - // Don't propagate to unhandledRejection - errorPromise.catch(() => {}); - for await (const chunk of iterable) { - await Promise.race([ - errorPromise, - new Promise((resolve, reject) => { - writable.write(chunk, null, (err) => { - err ? reject(err) : resolve(); - }); - }) - ]); + for await (const chunk of iterable) { + if (!writable.write(chunk)) { + if (writable.destroyed) return; + await EE.once(writable, 'drain'); } } + writable.end(); } catch (err) { error = err; } finally { @@ -292,7 +298,11 @@ function pipeline(...streams) { ret = makeAsyncIterable(ret); finishCount++; - pump(ret, stream, finish); + if (isStdio(stream)) { + pumpStdio(ret, stream, finish); + } else { + pump(ret, stream, finish); + } } ret = stream; } else { diff --git a/test/parallel/test-stream-pipeline-process.js b/test/parallel/test-stream-pipeline-process.js index bc04abbdf5f7d5..f68a83425daf83 100644 --- a/test/parallel/test-stream-pipeline-process.js +++ b/test/parallel/test-stream-pipeline-process.js @@ -21,7 +21,8 @@ if (process.argv[2] === 'child') { `"${process.execPath}"`, `"${__filename}"`, 'child' - ].join(' '), common.mustCall((err) => { + ].join(' '), common.mustCall((err, stdout) => { assert.ifError(err); + assert.strictEqual(stdout, 'hello'); })); } From e9e444452c545c69c0bcba90e7f0138aa018e0fb Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 20 Mar 2020 00:41:17 +0100 Subject: [PATCH 08/13] fixup: unify --- lib/internal/streams/pipeline.js | 58 ++++++++----------- test/parallel/test-stream-pipeline-process.js | 2 +- 2 files changed, 25 insertions(+), 35 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 08e6f4d67bd42f..0491bd8bfef2d9 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -146,43 +146,37 @@ async function* fromReadable(val) { yield* createReadableStreamAsyncIterator(val); } -function pumpStdio(iterable, writable, finish) { - let iterator; - if (iterable[SymbolAsyncIterator]) - iterator = iterable[SymbolAsyncIterator](); - else if (iterable[SymbolIterator]) - iterator = iterable[SymbolIterator](); - - async function _next(err) { - if (err) { - return finish(err); - } - try { - const { value, done } = await iterator.next(); - if (done) { - return finish(); - } - writable.write(value, _next); - } catch (err) { - finish(err); - } - } - _next(); -} - async function pump(iterable, writable, finish) { if (!EE) { EE = require('events'); } let error; + writable.on('error', (err) => { + error = err; + }); try { - for await (const chunk of iterable) { - if (!writable.write(chunk)) { - if (writable.destroyed) return; - await EE.once(writable, 'drain'); + let prev; + for await (const next of iterable) { + if (prev != null) { + if (!writable.write(prev)) { + if (writable.destroyed) return; + await EE.once(writable, 'drain'); + } } + prev = next; + } + + if (prev != null) { + await new Promise((resolve, reject) => { + writable.write(prev, (err) => { + err ? reject(err) : resolve(); + }); + }); + } + + if (!isStdio(writable)) { + writable.end(); } - writable.end(); } catch (err) { error = err; } finally { @@ -298,11 +292,7 @@ function pipeline(...streams) { ret = makeAsyncIterable(ret); finishCount++; - if (isStdio(stream)) { - pumpStdio(ret, stream, finish); - } else { - pump(ret, stream, finish); - } + pump(ret, stream, finish); } ret = stream; } else { diff --git a/test/parallel/test-stream-pipeline-process.js b/test/parallel/test-stream-pipeline-process.js index f68a83425daf83..aa4fcd7d06f4e6 100644 --- a/test/parallel/test-stream-pipeline-process.js +++ b/test/parallel/test-stream-pipeline-process.js @@ -23,6 +23,6 @@ if (process.argv[2] === 'child') { 'child' ].join(' '), common.mustCall((err, stdout) => { assert.ifError(err); - assert.strictEqual(stdout, 'hello'); + assert.strictEqual(stdout, 'hello\n'); })); } From d9d992a12f37fd8686029b28d6dcc01bf28cd0d6 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 20 Mar 2020 10:15:09 +0100 Subject: [PATCH 09/13] fixup: end stdio --- lib/internal/streams/pipeline.js | 46 +++++++++++--------------------- 1 file changed, 15 insertions(+), 31 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 0491bd8bfef2d9..1e28a419501bce 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -6,8 +6,7 @@ const { ArrayIsArray, SymbolAsyncIterator, - SymbolIterator, - Promise + SymbolIterator } = primordials; let eos; @@ -114,10 +113,6 @@ function isStream(obj) { return isReadable(obj) || isWritable(obj); } -function isStdio(obj) { - return obj === process.stdout || obj === process.stderr; -} - function isIterable(obj, isAsync) { if (!obj) return false; if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function'; @@ -151,32 +146,14 @@ async function pump(iterable, writable, finish) { EE = require('events'); } let error; - writable.on('error', (err) => { - error = err; - }); try { - let prev; - for await (const next of iterable) { - if (prev != null) { - if (!writable.write(prev)) { - if (writable.destroyed) return; - await EE.once(writable, 'drain'); - } + for await (const chunk of iterable) { + if (!writable.write(chunk)) { + if (writable.destroyed) return; + await EE.once(writable, 'drain'); } - prev = next; - } - - if (prev != null) { - await new Promise((resolve, reject) => { - writable.write(prev, (err) => { - err ? reject(err) : resolve(); - }); - }); - } - - if (!isStdio(writable)) { - writable.end(); } + writable.end(); } catch (err) { error = err; } finally { @@ -225,7 +202,7 @@ function pipeline(...streams) { const reading = i < streams.length - 1; const writing = i > 0; - if (isStream(stream) && !isStdio(stream)) { + if (isStream(stream)) { finishCount++; destroys.push(destroyer(stream, reading, writing, !reading, finish)); } @@ -286,8 +263,15 @@ function pipeline(...streams) { destroys.push(destroyer(ret, false, true, true, finish)); } } else if (isStream(stream)) { - if (isReadable(ret) && !isStdio(stream)) { + if (isReadable(ret)) { ret.pipe(stream); + + // Compat. Before node v10.12.0 stdio used to throw an error so + // pipe() did/does not end() stdio destinations. + // Now they allow it but "secretly" don't close the underlying fd. + if (stream === process.stdout || stream === process.stderr) { + ret.on('end', () => stream.end()); + } } else { ret = makeAsyncIterable(ret); From 1498bb357b27980a58cccfe92578355310b50f1e Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 6 Apr 2020 20:01:32 +0200 Subject: [PATCH 10/13] fixup --- test/parallel/test-stream-pipeline-process.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/parallel/test-stream-pipeline-process.js b/test/parallel/test-stream-pipeline-process.js index aa4fcd7d06f4e6..d1524299b9abbc 100644 --- a/test/parallel/test-stream-pipeline-process.js +++ b/test/parallel/test-stream-pipeline-process.js @@ -16,13 +16,13 @@ if (process.argv[2] === 'child') { const cp = require('child_process'); cp.exec([ 'echo', - '"hello"', + 'hello', '|', `"${process.execPath}"`, `"${__filename}"`, 'child' ].join(' '), common.mustCall((err, stdout) => { assert.ifError(err); - assert.strictEqual(stdout, 'hello\n'); + assert.strictEqual(stdout, 'hello'); })); } From 2feb611a78fd4cb53336e77c6ed4831c3a06f995 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 6 Apr 2020 20:02:37 +0200 Subject: [PATCH 11/13] fixup --- test/parallel/test-stream-pipeline-process.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/parallel/test-stream-pipeline-process.js b/test/parallel/test-stream-pipeline-process.js index d1524299b9abbc..213244a121c85b 100644 --- a/test/parallel/test-stream-pipeline-process.js +++ b/test/parallel/test-stream-pipeline-process.js @@ -2,6 +2,7 @@ const common = require('../common'); const assert = require('assert'); +const os = require('os') if (process.argv[2] === 'child') { const { pipeline } = require('stream'); @@ -23,6 +24,6 @@ if (process.argv[2] === 'child') { 'child' ].join(' '), common.mustCall((err, stdout) => { assert.ifError(err); - assert.strictEqual(stdout, 'hello'); + assert.strictEqual(stdout.split(os.EOL).shift(), 'hello'); })); } From e2ef9c94295ceef9363cc0978ba1793fe66480e3 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 6 Apr 2020 20:05:24 +0200 Subject: [PATCH 12/13] fixup --- test/parallel/test-stream-pipeline-process.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/parallel/test-stream-pipeline-process.js b/test/parallel/test-stream-pipeline-process.js index 213244a121c85b..2b4d6b612159ed 100644 --- a/test/parallel/test-stream-pipeline-process.js +++ b/test/parallel/test-stream-pipeline-process.js @@ -24,6 +24,6 @@ if (process.argv[2] === 'child') { 'child' ].join(' '), common.mustCall((err, stdout) => { assert.ifError(err); - assert.strictEqual(stdout.split(os.EOL).shift(), 'hello'); + assert.strictEqual(stdout.split(os.EOL).shift().trim(), 'hello'); })); } From 51d45ebafd97a9d4c85b55452fffc81fefa56186 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 6 Apr 2020 20:22:01 +0200 Subject: [PATCH 13/13] fixup --- test/parallel/test-stream-pipeline-process.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/parallel/test-stream-pipeline-process.js b/test/parallel/test-stream-pipeline-process.js index 2b4d6b612159ed..825b4454918ddc 100644 --- a/test/parallel/test-stream-pipeline-process.js +++ b/test/parallel/test-stream-pipeline-process.js @@ -2,7 +2,7 @@ const common = require('../common'); const assert = require('assert'); -const os = require('os') +const os = require('os'); if (process.argv[2] === 'child') { const { pipeline } = require('stream');