Skip to content

Commit

Permalink
stream: make all streams error in a pipeline
Browse files Browse the repository at this point in the history
This changes makes all stream in a pipeline emit 'error' in
case of an abnormal termination of the pipeline. If the last stream
is currently being async iterated, this change will make the iteration
reject accordingly.

See: #30861
Fixes: #28194

PR-URL: #30869
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
mcollina committed Dec 14, 2019
1 parent ccdd6ef commit 6480882
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 6 deletions.
24 changes: 18 additions & 6 deletions lib/internal/streams/pipeline.js
Expand Up @@ -43,15 +43,21 @@ function destroyer(stream, reading, writing, callback) {

// request.destroy just do .end - .abort is what we want
if (isRequest(stream)) return stream.abort();
if (typeof stream.destroy === 'function') return stream.destroy();
if (typeof stream.destroy === 'function') {
if (stream.req && stream._writableState === undefined) {
// This is a ClientRequest
// TODO(mcollina): backward compatible fix to avoid crashing.
// Possibly remove in a later semver-major change.
stream.req.on('error', noop);
}
return stream.destroy(err);

This comment was marked as resolved.

Copy link
@szmarczak

szmarczak Dec 19, 2019

Member

This is a breaking change. If IncomingMessage emits an error, then ClientRequest emits it too.
This isn't the existing behavior in Node.js 10 nor 12.

backward compatible fix to avoid crashing.

Can you please explain what crash are you referring to?

This comment was marked as resolved.

Copy link
@szmarczak

szmarczak Dec 19, 2019

Member

Wait, I actually misunderstood something.

This comment was marked as resolved.

Copy link
@szmarczak

szmarczak Dec 19, 2019

Member

Sorry, I thought that the request error was caused by this. It isn't.

This comment was marked as resolved.

Copy link
@szmarczak

szmarczak Dec 19, 2019

Member

Actually it is. I'll open an issue.

}

callback(err || new ERR_STREAM_DESTROYED('pipe'));
};
}

function call(fn) {
fn();
}
function noop() {}

function pipe(from, to) {
return from.pipe(to);
Expand Down Expand Up @@ -81,9 +87,15 @@ function pipeline(...streams) {
const writing = i > 0;
return destroyer(stream, reading, writing, function(err) {
if (!error) error = err;
if (err) destroys.forEach(call);
if (err) {
for (const destroy of destroys) {
destroy(err);
}
}
if (reading) return;
destroys.forEach(call);
for (const destroy of destroys) {
destroy();
}
callback(error);
});
});
Expand Down
31 changes: 31 additions & 0 deletions test/parallel/test-stream-pipeline-async-iterator.js
@@ -0,0 +1,31 @@
'use strict';

const common = require('../common');
const { Readable, PassThrough, pipeline } = require('stream');
const assert = require('assert');

const _err = new Error('kaboom');

async function run() {
const source = new Readable({
read() {
}
});
source.push('hello');
source.push('world');

setImmediate(() => { source.destroy(_err); });

const iterator = pipeline(
source,
new PassThrough(),
() => {});

iterator.setEncoding('utf8');

for await (const k of iterator) {
assert.strictEqual(k, 'helloworld');
}
}

run().catch(common.mustCall((err) => assert.strictEqual(err, _err)));
6 changes: 6 additions & 0 deletions test/parallel/test-stream-pipeline.js
Expand Up @@ -119,6 +119,12 @@ const { promisify } = require('util');
transform.on('close', common.mustCall());
write.on('close', common.mustCall());

[read, transform, write].forEach((stream) => {
stream.on('error', common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('kaboom'));
}));
});

const dst = pipeline(read, transform, write, common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('kaboom'));
}));
Expand Down

0 comments on commit 6480882

Please sign in to comment.