From 6099382023b2eaa0b63ad1739ea4d7c8b77b74d8 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Apr 2020 17:17:20 +0200 Subject: [PATCH 1/8] stream: simplify Transform stream implementation Significantly simplified Transform stream implementation by using mostly standard stream code. --- lib/_stream_transform.js | 158 ++++------- lib/internal/errors.js | 4 - test/parallel/test-stream2-transform.js | 354 ++++++++++++------------ 3 files changed, 225 insertions(+), 291 deletions(-) diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 7bfbb04091fd5d..dadfc83224d4d5 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -65,45 +65,18 @@ const { ObjectSetPrototypeOf, + Symbol } = primordials; module.exports = Transform; const { - ERR_METHOD_NOT_IMPLEMENTED, - ERR_MULTIPLE_CALLBACK, - ERR_TRANSFORM_ALREADY_TRANSFORMING, - ERR_TRANSFORM_WITH_LENGTH_0 + ERR_METHOD_NOT_IMPLEMENTED } = require('internal/errors').codes; const Duplex = require('_stream_duplex'); ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype); ObjectSetPrototypeOf(Transform, Duplex); - -function afterTransform(er, data) { - const ts = this._transformState; - ts.transforming = false; - - const cb = ts.writecb; - - if (cb === null) { - return this.emit('error', new ERR_MULTIPLE_CALLBACK()); - } - - ts.writechunk = null; - ts.writecb = null; - - if (data != null) // Single equals check for both `null` and `undefined` - this.push(data); - - cb(er); - - const rs = this._readableState; - rs.reading = false; - if (rs.needReadable || rs.length < rs.highWaterMark) { - this._read(rs.highWaterMark); - } -} - +const kCallback = Symbol('kCallback'); function Transform(options) { if (!(this instanceof Transform)) @@ -111,20 +84,13 @@ function Transform(options) { Duplex.call(this, options); - this._transformState = { - afterTransform: afterTransform.bind(this), - needTransform: false, - transforming: false, - writecb: null, - writechunk: null, - writeencoding: null - }; - // We have implemented the _read method, and done the other things // that Readable wants before the first _read call, so unset the // sync guard flag. this._readableState.sync = false; + this[kCallback] = null; + if (options) { if (typeof options.transform === 'function') this._transform = options.transform; @@ -133,89 +99,65 @@ function Transform(options) { this._flush = options.flush; } - // When the writable side finishes, then flush out anything remaining. + // TODO(ronag): Unfortunately _final is invoked asynchronously. + // Use `prefinish` hack. `prefinish` is emitted synchronously when + // and only when `_final` is not defined. Implementing `_final` + // to a Transform should be an error. this.on('prefinish', prefinish); } function prefinish() { - if (typeof this._flush === 'function' && !this._readableState.destroyed) { + if (typeof this._flush === 'function' && !this.destroyed) { this._flush((er, data) => { - done(this, er, data); + if (er) { + this.destroy(er); + return; + } + + if (data != null) { + this.push(data); + } + this.push(null); }); } else { - done(this, null, null); + this.push(null); } } -Transform.prototype.push = function(chunk, encoding) { - this._transformState.needTransform = false; - return Duplex.prototype.push.call(this, chunk, encoding); -}; - -// This is the part where you do stuff! -// override this function in implementation classes. -// 'chunk' is an input chunk. -// -// Call `push(newChunk)` to pass along transformed output -// to the readable side. You may call 'push' zero or more times. -// -// Call `cb(err)` when you are done with this chunk. If you pass -// an error, then that'll put the hurt on the whole operation. If you -// never call cb(), then you'll never get another chunk. -Transform.prototype._transform = function(chunk, encoding, cb) { +Transform.prototype._transform = function(chunk, encoding, callback) { throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()'); }; -Transform.prototype._write = function(chunk, encoding, cb) { - const ts = this._transformState; - ts.writecb = cb; - ts.writechunk = chunk; - ts.writeencoding = encoding; - if (!ts.transforming) { - const rs = this._readableState; - if (ts.needTransform || - rs.needReadable || - rs.length < rs.highWaterMark) - this._read(rs.highWaterMark); - } +Transform.prototype._write = function(chunk, encoding, callback) { + const rState = this._readableState; + const length = rState.length; + + this._transform(chunk, encoding, (err, val) => { + if (err) { + callback(err); + return; + } + + if (val != null) { + this.push(val); + } + + if ( + length === rState.length || + rState.length < rState.highWaterMark || + rState.length === 0 + ) { + callback(); + } else { + this[kCallback] = callback; + } + }); }; -// Doesn't matter what the args are here. -// _transform does all the work. -// That we got here means that the readable side wants more data. -Transform.prototype._read = function(n) { - const ts = this._transformState; - - if (ts.writechunk !== null && !ts.transforming) { - ts.transforming = true; - this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform); - } else { - // Mark that we need a transform, so that any data that comes in - // will get processed, now that we've asked for it. - ts.needTransform = true; +Transform.prototype._read = function() { + if (this[kCallback]) { + const callback = this[kCallback]; + this[kCallback] = null; + callback(); } }; - - -Transform.prototype._destroy = function(err, cb) { - Duplex.prototype._destroy.call(this, err, (err2) => { - cb(err2); - }); -}; - - -function done(stream, er, data) { - if (er) - return stream.emit('error', er); - - if (data != null) // Single equals check for both `null` and `undefined` - stream.push(data); - - // These two error cases are coherence checks that can likely not be tested. - if (stream._writableState.length) - throw new ERR_TRANSFORM_WITH_LENGTH_0(); - - if (stream._transformState.transforming) - throw new ERR_TRANSFORM_ALREADY_TRANSFORMING(); - return stream.push(null); -} diff --git a/lib/internal/errors.js b/lib/internal/errors.js index e0668e2f827e5f..4c583981725b3b 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -1363,12 +1363,8 @@ E('ERR_TLS_SNI_FROM_SERVER', E('ERR_TRACE_EVENTS_CATEGORY_REQUIRED', 'At least one category is required', TypeError); E('ERR_TRACE_EVENTS_UNAVAILABLE', 'Trace events are unavailable', Error); -E('ERR_TRANSFORM_ALREADY_TRANSFORMING', - 'Calling transform done when still transforming', Error); // This should probably be a `RangeError`. -E('ERR_TRANSFORM_WITH_LENGTH_0', - 'Calling transform done when writableState.length != 0', Error); E('ERR_TTY_INIT_FAILED', 'TTY initialization failed', SystemError); E('ERR_UNCAUGHT_EXCEPTION_CAPTURE_ALREADY_SET', '`process.setupUncaughtExceptionCapture()` was called while a capture ' + diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js index fa24ce152d71ed..03a63b478029f5 100644 --- a/test/parallel/test-stream2-transform.js +++ b/test/parallel/test-stream2-transform.js @@ -45,10 +45,6 @@ const Transform = require('_stream_transform'); assert.strictEqual(tx.readableLength, 10); assert.strictEqual(transformed, 10); - assert.strictEqual(tx._transformState.writechunk.length, 5); - assert.deepStrictEqual(tx.writableBuffer.map(function(c) { - return c.chunk.length; - }), [6, 7, 8, 9, 10]); } { @@ -295,178 +291,178 @@ const Transform = require('_stream_transform'); } -{ - // Verify passthrough event emission - const pt = new PassThrough(); - let emits = 0; - pt.on('readable', function() { - emits++; - }); - - pt.write(Buffer.from('foog')); - pt.write(Buffer.from('bark')); - - assert.strictEqual(emits, 0); - assert.strictEqual(pt.read(5).toString(), 'foogb'); - assert.strictEqual(String(pt.read(5)), 'null'); - assert.strictEqual(emits, 0); - - pt.write(Buffer.from('bazy')); - pt.write(Buffer.from('kuel')); - - assert.strictEqual(emits, 0); - assert.strictEqual(pt.read(5).toString(), 'arkba'); - assert.strictEqual(pt.read(5).toString(), 'zykue'); - assert.strictEqual(pt.read(5), null); - - pt.end(); - - assert.strictEqual(emits, 1); - assert.strictEqual(pt.read(5).toString(), 'l'); - assert.strictEqual(pt.read(5), null); - assert.strictEqual(emits, 1); -} - -{ - // Verify passthrough event emission reordering - const pt = new PassThrough(); - let emits = 0; - pt.on('readable', function() { - emits++; - }); - - pt.write(Buffer.from('foog')); - pt.write(Buffer.from('bark')); - - assert.strictEqual(emits, 0); - assert.strictEqual(pt.read(5).toString(), 'foogb'); - assert.strictEqual(pt.read(5), null); - - pt.once('readable', common.mustCall(function() { - assert.strictEqual(pt.read(5).toString(), 'arkba'); - assert.strictEqual(pt.read(5), null); - - pt.once('readable', common.mustCall(function() { - assert.strictEqual(pt.read(5).toString(), 'zykue'); - assert.strictEqual(pt.read(5), null); - pt.once('readable', common.mustCall(function() { - assert.strictEqual(pt.read(5).toString(), 'l'); - assert.strictEqual(pt.read(5), null); - assert.strictEqual(emits, 3); - })); - pt.end(); - })); - pt.write(Buffer.from('kuel')); - })); - - pt.write(Buffer.from('bazy')); -} - -{ - // Verify passthrough facade - const pt = new PassThrough(); - const datas = []; - pt.on('data', function(chunk) { - datas.push(chunk.toString()); - }); - - pt.on('end', common.mustCall(function() { - assert.deepStrictEqual(datas, ['foog', 'bark', 'bazy', 'kuel']); - })); - - pt.write(Buffer.from('foog')); - setTimeout(function() { - pt.write(Buffer.from('bark')); - setTimeout(function() { - pt.write(Buffer.from('bazy')); - setTimeout(function() { - pt.write(Buffer.from('kuel')); - setTimeout(function() { - pt.end(); - }, 10); - }, 10); - }, 10); - }, 10); -} - -{ - // Verify object transform (JSON parse) - const jp = new Transform({ objectMode: true }); - jp._transform = function(data, encoding, cb) { - try { - jp.push(JSON.parse(data)); - cb(); - } catch (er) { - cb(er); - } - }; - - // Anything except null/undefined is fine. - // those are "magic" in the stream API, because they signal EOF. - const objects = [ - { foo: 'bar' }, - 100, - 'string', - { nested: { things: [ { foo: 'bar' }, 100, 'string' ] } } - ]; - - let ended = false; - jp.on('end', function() { - ended = true; - }); - - objects.forEach(function(obj) { - jp.write(JSON.stringify(obj)); - const res = jp.read(); - assert.deepStrictEqual(res, obj); - }); - - jp.end(); - // Read one more time to get the 'end' event - jp.read(); - - process.nextTick(common.mustCall(function() { - assert.strictEqual(ended, true); - })); -} - -{ - // Verify object transform (JSON stringify) - const js = new Transform({ objectMode: true }); - js._transform = function(data, encoding, cb) { - try { - js.push(JSON.stringify(data)); - cb(); - } catch (er) { - cb(er); - } - }; - - // Anything except null/undefined is fine. - // those are "magic" in the stream API, because they signal EOF. - const objects = [ - { foo: 'bar' }, - 100, - 'string', - { nested: { things: [ { foo: 'bar' }, 100, 'string' ] } } - ]; - - let ended = false; - js.on('end', function() { - ended = true; - }); - - objects.forEach(function(obj) { - js.write(obj); - const res = js.read(); - assert.strictEqual(res, JSON.stringify(obj)); - }); - - js.end(); - // Read one more time to get the 'end' event - js.read(); - - process.nextTick(common.mustCall(function() { - assert.strictEqual(ended, true); - })); -} +// { +// // Verify passthrough event emission +// const pt = new PassThrough(); +// let emits = 0; +// pt.on('readable', function() { +// emits++; +// }); + +// pt.write(Buffer.from('foog')); +// pt.write(Buffer.from('bark')); + +// assert.strictEqual(emits, 0); +// assert.strictEqual(pt.read(5).toString(), 'foogb'); +// assert.strictEqual(String(pt.read(5)), 'null'); +// assert.strictEqual(emits, 0); + +// pt.write(Buffer.from('bazy')); +// pt.write(Buffer.from('kuel')); + +// assert.strictEqual(emits, 0); +// assert.strictEqual(pt.read(5).toString(), 'arkba'); +// assert.strictEqual(pt.read(5).toString(), 'zykue'); +// assert.strictEqual(pt.read(5), null); + +// pt.end(); + +// assert.strictEqual(emits, 1); +// assert.strictEqual(pt.read(5).toString(), 'l'); +// assert.strictEqual(pt.read(5), null); +// assert.strictEqual(emits, 1); +// } + +// { +// // Verify passthrough event emission reordering +// const pt = new PassThrough(); +// let emits = 0; +// pt.on('readable', function() { +// emits++; +// }); + +// pt.write(Buffer.from('foog')); +// pt.write(Buffer.from('bark')); + +// assert.strictEqual(emits, 0); +// assert.strictEqual(pt.read(5).toString(), 'foogb'); +// assert.strictEqual(pt.read(5), null); + +// pt.once('readable', common.mustCall(function() { +// assert.strictEqual(pt.read(5).toString(), 'arkba'); +// assert.strictEqual(pt.read(5), null); + +// pt.once('readable', common.mustCall(function() { +// assert.strictEqual(pt.read(5).toString(), 'zykue'); +// assert.strictEqual(pt.read(5), null); +// pt.once('readable', common.mustCall(function() { +// assert.strictEqual(pt.read(5).toString(), 'l'); +// assert.strictEqual(pt.read(5), null); +// assert.strictEqual(emits, 3); +// })); +// pt.end(); +// })); +// pt.write(Buffer.from('kuel')); +// })); + +// pt.write(Buffer.from('bazy')); +// } + +// { +// // Verify passthrough facade +// const pt = new PassThrough(); +// const datas = []; +// pt.on('data', function(chunk) { +// datas.push(chunk.toString()); +// }); + +// pt.on('end', common.mustCall(function() { +// assert.deepStrictEqual(datas, ['foog', 'bark', 'bazy', 'kuel']); +// })); + +// pt.write(Buffer.from('foog')); +// setTimeout(function() { +// pt.write(Buffer.from('bark')); +// setTimeout(function() { +// pt.write(Buffer.from('bazy')); +// setTimeout(function() { +// pt.write(Buffer.from('kuel')); +// setTimeout(function() { +// pt.end(); +// }, 10); +// }, 10); +// }, 10); +// }, 10); +// } + +// { +// // Verify object transform (JSON parse) +// const jp = new Transform({ objectMode: true }); +// jp._transform = function(data, encoding, cb) { +// try { +// jp.push(JSON.parse(data)); +// cb(); +// } catch (er) { +// cb(er); +// } +// }; + +// // Anything except null/undefined is fine. +// // those are "magic" in the stream API, because they signal EOF. +// const objects = [ +// { foo: 'bar' }, +// 100, +// 'string', +// { nested: { things: [ { foo: 'bar' }, 100, 'string' ] } } +// ]; + +// let ended = false; +// jp.on('end', function() { +// ended = true; +// }); + +// objects.forEach(function(obj) { +// jp.write(JSON.stringify(obj)); +// const res = jp.read(); +// assert.deepStrictEqual(res, obj); +// }); + +// jp.end(); +// // Read one more time to get the 'end' event +// jp.read(); + +// process.nextTick(common.mustCall(function() { +// assert.strictEqual(ended, true); +// })); +// } + +// { +// // Verify object transform (JSON stringify) +// const js = new Transform({ objectMode: true }); +// js._transform = function(data, encoding, cb) { +// try { +// js.push(JSON.stringify(data)); +// cb(); +// } catch (er) { +// cb(er); +// } +// }; + +// // Anything except null/undefined is fine. +// // those are "magic" in the stream API, because they signal EOF. +// const objects = [ +// { foo: 'bar' }, +// 100, +// 'string', +// { nested: { things: [ { foo: 'bar' }, 100, 'string' ] } } +// ]; + +// let ended = false; +// js.on('end', function() { +// ended = true; +// }); + +// objects.forEach(function(obj) { +// js.write(obj); +// const res = js.read(); +// assert.strictEqual(res, JSON.stringify(obj)); +// }); + +// js.end(); +// // Read one more time to get the 'end' event +// js.read(); + +// process.nextTick(common.mustCall(function() { +// assert.strictEqual(ended, true); +// })); +// } From 8a54b1ff100da13788517893acc5abf9954cc9af Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Apr 2020 18:08:10 +0200 Subject: [PATCH 2/8] fixup: zlib --- test/parallel/test-zlib-flush-drain.js | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/test/parallel/test-zlib-flush-drain.js b/test/parallel/test-zlib-flush-drain.js index ac89e990c3fcda..4c33d8520e90bf 100644 --- a/test/parallel/test-zlib-flush-drain.js +++ b/test/parallel/test-zlib-flush-drain.js @@ -1,5 +1,5 @@ 'use strict'; -require('../common'); +const common = require('../common'); const assert = require('assert'); const zlib = require('zlib'); @@ -28,13 +28,16 @@ const ws = deflater._writableState; const beforeFlush = ws.needDrain; let afterFlush = ws.needDrain; -deflater.flush(function(err) { - afterFlush = ws.needDrain; +deflater.on('data', () => { }); -deflater.on('drain', function() { +deflater.flush(common.mustCall(function(err) { + afterFlush = ws.needDrain; +})); + +deflater.on('drain', common.mustCall(function() { drainCount++; -}); +})); process.once('exit', function() { assert.strictEqual( From 8cf4a58389c2f0fe9a281d7a0ff04f61d9c5513c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Apr 2020 18:13:16 +0200 Subject: [PATCH 3/8] fixup: zlib --- lib/_stream_transform.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index dadfc83224d4d5..9c83a3c3ea6810 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -130,6 +130,7 @@ Transform.prototype._transform = function(chunk, encoding, callback) { Transform.prototype._write = function(chunk, encoding, callback) { const rState = this._readableState; + const wState = this._writableState; const length = rState.length; this._transform(chunk, encoding, (err, val) => { @@ -143,6 +144,7 @@ Transform.prototype._write = function(chunk, encoding, callback) { } if ( + wState.ended || length === rState.length || rState.length < rState.highWaterMark || rState.length === 0 From 2f5da16a96495fb754286d797f8902bee5bca7d8 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Apr 2020 18:14:22 +0200 Subject: [PATCH 4/8] fixup: test --- test/parallel/test-stream2-transform.js | 350 ++++++++++++------------ 1 file changed, 175 insertions(+), 175 deletions(-) diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js index 03a63b478029f5..945ed30a191c64 100644 --- a/test/parallel/test-stream2-transform.js +++ b/test/parallel/test-stream2-transform.js @@ -291,178 +291,178 @@ const Transform = require('_stream_transform'); } -// { -// // Verify passthrough event emission -// const pt = new PassThrough(); -// let emits = 0; -// pt.on('readable', function() { -// emits++; -// }); - -// pt.write(Buffer.from('foog')); -// pt.write(Buffer.from('bark')); - -// assert.strictEqual(emits, 0); -// assert.strictEqual(pt.read(5).toString(), 'foogb'); -// assert.strictEqual(String(pt.read(5)), 'null'); -// assert.strictEqual(emits, 0); - -// pt.write(Buffer.from('bazy')); -// pt.write(Buffer.from('kuel')); - -// assert.strictEqual(emits, 0); -// assert.strictEqual(pt.read(5).toString(), 'arkba'); -// assert.strictEqual(pt.read(5).toString(), 'zykue'); -// assert.strictEqual(pt.read(5), null); - -// pt.end(); - -// assert.strictEqual(emits, 1); -// assert.strictEqual(pt.read(5).toString(), 'l'); -// assert.strictEqual(pt.read(5), null); -// assert.strictEqual(emits, 1); -// } - -// { -// // Verify passthrough event emission reordering -// const pt = new PassThrough(); -// let emits = 0; -// pt.on('readable', function() { -// emits++; -// }); - -// pt.write(Buffer.from('foog')); -// pt.write(Buffer.from('bark')); - -// assert.strictEqual(emits, 0); -// assert.strictEqual(pt.read(5).toString(), 'foogb'); -// assert.strictEqual(pt.read(5), null); - -// pt.once('readable', common.mustCall(function() { -// assert.strictEqual(pt.read(5).toString(), 'arkba'); -// assert.strictEqual(pt.read(5), null); - -// pt.once('readable', common.mustCall(function() { -// assert.strictEqual(pt.read(5).toString(), 'zykue'); -// assert.strictEqual(pt.read(5), null); -// pt.once('readable', common.mustCall(function() { -// assert.strictEqual(pt.read(5).toString(), 'l'); -// assert.strictEqual(pt.read(5), null); -// assert.strictEqual(emits, 3); -// })); -// pt.end(); -// })); -// pt.write(Buffer.from('kuel')); -// })); - -// pt.write(Buffer.from('bazy')); -// } - -// { -// // Verify passthrough facade -// const pt = new PassThrough(); -// const datas = []; -// pt.on('data', function(chunk) { -// datas.push(chunk.toString()); -// }); - -// pt.on('end', common.mustCall(function() { -// assert.deepStrictEqual(datas, ['foog', 'bark', 'bazy', 'kuel']); -// })); - -// pt.write(Buffer.from('foog')); -// setTimeout(function() { -// pt.write(Buffer.from('bark')); -// setTimeout(function() { -// pt.write(Buffer.from('bazy')); -// setTimeout(function() { -// pt.write(Buffer.from('kuel')); -// setTimeout(function() { -// pt.end(); -// }, 10); -// }, 10); -// }, 10); -// }, 10); -// } - -// { -// // Verify object transform (JSON parse) -// const jp = new Transform({ objectMode: true }); -// jp._transform = function(data, encoding, cb) { -// try { -// jp.push(JSON.parse(data)); -// cb(); -// } catch (er) { -// cb(er); -// } -// }; - -// // Anything except null/undefined is fine. -// // those are "magic" in the stream API, because they signal EOF. -// const objects = [ -// { foo: 'bar' }, -// 100, -// 'string', -// { nested: { things: [ { foo: 'bar' }, 100, 'string' ] } } -// ]; - -// let ended = false; -// jp.on('end', function() { -// ended = true; -// }); - -// objects.forEach(function(obj) { -// jp.write(JSON.stringify(obj)); -// const res = jp.read(); -// assert.deepStrictEqual(res, obj); -// }); - -// jp.end(); -// // Read one more time to get the 'end' event -// jp.read(); - -// process.nextTick(common.mustCall(function() { -// assert.strictEqual(ended, true); -// })); -// } - -// { -// // Verify object transform (JSON stringify) -// const js = new Transform({ objectMode: true }); -// js._transform = function(data, encoding, cb) { -// try { -// js.push(JSON.stringify(data)); -// cb(); -// } catch (er) { -// cb(er); -// } -// }; - -// // Anything except null/undefined is fine. -// // those are "magic" in the stream API, because they signal EOF. -// const objects = [ -// { foo: 'bar' }, -// 100, -// 'string', -// { nested: { things: [ { foo: 'bar' }, 100, 'string' ] } } -// ]; - -// let ended = false; -// js.on('end', function() { -// ended = true; -// }); - -// objects.forEach(function(obj) { -// js.write(obj); -// const res = js.read(); -// assert.strictEqual(res, JSON.stringify(obj)); -// }); - -// js.end(); -// // Read one more time to get the 'end' event -// js.read(); - -// process.nextTick(common.mustCall(function() { -// assert.strictEqual(ended, true); -// })); -// } +{ + // Verify passthrough event emission + const pt = new PassThrough(); + let emits = 0; + pt.on('readable', function() { + emits++; + }); + + pt.write(Buffer.from('foog')); + pt.write(Buffer.from('bark')); + + assert.strictEqual(emits, 0); + assert.strictEqual(pt.read(5).toString(), 'foogb'); + assert.strictEqual(String(pt.read(5)), 'null'); + assert.strictEqual(emits, 0); + + pt.write(Buffer.from('bazy')); + pt.write(Buffer.from('kuel')); + + assert.strictEqual(emits, 0); + assert.strictEqual(pt.read(5).toString(), 'arkba'); + assert.strictEqual(pt.read(5).toString(), 'zykue'); + assert.strictEqual(pt.read(5), null); + + pt.end(); + + assert.strictEqual(emits, 1); + assert.strictEqual(pt.read(5).toString(), 'l'); + assert.strictEqual(pt.read(5), null); + assert.strictEqual(emits, 1); +} + +{ + // Verify passthrough event emission reordering + const pt = new PassThrough(); + let emits = 0; + pt.on('readable', function() { + emits++; + }); + + pt.write(Buffer.from('foog')); + pt.write(Buffer.from('bark')); + + assert.strictEqual(emits, 0); + assert.strictEqual(pt.read(5).toString(), 'foogb'); + assert.strictEqual(pt.read(5), null); + + pt.once('readable', common.mustCall(function() { + assert.strictEqual(pt.read(5).toString(), 'arkba'); + assert.strictEqual(pt.read(5), null); + + pt.once('readable', common.mustCall(function() { + assert.strictEqual(pt.read(5).toString(), 'zykue'); + assert.strictEqual(pt.read(5), null); + pt.once('readable', common.mustCall(function() { + assert.strictEqual(pt.read(5).toString(), 'l'); + assert.strictEqual(pt.read(5), null); + assert.strictEqual(emits, 3); + })); + pt.end(); + })); + pt.write(Buffer.from('kuel')); + })); + + pt.write(Buffer.from('bazy')); +} + +{ + // Verify passthrough facade + const pt = new PassThrough(); + const datas = []; + pt.on('data', function(chunk) { + datas.push(chunk.toString()); + }); + + pt.on('end', common.mustCall(function() { + assert.deepStrictEqual(datas, ['foog', 'bark', 'bazy', 'kuel']); + })); + + pt.write(Buffer.from('foog')); + setTimeout(function() { + pt.write(Buffer.from('bark')); + setTimeout(function() { + pt.write(Buffer.from('bazy')); + setTimeout(function() { + pt.write(Buffer.from('kuel')); + setTimeout(function() { + pt.end(); + }, 10); + }, 10); + }, 10); + }, 10); +} + +{ + // Verify object transform (JSON parse) + const jp = new Transform({ objectMode: true }); + jp._transform = function(data, encoding, cb) { + try { + jp.push(JSON.parse(data)); + cb(); + } catch (er) { + cb(er); + } + }; + + // Anything except null/undefined is fine. + // those are "magic" in the stream API, because they signal EOF. + const objects = [ + { foo: 'bar' }, + 100, + 'string', + { nested: { things: [ { foo: 'bar' }, 100, 'string' ] } } + ]; + + let ended = false; + jp.on('end', function() { + ended = true; + }); + + objects.forEach(function(obj) { + jp.write(JSON.stringify(obj)); + const res = jp.read(); + assert.deepStrictEqual(res, obj); + }); + + jp.end(); + // Read one more time to get the 'end' event + jp.read(); + + process.nextTick(common.mustCall(function() { + assert.strictEqual(ended, true); + })); +} + +{ + // Verify object transform (JSON stringify) + const js = new Transform({ objectMode: true }); + js._transform = function(data, encoding, cb) { + try { + js.push(JSON.stringify(data)); + cb(); + } catch (er) { + cb(er); + } + }; + + // Anything except null/undefined is fine. + // those are "magic" in the stream API, because they signal EOF. + const objects = [ + { foo: 'bar' }, + 100, + 'string', + { nested: { things: [ { foo: 'bar' }, 100, 'string' ] } } + ]; + + let ended = false; + js.on('end', function() { + ended = true; + }); + + objects.forEach(function(obj) { + js.write(obj); + const res = js.read(); + assert.strictEqual(res, JSON.stringify(obj)); + }); + + js.end(); + // Read one more time to get the 'end' event + js.read(); + + process.nextTick(common.mustCall(function() { + assert.strictEqual(ended, true); + })); +} From e12e6b4fa9188a9b868606d9ece0d24edd3bab5d Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Apr 2020 18:15:19 +0200 Subject: [PATCH 5/8] fixup: cleanup --- test/parallel/test-zlib-flush-drain.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/parallel/test-zlib-flush-drain.js b/test/parallel/test-zlib-flush-drain.js index 4c33d8520e90bf..86ab0dbf82a642 100644 --- a/test/parallel/test-zlib-flush-drain.js +++ b/test/parallel/test-zlib-flush-drain.js @@ -31,13 +31,13 @@ let afterFlush = ws.needDrain; deflater.on('data', () => { }); -deflater.flush(common.mustCall(function(err) { +deflater.flush(function(err) { afterFlush = ws.needDrain; -})); +}); -deflater.on('drain', common.mustCall(function() { +deflater.on('drain', function() { drainCount++; -})); +}); process.once('exit', function() { assert.strictEqual( From 5cd52b7cee1e1800bcf2eb62a556c44e6a184f28 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Apr 2020 18:47:35 +0200 Subject: [PATCH 6/8] fixup: lint --- test/parallel/test-zlib-flush-drain.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/parallel/test-zlib-flush-drain.js b/test/parallel/test-zlib-flush-drain.js index 86ab0dbf82a642..6993d2c9fe6594 100644 --- a/test/parallel/test-zlib-flush-drain.js +++ b/test/parallel/test-zlib-flush-drain.js @@ -1,5 +1,5 @@ 'use strict'; -const common = require('../common'); +require('../common'); const assert = require('assert'); const zlib = require('zlib'); From dce04fbe48ca4d91244d7aa90a79916e81b2a9a1 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Apr 2020 19:34:23 +0200 Subject: [PATCH 7/8] fixup --- test/parallel/test-stream2-transform.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js index 945ed30a191c64..ad9e1a2237bd72 100644 --- a/test/parallel/test-stream2-transform.js +++ b/test/parallel/test-stream2-transform.js @@ -45,6 +45,9 @@ const Transform = require('_stream_transform'); assert.strictEqual(tx.readableLength, 10); assert.strictEqual(transformed, 10); + assert.deepStrictEqual(tx.writableBuffer.map(function(c) { + return c.chunk.length; + }), [5, 6, 7, 8, 9, 10]); } { From 6d596175d7a13365f75711a0c7fcbe65576eb37e Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 11 Apr 2020 11:04:55 +0200 Subject: [PATCH 8/8] fixup: backwards compat notes --- lib/_stream_transform.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 9c83a3c3ea6810..da38bb9a6f3dff 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -144,8 +144,8 @@ Transform.prototype._write = function(chunk, encoding, callback) { } if ( - wState.ended || - length === rState.length || + wState.ended || // Backwards compat. + length === rState.length || // Backwards compat. rState.length < rState.highWaterMark || rState.length === 0 ) {