diff --git a/lib/internal/streams/duplex.js b/lib/internal/streams/duplex.js index 799eb5a32022da..834d875be6c4d9 100644 --- a/lib/internal/streams/duplex.js +++ b/lib/internal/streams/duplex.js @@ -35,9 +35,17 @@ const { module.exports = Duplex; +const Stream = require('internal/streams/legacy').Stream; const Readable = require('internal/streams/readable'); const Writable = require('internal/streams/writable'); +const { + addAbortSignal, +} = require('internal/streams/add-abort-signal'); + +const destroyImpl = require('internal/streams/destroy'); +const { kOnConstructed } = require('internal/streams/utils'); + ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype); ObjectSetPrototypeOf(Duplex, Readable); @@ -55,8 +63,8 @@ function Duplex(options) { if (!(this instanceof Duplex)) return new Duplex(options); - Readable.call(this, options); - Writable.call(this, options); + this._readableState = new Readable.ReadableState(options, this, true); + this._writableState = new Writable.WritableState(options, this, true); if (options) { this.allowHalfOpen = options.allowHalfOpen !== false; @@ -73,9 +81,39 @@ function Duplex(options) { this._writableState.ended = true; this._writableState.finished = true; } + + if (typeof options.read === 'function') + this._read = options.read; + + if (typeof options.write === 'function') + this._write = options.write; + + if (typeof options.writev === 'function') + this._writev = options.writev; + + if (typeof options.destroy === 'function') + this._destroy = options.destroy; + + if (typeof options.final === 'function') + this._final = options.final; + + if (typeof options.construct === 'function') + this._construct = options.construct; + + if (options.signal) + addAbortSignal(options.signal, this); } else { this.allowHalfOpen = true; } + + Stream.call(this, options); + + if (this._construct != null) { + destroyImpl.construct(this, () => { + this._readableState[kOnConstructed](this); + this._writableState[kOnConstructed](this); + }); + } } ObjectDefineProperties(Duplex.prototype, { diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index a129b1b6f4b75d..84266a944d8a41 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -71,6 +71,7 @@ const { AbortError, } = require('internal/errors'); const { validateObject } = require('internal/validators'); +const { kOnConstructed } = require('internal/streams/utils'); const kState = Symbol('kState'); @@ -251,20 +252,14 @@ ObjectDefineProperties(ReadableState.prototype, { function ReadableState(options, stream, isDuplex) { - // Duplex streams are both readable and writable, but share - // the same options object. - // However, some cases require setting options to different - // values for the readable and the writable sides of the duplex stream. - // These options can be provided separately as readableXXX and writableXXX. - if (typeof isDuplex !== 'boolean') - isDuplex = stream instanceof Stream.Duplex; - // Bit map field to store ReadableState more effciently with 1 bit per field // instead of a V8 slot per field. this[kState] = kEmitClose | kAutoDestroy | kConstructed | kSync; + // Object stream flag. Used to make read(n) ignore n and to // make all the buffer merging and length checks go away. - if (options && options.objectMode) this[kState] |= kObjectMode; + if (options && options.objectMode) + this[kState] |= kObjectMode; if (isDuplex && options && options.readableObjectMode) this[kState] |= kObjectMode; @@ -310,16 +305,17 @@ function ReadableState(options, stream, isDuplex) { } } +ReadableState.prototype[kOnConstructed] = function onConstructed(stream) { + if ((this[kState] & kNeedReadable) !== 0) { + maybeReadMore(stream, this); + } +}; function Readable(options) { if (!(this instanceof Readable)) return new Readable(options); - // Checking for a Stream.Duplex instance is faster here instead of inside - // the ReadableState constructor, at least with V8 6.5. - const isDuplex = this instanceof Stream.Duplex; - - this._readableState = new ReadableState(options, this, isDuplex); + this._readableState = new ReadableState(options, this, false); if (options) { if (typeof options.read === 'function') @@ -331,17 +327,17 @@ function Readable(options) { if (typeof options.construct === 'function') this._construct = options.construct; - if (options.signal && !isDuplex) + if (options.signal) addAbortSignal(options.signal, this); } Stream.call(this, options); - destroyImpl.construct(this, () => { - if (this._readableState.needReadable) { - maybeReadMore(this, this._readableState); - } - }); + if (this._construct != null) { + destroyImpl.construct(this, () => { + this._readableState[kOnConstructed](this); + }); + } } Readable.prototype.destroy = destroyImpl.destroy; diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js index 1b2a6c0fbf6a05..77efa32c71c8fe 100644 --- a/lib/internal/streams/utils.js +++ b/lib/internal/streams/utils.js @@ -4,6 +4,7 @@ const { SymbolAsyncIterator, SymbolIterator, SymbolFor, + Symbol, } = primordials; // We need to use SymbolFor to make these globally available @@ -16,6 +17,8 @@ const kIsReadable = SymbolFor('nodejs.stream.readable'); const kIsWritable = SymbolFor('nodejs.stream.writable'); const kIsDisturbed = SymbolFor('nodejs.stream.disturbed'); +const kOnConstructed = Symbol('kOnConstructed'); + const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise'); const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction'); @@ -303,6 +306,7 @@ function isErrored(stream) { } module.exports = { + kOnConstructed, isDestroyed, kIsDestroyed, isDisturbed, diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index ae860f8c8f00f7..74573033eaa44f 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -44,6 +44,7 @@ const EE = require('events'); const Stream = require('internal/streams/legacy').Stream; const { Buffer } = require('buffer'); const destroyImpl = require('internal/streams/destroy'); +const { kOnConstructed } = require('internal/streams/utils'); const { addAbortSignal, @@ -290,20 +291,15 @@ ObjectDefineProperties(WritableState.prototype, { }); function WritableState(options, stream, isDuplex) { - // Duplex streams are both readable and writable, but share - // the same options object. - // However, some cases require setting options to different - // values for the readable and the writable sides of the duplex stream, - // e.g. options.readableObjectMode vs. options.writableObjectMode, etc. - if (typeof isDuplex !== 'boolean') - isDuplex = stream instanceof Stream.Duplex; - // Bit map field to store WritableState more effciently with 1 bit per field // instead of a V8 slot per field. this[kState] = kSync | kConstructed | kEmitClose | kAutoDestroy; - if (options && options.objectMode) this[kState] |= kObjectMode; - if (isDuplex && options && options.writableObjectMode) this[kState] |= kObjectMode; + if (options && options.objectMode) + this[kState] |= kObjectMode; + + if (isDuplex && options && options.writableObjectMode) + this[kState] |= kObjectMode; // The point at which write() starts returning false // Note: 0 is a valid value, means that we always return false if @@ -323,7 +319,7 @@ function WritableState(options, stream, isDuplex) { // Crypto is kind of old and crusty. Historically, its default string // encoding is 'binary' so we have to make this configurable. // Everything else in the universe uses 'utf8', though. - const defaultEncoding = options?.defaultEncoding; + const defaultEncoding = options ? options.defaultEncoding : null; if (defaultEncoding == null || defaultEncoding === 'utf8' || defaultEncoding === 'utf-8') { this[kState] |= kDefaultUTF8Encoding; } else if (Buffer.isEncoding(defaultEncoding)) { @@ -372,23 +368,21 @@ ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', { }, }); -function Writable(options) { - // Writable ctor is applied to Duplexes, too. - // `realHasInstance` is necessary because using plain `instanceof` - // would return false, as no `_writableState` property is attached. - - // Trying to use the custom `instanceof` for Writable here will also break the - // Node.js LazyTransform implementation, which has a non-trivial getter for - // `_writableState` that would lead to infinite recursion. +WritableState.prototype[kOnConstructed] = function onConstructed(stream) { + if ((this[kState] & kWriting) === 0) { + clearBuffer(stream, this); + } - // Checking for a Stream.Duplex instance is faster here instead of inside - // the WritableState constructor, at least with V8 6.5. - const isDuplex = (this instanceof Stream.Duplex); + if ((this[kState] & kEnding) !== 0) { + finishMaybe(stream, this); + } +}; - if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this)) +function Writable(options) { + if (!(this instanceof Writable)) return new Writable(options); - this._writableState = new WritableState(options, this, isDuplex); + this._writableState = new WritableState(options, this, false); if (options) { if (typeof options.write === 'function') @@ -412,17 +406,11 @@ function Writable(options) { Stream.call(this, options); - destroyImpl.construct(this, () => { - const state = this._writableState; - - if ((state[kState] & kWriting) === 0) { - clearBuffer(this, state); - } - - if ((state[kState] & kEnding) !== 0) { - finishMaybe(this, state); - } - }); + if (this._construct != null) { + destroyImpl.construct(this, () => { + this._writableState[kOnConstructed](this); + }); + } } ObjectDefineProperty(Writable, SymbolHasInstance, {