From ea87809bb6696e2c1ec5d470031f137e18641183 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 11 Apr 2020 13:16:46 +0200 Subject: [PATCH] stream: fix _final and 'prefinish' timing This PR fixes a few different things: The timing of 'prefinish' depends on whether or not _final is defined. In on case the event is emitted synchronously with end() and otherwise asynchronously. _final is currently unecessarily called asynchronously which forces implementors to use 'prefinish' as a hack to emulate synchronous behaviour. Furthermore, this hack is subtly broken due to the above issue. Refs: https://github.com/nodejs/node/issues/31401 Refs: https://github.com/nodejs/node/pull/32763#discussion_r407041983 PR-URL: https://github.com/nodejs/node/pull/32780 Reviewed-By: Matteo Collina Reviewed-By: Rich Trott --- lib/_stream_transform.js | 8 +-- lib/_stream_writable.js | 24 +++++--- lib/internal/http2/core.js | 9 ++- .../test-stream-transform-final-sync.js | 6 +- test/parallel/test-stream-transform-final.js | 6 +- .../parallel/test-stream-writable-finished.js | 58 ++++++++++++++++++- 6 files changed, 88 insertions(+), 23 deletions(-) diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index da38bb9a6f3dff..a27b09887b9e3e 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -99,10 +99,10 @@ function Transform(options) { this._flush = options.flush; } - // TODO(ronag): Unfortunately _final is invoked asynchronously. - // Use `prefinish` hack. `prefinish` is emitted synchronously when - // and only when `_final` is not defined. Implementing `_final` - // to a Transform should be an error. + // When the writable side finishes, then flush out anything remaining. + // Backwards compat. Some Transform streams incorrectly implement _final + // instead of or in addition to _flush. By using 'prefinish' instead of + // implementing _final we continue supporting this unfortunate use case. this.on('prefinish', prefinish); } diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index eb88a36185a6ff..eff82c8db93d18 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -635,24 +635,30 @@ function needFinish(state) { } function callFinal(stream, state) { + state.sync = true; + state.pendingcb++; stream._final((err) => { state.pendingcb--; if (err) { - errorOrDestroy(stream, err); - } else { + errorOrDestroy(stream, err, state.sync); + } else if (needFinish(state)) { state.prefinished = true; stream.emit('prefinish'); - finishMaybe(stream, state); + // Backwards compat. Don't check state.sync here. + // Some streams assume 'finish' will be emitted + // asynchronously relative to _final callback. + state.pendingcb++; + process.nextTick(finish, stream, state); } }); + state.sync = false; } function prefinish(stream, state) { if (!state.prefinished && !state.finalCalled) { if (typeof stream._final === 'function' && !state.destroyed) { - state.pendingcb++; state.finalCalled = true; - process.nextTick(callFinal, stream, state); + callFinal(stream, state); } else { state.prefinished = true; stream.emit('prefinish'); @@ -661,10 +667,9 @@ function prefinish(stream, state) { } function finishMaybe(stream, state, sync) { - const need = needFinish(state); - if (need) { + if (needFinish(state)) { prefinish(stream, state); - if (state.pendingcb === 0) { + if (state.pendingcb === 0 && needFinish(state)) { state.pendingcb++; if (sync) { process.nextTick(finish, stream, state); @@ -673,7 +678,6 @@ function finishMaybe(stream, state, sync) { } } } - return need; } function finish(stream, state) { @@ -681,6 +685,8 @@ function finish(stream, state) { if (state.errorEmitted) return; + // TODO(ronag): This could occur after 'close' is emitted. + state.finished = true; stream.emit('finish'); diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 9407c9c9a1d3cf..dc036a316291f4 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1710,11 +1710,14 @@ function streamOnPause() { } function afterShutdown(status) { + const stream = this.handle[kOwner]; + if (stream) { + stream.on('finish', () => { + stream[kMaybeDestroy](); + }); + } // Currently this status value is unused this.callback(); - const stream = this.handle[kOwner]; - if (stream) - stream[kMaybeDestroy](); } function finishSendTrailers(stream, headersList) { diff --git a/test/parallel/test-stream-transform-final-sync.js b/test/parallel/test-stream-transform-final-sync.js index 1942bee1a01e8a..4faf1b067627ad 100644 --- a/test/parallel/test-stream-transform-final-sync.js +++ b/test/parallel/test-stream-transform-final-sync.js @@ -82,7 +82,7 @@ const t = new stream.Transform({ process.nextTick(function() { state++; // fluchCallback part 2 - assert.strictEqual(state, 15); + assert.strictEqual(state, 13); done(); }); }, 1) @@ -90,7 +90,7 @@ const t = new stream.Transform({ t.on('finish', common.mustCall(function() { state++; // finishListener - assert.strictEqual(state, 13); + assert.strictEqual(state, 14); }, 1)); t.on('end', common.mustCall(function() { state++; @@ -106,5 +106,5 @@ t.write(4); t.end(7, common.mustCall(function() { state++; // endMethodCallback - assert.strictEqual(state, 14); + assert.strictEqual(state, 15); }, 1)); diff --git a/test/parallel/test-stream-transform-final.js b/test/parallel/test-stream-transform-final.js index 53b81cfea224e4..19af744a6bb33e 100644 --- a/test/parallel/test-stream-transform-final.js +++ b/test/parallel/test-stream-transform-final.js @@ -84,7 +84,7 @@ const t = new stream.Transform({ process.nextTick(function() { state++; // flushCallback part 2 - assert.strictEqual(state, 15); + assert.strictEqual(state, 13); done(); }); }, 1) @@ -92,7 +92,7 @@ const t = new stream.Transform({ t.on('finish', common.mustCall(function() { state++; // finishListener - assert.strictEqual(state, 13); + assert.strictEqual(state, 14); }, 1)); t.on('end', common.mustCall(function() { state++; @@ -108,5 +108,5 @@ t.write(4); t.end(7, common.mustCall(function() { state++; // endMethodCallback - assert.strictEqual(state, 14); + assert.strictEqual(state, 15); }, 1)); diff --git a/test/parallel/test-stream-writable-finished.js b/test/parallel/test-stream-writable-finished.js index dfe87a9005db8c..862587784a4430 100644 --- a/test/parallel/test-stream-writable-finished.js +++ b/test/parallel/test-stream-writable-finished.js @@ -30,7 +30,7 @@ const assert = require('assert'); } { - // Emit finish asynchronously + // Emit finish asynchronously. const w = new Writable({ write(chunk, encoding, cb) { @@ -41,3 +41,59 @@ const assert = require('assert'); w.end(); w.on('finish', common.mustCall()); } + +{ + // Emit prefinish synchronously. + + const w = new Writable({ + write(chunk, encoding, cb) { + cb(); + } + }); + + let sync = true; + w.on('prefinish', common.mustCall(() => { + assert.strictEqual(sync, true); + })); + w.end(); + sync = false; +} + +{ + // Emit prefinish synchronously w/ final. + + const w = new Writable({ + write(chunk, encoding, cb) { + cb(); + }, + final(cb) { + cb(); + } + }); + + let sync = true; + w.on('prefinish', common.mustCall(() => { + assert.strictEqual(sync, true); + })); + w.end(); + sync = false; +} + + +{ + // Call _final synchronously. + + let sync = true; + const w = new Writable({ + write(chunk, encoding, cb) { + cb(); + }, + final: common.mustCall((cb) => { + assert.strictEqual(sync, true); + cb(); + }) + }); + + w.end(); + sync = false; +}