From 82b60df7e1c5dc02d1fdc90c965a7352ea3b5ff9 Mon Sep 17 00:00:00 2001 From: Marcos Casagrande Date: Sun, 8 Dec 2019 21:47:49 -0300 Subject: [PATCH 1/2] stream: propagate errors from src streams in async iterator Fixes: https://github.com/nodejs/node/issues/28194 --- lib/_stream_readable.js | 21 ++++++++++++++++++++- lib/internal/streams/async_iterator.js | 14 ++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 6226bbf5eb4063..125b800abfbf15 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -109,6 +109,7 @@ function ReadableState(options, stream, isDuplex) { this.buffer = new BufferList(); this.length = 0; this.pipes = []; + this.pipeSources = []; this.flowing = null; this.ended = false; this.endEmitted = false; @@ -698,6 +699,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) { } } + if (dest._readableState) + dest._readableState.pipeSources.push(this); + state.pipes.push(dest); debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts); @@ -859,6 +863,16 @@ function pipeOnDrain(src, dest) { }; } +function unpipeSources(src, dest) { + const destState = dest._readableState; + if (!destState) + return; + + const pipeSourcesIndex = destState.pipeSources.indexOf(src); + if (pipeSourcesIndex !== -1) + destState.pipeSources.splice(pipeSourcesIndex, 1); +} + Readable.prototype.unpipe = function(dest) { const state = this._readableState; @@ -874,11 +888,14 @@ Readable.prototype.unpipe = function(dest) { state.pipes = []; state.flowing = false; - for (var i = 0; i < dests.length; i++) + for (let i = 0; i < dests.length; i++) { + unpipeSources(this, dests[i]); dests[i].emit('unpipe', this, { hasUnpiped: false }); + } return this; } + // Try to find the right one. const index = state.pipes.indexOf(dest); if (index === -1) @@ -888,6 +905,8 @@ Readable.prototype.unpipe = function(dest) { if (state.pipes.length === 0) state.flowing = false; + unpipeSources(this, dest); + dest.emit('unpipe', this, unpipeInfo); return this; diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index a6b8bd9d181c38..7fef60a5a745b7 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -54,6 +54,18 @@ function wrapForNext(lastPromise, iter) { }; } +function handlePipelineError(sources, current) { + + if (!sources) return; + + for (const stream of sources) { + const listener = (err) => current.emit('error', err); + stream.on('error', listener); + stream.on('unpipe', () => stream.off('error', listener)); + handlePipelineError(stream._readableState.pipeSources, current); + } +} + const AsyncIteratorPrototype = ObjectGetPrototypeOf( ObjectGetPrototypeOf(async function* () {}).prototype); @@ -169,6 +181,8 @@ const createReadableStreamAsyncIterator = (stream) => { }); iterator[kLastPromise] = null; + handlePipelineError(stream._readableState.pipeSources, stream); + finished(stream, { writable: false }, (err) => { if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { const reject = iterator[kLastReject]; From 79977146bc04c576b9affbb1b1edc1933cc07a2a Mon Sep 17 00:00:00 2001 From: Marcos Casagrande Date: Sun, 8 Dec 2019 22:00:54 -0300 Subject: [PATCH 2/2] test: ensure src streams propagate error correctly in async iterator --- .../test-stream-readable-async-iterators.js | 103 ++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index 4a63e9fd3022e6..68fb3fc2d6d5cf 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -270,6 +270,77 @@ async function tests() { assert.strictEqual(received, 1); } + { + console.log('destroyed sync after push in pipe source'); + for (const pipes of [1, 3]) { // Test single pipe & multiple + + const readable = new Readable({ + objectMode: true, + read() { + this.push('hello'); + this.destroy(new Error('kaboom')); + } + }); + let iterator = readable.pipe(new PassThrough()); + + for (let i = 1; i < pipes; i++) + iterator = iterator.pipe(new PassThrough()); + + let received = 0; + + let err = null; + try { + for await (const k of iterator) { + assert.strictEqual(k, 'hello'); + received++; + } + } catch (e) { + err = e; + } + + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(received, 0); + } + } + + { + console.log('destroy async in pipe source'); + for (const pipes of [1, 3]) { // Test single pipe & multiple + const readable = new Readable({ + objectMode: true, + read() { + if (!this.pushed) { + this.push('hello'); + this.pushed = true; + + setImmediate(() => { + this.destroy(new Error('kaboom')); + }); + } + } + }); + let iterator = readable.pipe(new PassThrough()); + + for (let i = 1; i < pipes; i++) + iterator = iterator.pipe(new PassThrough()); + + let received = 0; + + let err = null; + try { + // eslint-disable-next-line no-unused-vars + for await (const k of iterator) { + received++; + } + } catch (e) { + err = e; + } + + assert.strictEqual(err.message, 'kaboom'); + assert.strictEqual(received, 1); + } + } + { console.log('push async'); const max = 42; @@ -362,6 +433,38 @@ async function tests() { ); } + { + console.log('error on pipelined stream'); + const err = new Error('kaboom'); + const readable = new Readable({ + read() { + if (!this.pushed) { + this.push('hello'); + this.pushed = true; + + setImmediate(() => { + this.destroy(err); + }); + } + } + }); + + const passthrough = new PassThrough(); + const iterator = pipeline(readable, passthrough, common.mustCall((e) => { + assert.strictEqual(e, err); + })); + + let receivedErr; + try { + // eslint-disable-next-line no-unused-vars + for await (const k of iterator); + } catch (e) { + receivedErr = e; + } + + assert.strictEqual(receivedErr, err); + } + { console.log('iterating on an ended stream completes'); const r = new Readable({