diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index a27b09887b9e3e..564cdf0e82a986 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -106,11 +106,15 @@ function Transform(options) { this.on('prefinish', prefinish); } -function prefinish() { +function final(cb) { if (typeof this._flush === 'function' && !this.destroyed) { this._flush((er, data) => { if (er) { - this.destroy(er); + if (cb) { + cb(er); + } else { + this.destroy(er); + } return; } @@ -118,12 +122,26 @@ function prefinish() { this.push(data); } this.push(null); + if (cb) { + cb(); + } }); } else { this.push(null); + if (cb) { + cb(); + } + } +} + +function prefinish() { + if (this._final !== final) { + final.call(this); } } +Transform.prototype._final = final; + Transform.prototype._transform = function(chunk, encoding, callback) { throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()'); }; diff --git a/lib/zlib.js b/lib/zlib.js index 7cc8e2e6275041..fc8e378f41feb5 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -323,6 +323,11 @@ ZlibBase.prototype._flush = function(callback) { this._transform(Buffer.alloc(0), '', callback); }; +// Force Transform compat behavior. +ZlibBase.prototype._final = function(callback) { + callback(); +}; + // If a flush is scheduled while another flush is still pending, a way to figure // out which one is the "stronger" flush is needed. // This is currently only used to figure out which flush flag to use for the diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 5077aaa11a4e6b..5d3f34f51e85be 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1231,3 +1231,27 @@ const net = require('net'); assert.strictEqual(res, 'helloworld'); })); } + +{ + let flushed = false; + const makeStream = () => + new Transform({ + transform: (chunk, enc, cb) => cb(null, chunk), + flush: (cb) => + setTimeout(() => { + flushed = true; + cb(null); + }, 1), + }); + + const input = new Readable(); + input.push(null); + + pipeline( + input, + makeStream(), + common.mustCall(() => { + assert.strictEqual(flushed, true); + }), + ); +}