diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 7bfbb04091fd5d..da38bb9a6f3dff 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -65,45 +65,18 @@ const { ObjectSetPrototypeOf, + Symbol } = primordials; module.exports = Transform; const { - ERR_METHOD_NOT_IMPLEMENTED, - ERR_MULTIPLE_CALLBACK, - ERR_TRANSFORM_ALREADY_TRANSFORMING, - ERR_TRANSFORM_WITH_LENGTH_0 + ERR_METHOD_NOT_IMPLEMENTED } = require('internal/errors').codes; const Duplex = require('_stream_duplex'); ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype); ObjectSetPrototypeOf(Transform, Duplex); - -function afterTransform(er, data) { - const ts = this._transformState; - ts.transforming = false; - - const cb = ts.writecb; - - if (cb === null) { - return this.emit('error', new ERR_MULTIPLE_CALLBACK()); - } - - ts.writechunk = null; - ts.writecb = null; - - if (data != null) // Single equals check for both `null` and `undefined` - this.push(data); - - cb(er); - - const rs = this._readableState; - rs.reading = false; - if (rs.needReadable || rs.length < rs.highWaterMark) { - this._read(rs.highWaterMark); - } -} - +const kCallback = Symbol('kCallback'); function Transform(options) { if (!(this instanceof Transform)) @@ -111,20 +84,13 @@ function Transform(options) { Duplex.call(this, options); - this._transformState = { - afterTransform: afterTransform.bind(this), - needTransform: false, - transforming: false, - writecb: null, - writechunk: null, - writeencoding: null - }; - // We have implemented the _read method, and done the other things // that Readable wants before the first _read call, so unset the // sync guard flag. this._readableState.sync = false; + this[kCallback] = null; + if (options) { if (typeof options.transform === 'function') this._transform = options.transform; @@ -133,89 +99,67 @@ function Transform(options) { this._flush = options.flush; } - // When the writable side finishes, then flush out anything remaining. + // TODO(ronag): Unfortunately _final is invoked asynchronously. + // Use `prefinish` hack. `prefinish` is emitted synchronously when + // and only when `_final` is not defined. Implementing `_final` + // to a Transform should be an error. this.on('prefinish', prefinish); } function prefinish() { - if (typeof this._flush === 'function' && !this._readableState.destroyed) { + if (typeof this._flush === 'function' && !this.destroyed) { this._flush((er, data) => { - done(this, er, data); + if (er) { + this.destroy(er); + return; + } + + if (data != null) { + this.push(data); + } + this.push(null); }); } else { - done(this, null, null); + this.push(null); } } -Transform.prototype.push = function(chunk, encoding) { - this._transformState.needTransform = false; - return Duplex.prototype.push.call(this, chunk, encoding); -}; - -// This is the part where you do stuff! -// override this function in implementation classes. -// 'chunk' is an input chunk. -// -// Call `push(newChunk)` to pass along transformed output -// to the readable side. You may call 'push' zero or more times. -// -// Call `cb(err)` when you are done with this chunk. If you pass -// an error, then that'll put the hurt on the whole operation. If you -// never call cb(), then you'll never get another chunk. -Transform.prototype._transform = function(chunk, encoding, cb) { +Transform.prototype._transform = function(chunk, encoding, callback) { throw new ERR_METHOD_NOT_IMPLEMENTED('_transform()'); }; -Transform.prototype._write = function(chunk, encoding, cb) { - const ts = this._transformState; - ts.writecb = cb; - ts.writechunk = chunk; - ts.writeencoding = encoding; - if (!ts.transforming) { - const rs = this._readableState; - if (ts.needTransform || - rs.needReadable || - rs.length < rs.highWaterMark) - this._read(rs.highWaterMark); - } +Transform.prototype._write = function(chunk, encoding, callback) { + const rState = this._readableState; + const wState = this._writableState; + const length = rState.length; + + this._transform(chunk, encoding, (err, val) => { + if (err) { + callback(err); + return; + } + + if (val != null) { + this.push(val); + } + + if ( + wState.ended || // Backwards compat. + length === rState.length || // Backwards compat. + rState.length < rState.highWaterMark || + rState.length === 0 + ) { + callback(); + } else { + this[kCallback] = callback; + } + }); }; -// Doesn't matter what the args are here. -// _transform does all the work. -// That we got here means that the readable side wants more data. -Transform.prototype._read = function(n) { - const ts = this._transformState; - - if (ts.writechunk !== null && !ts.transforming) { - ts.transforming = true; - this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform); - } else { - // Mark that we need a transform, so that any data that comes in - // will get processed, now that we've asked for it. - ts.needTransform = true; +Transform.prototype._read = function() { + if (this[kCallback]) { + const callback = this[kCallback]; + this[kCallback] = null; + callback(); } }; - - -Transform.prototype._destroy = function(err, cb) { - Duplex.prototype._destroy.call(this, err, (err2) => { - cb(err2); - }); -}; - - -function done(stream, er, data) { - if (er) - return stream.emit('error', er); - - if (data != null) // Single equals check for both `null` and `undefined` - stream.push(data); - - // These two error cases are coherence checks that can likely not be tested. - if (stream._writableState.length) - throw new ERR_TRANSFORM_WITH_LENGTH_0(); - - if (stream._transformState.transforming) - throw new ERR_TRANSFORM_ALREADY_TRANSFORMING(); - return stream.push(null); -} diff --git a/lib/internal/errors.js b/lib/internal/errors.js index e0668e2f827e5f..4c583981725b3b 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -1363,12 +1363,8 @@ E('ERR_TLS_SNI_FROM_SERVER', E('ERR_TRACE_EVENTS_CATEGORY_REQUIRED', 'At least one category is required', TypeError); E('ERR_TRACE_EVENTS_UNAVAILABLE', 'Trace events are unavailable', Error); -E('ERR_TRANSFORM_ALREADY_TRANSFORMING', - 'Calling transform done when still transforming', Error); // This should probably be a `RangeError`. -E('ERR_TRANSFORM_WITH_LENGTH_0', - 'Calling transform done when writableState.length != 0', Error); E('ERR_TTY_INIT_FAILED', 'TTY initialization failed', SystemError); E('ERR_UNCAUGHT_EXCEPTION_CAPTURE_ALREADY_SET', '`process.setupUncaughtExceptionCapture()` was called while a capture ' + diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js index fa24ce152d71ed..ad9e1a2237bd72 100644 --- a/test/parallel/test-stream2-transform.js +++ b/test/parallel/test-stream2-transform.js @@ -45,10 +45,9 @@ const Transform = require('_stream_transform'); assert.strictEqual(tx.readableLength, 10); assert.strictEqual(transformed, 10); - assert.strictEqual(tx._transformState.writechunk.length, 5); assert.deepStrictEqual(tx.writableBuffer.map(function(c) { return c.chunk.length; - }), [6, 7, 8, 9, 10]); + }), [5, 6, 7, 8, 9, 10]); } { diff --git a/test/parallel/test-zlib-flush-drain.js b/test/parallel/test-zlib-flush-drain.js index ac89e990c3fcda..6993d2c9fe6594 100644 --- a/test/parallel/test-zlib-flush-drain.js +++ b/test/parallel/test-zlib-flush-drain.js @@ -28,6 +28,9 @@ const ws = deflater._writableState; const beforeFlush = ws.needDrain; let afterFlush = ws.needDrain; +deflater.on('data', () => { +}); + deflater.flush(function(err) { afterFlush = ws.needDrain; });