Skip to content

Commit

Permalink
Update to Node v18.9.0 (#490)
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Collina <hello@matteocollina.com>

Signed-off-by: Matteo Collina <hello@matteocollina.com>
  • Loading branch information
mcollina committed Sep 23, 2022
1 parent 5eb402c commit 8de46b5
Show file tree
Hide file tree
Showing 35 changed files with 861 additions and 505 deletions.
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -11,9 +11,9 @@
npm install readable-stream
```

This package is a mirror of the streams implementations in Node.js 18.0.0.
This package is a mirror of the streams implementations in Node.js 18.9.0.

Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.0.0/docs/api/stream.html).
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.9.0/docs/api/stream.html).

If you want to guarantee a stable streams base, regardless of what version of
Node you, or the users of your libraries are using, use **readable-stream** _only_ and avoid the _"stream"_ module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html).
Expand Down
2 changes: 2 additions & 0 deletions build/files.mjs
Expand Up @@ -18,6 +18,8 @@ export const skippedSources = [
'lib/_stream_wrap.js',
'test/parallel/test-stream-consumers.js',
'test/parallel/test-stream-destroy.js',
'test/parallel/test-stream-duplex.js',
'test/parallel/test-stream-readable-strategy-option.js',
'test/parallel/test-stream-map.js',
'test/parallel/test-stream-pipeline.js',
'test/parallel/test-stream-readable-async-iterators.js',
Expand Down
2 changes: 1 addition & 1 deletion lib/internal/streams/destroy.js
Expand Up @@ -318,7 +318,7 @@ function destroyer(stream, err) {
// TODO: Don't lose err?
stream.close()
} else if (err) {
process.nextTick(emitErrorCloseLegacy, stream)
process.nextTick(emitErrorCloseLegacy, stream, err)
} else {
process.nextTick(emitCloseLegacy, stream)
}
Expand Down
47 changes: 38 additions & 9 deletions lib/internal/streams/duplex.js
Expand Up @@ -74,16 +74,45 @@ function Duplex(options) {
}

ObjectDefineProperties(Duplex.prototype, {
writable: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writable'),
writableHighWaterMark: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableHighWaterMark'),
writableObjectMode: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableObjectMode'),
writableBuffer: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableBuffer'),
writableLength: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableLength'),
writableFinished: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableFinished'),
writableCorked: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableCorked'),
writableEnded: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableEnded'),
writableNeedDrain: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableNeedDrain'),
writable: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writable')
},
writableHighWaterMark: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableHighWaterMark')
},
writableObjectMode: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableObjectMode')
},
writableBuffer: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableBuffer')
},
writableLength: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableLength')
},
writableFinished: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableFinished')
},
writableCorked: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableCorked')
},
writableEnded: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableEnded')
},
writableNeedDrain: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableNeedDrain')
},
destroyed: {
__proto__: null,

get() {
if (this._readableState === undefined || this._writableState === undefined) {
return false
Expand Down
6 changes: 3 additions & 3 deletions lib/internal/streams/end-of-stream.js
Expand Up @@ -10,7 +10,7 @@ const { AbortError, codes } = require('../../ours/errors')

const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PREMATURE_CLOSE } = codes

const { once } = require('../../ours/util')
const { kEmptyObject, once } = require('../../ours/util')

const { validateAbortSignal, validateFunction, validateObject } = require('../validators')

Expand Down Expand Up @@ -41,9 +41,9 @@ function eos(stream, options, callback) {

if (arguments.length === 2) {
callback = options
options = {}
options = kEmptyObject
} else if (options == null) {
options = {}
options = kEmptyObject
} else {
validateObject(options, 'options')
}
Expand Down
3 changes: 3 additions & 0 deletions lib/internal/streams/lazy_transform.js
Expand Up @@ -34,6 +34,7 @@ function makeGetter(name) {
function makeSetter(name) {
return function (val) {
ObjectDefineProperty(this, name, {
__proto__: null,
value: val,
enumerable: true,
configurable: true,
Expand All @@ -44,12 +45,14 @@ function makeSetter(name) {

ObjectDefineProperties(LazyTransform.prototype, {
_readableState: {
__proto__: null,
get: makeGetter('_readableState'),
set: makeSetter('_readableState'),
configurable: true,
enumerable: true
},
_writableState: {
__proto__: null,
get: makeGetter('_writableState'),
set: makeSetter('_writableState'),
configurable: true,
Expand Down
5 changes: 2 additions & 3 deletions lib/internal/streams/operators.js
Expand Up @@ -20,7 +20,7 @@ const {
NumberIsNaN,
Promise,
PromiseReject,
PromisePrototypeCatch,
PromisePrototypeThen,
Symbol
} = require('../../ours/primordials')

Expand Down Expand Up @@ -127,7 +127,7 @@ function map(fn, options) {
queue.push(kEof)
} catch (err) {
const val = PromiseReject(err)
PromisePrototypeCatch(val, onDone)
PromisePrototypeThen(val, undefined, onDone)
queue.push(val)
} finally {
var _options$signal3
Expand Down Expand Up @@ -223,7 +223,6 @@ function asIndexedPairs(options = undefined) {
}

async function some(fn, options = undefined) {
// eslint-disable-next-line no-unused-vars
for await (const unused of filter.call(this, fn, options)) {
return true
}
Expand Down
20 changes: 18 additions & 2 deletions lib/internal/streams/pipeline.js
Expand Up @@ -18,7 +18,13 @@ const Duplex = require('./duplex')

const {
aggregateTwoErrors,
codes: { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, ERR_STREAM_DESTROYED },
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_RETURN_VALUE,
ERR_MISSING_ARGS,
ERR_STREAM_DESTROYED,
ERR_STREAM_PREMATURE_CLOSE
},
AbortError
} = require('../../ours/errors')

Expand Down Expand Up @@ -358,6 +364,13 @@ function pipelineImpl(streams, callback, opts) {
}

function pipe(src, dst, finish, { end }) {
let ended = false
dst.on('close', () => {
if (!ended) {
// Finish if the destination closes before the source has completed.
finish(new ERR_STREAM_PREMATURE_CLOSE())
}
})
src.pipe(dst, {
end
})
Expand All @@ -366,7 +379,10 @@ function pipe(src, dst, finish, { end }) {
// Compat. Before node v10.12.0 stdio used to throw an error so
// pipe() did/does not end() stdio destinations.
// Now they allow it but "secretly" don't close the underlying fd.
src.once('end', () => dst.end())
src.once('end', () => {
ended = true
dst.end()
})
} else {
finish()
}
Expand Down
23 changes: 21 additions & 2 deletions lib/internal/streams/readable.js
Expand Up @@ -1080,6 +1080,8 @@ async function* createAsyncIterator(stream, options) {

ObjectDefineProperties(Readable.prototype, {
readable: {
__proto__: null,

get() {
const r = this._readableState // r.readable === false means that this is part of a Duplex stream
// where the readable side was disabled upon construction.
Expand All @@ -1097,12 +1099,14 @@ ObjectDefineProperties(Readable.prototype, {
}
},
readableDidRead: {
__proto__: null,
enumerable: false,
get: function () {
return this._readableState.dataEmitted
}
},
readableAborted: {
__proto__: null,
enumerable: false,
get: function () {
return !!(
Expand All @@ -1113,18 +1117,21 @@ ObjectDefineProperties(Readable.prototype, {
}
},
readableHighWaterMark: {
__proto__: null,
enumerable: false,
get: function () {
return this._readableState.highWaterMark
}
},
readableBuffer: {
__proto__: null,
enumerable: false,
get: function () {
return this._readableState && this._readableState.buffer
}
},
readableFlowing: {
__proto__: null,
enumerable: false,
get: function () {
return this._readableState.flowing
Expand All @@ -1136,39 +1143,46 @@ ObjectDefineProperties(Readable.prototype, {
}
},
readableLength: {
__proto__: null,
enumerable: false,

get() {
return this._readableState.length
}
},
readableObjectMode: {
__proto__: null,
enumerable: false,

get() {
return this._readableState ? this._readableState.objectMode : false
}
},
readableEncoding: {
__proto__: null,
enumerable: false,

get() {
return this._readableState ? this._readableState.encoding : null
}
},
errored: {
__proto__: null,
enumerable: false,

get() {
return this._readableState ? this._readableState.errored : null
}
},
closed: {
__proto__: null,

get() {
return this._readableState ? this._readableState.closed : false
}
},
destroyed: {
__proto__: null,
enumerable: false,

get() {
Expand All @@ -1187,6 +1201,7 @@ ObjectDefineProperties(Readable.prototype, {
}
},
readableEnded: {
__proto__: null,
enumerable: false,

get() {
Expand All @@ -1197,12 +1212,16 @@ ObjectDefineProperties(Readable.prototype, {
ObjectDefineProperties(ReadableState.prototype, {
// Legacy getter for `pipesCount`.
pipesCount: {
__proto__: null,

get() {
return this.pipes.length
}
},
// Legacy property for `paused`.
paused: {
__proto__: null,

get() {
return this[kPaused] !== false
},
Expand Down Expand Up @@ -1295,8 +1314,8 @@ Readable.fromWeb = function (readableStream, options) {
return lazyWebStreams().newStreamReadableFromReadableStream(readableStream, options)
}

Readable.toWeb = function (streamReadable) {
return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable)
Readable.toWeb = function (streamReadable, options) {
return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable, options)
}

Readable.wrap = function (src, options) {
Expand Down
29 changes: 25 additions & 4 deletions lib/internal/streams/transform.js
Expand Up @@ -69,12 +69,35 @@ const { ERR_METHOD_NOT_IMPLEMENTED } = require('../../ours/errors').codes

const Duplex = require('./duplex')

const { getHighWaterMark } = require('./state')

ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype)
ObjectSetPrototypeOf(Transform, Duplex)
const kCallback = Symbol('kCallback')

function Transform(options) {
if (!(this instanceof Transform)) return new Transform(options)
if (!(this instanceof Transform)) return new Transform(options) // TODO (ronag): This should preferably always be
// applied but would be semver-major. Or even better;
// make Transform a Readable with the Writable interface.

const readableHighWaterMark = options ? getHighWaterMark(this, options, 'readableHighWaterMark', true) : null

if (readableHighWaterMark === 0) {
// A Duplex will buffer both on the writable and readable side while
// a Transform just wants to buffer hwm number of elements. To avoid
// buffering twice we disable buffering on the writable side.
options = {
...options,
highWaterMark: null,
readableHighWaterMark,
// TODO (ronag): 0 is not optimal since we have
// a "bug" where we check needDrain before calling _write and not after.
// Refs: https://github.com/nodejs/node/pull/32887
// Refs: https://github.com/nodejs/node/pull/35941
writableHighWaterMark: options.writableHighWaterMark || 0
}
}

Duplex.call(this, options) // We have implemented the _read method, and done the other things
// that Readable wants before the first _read call, so unset the
// sync guard flag.
Expand Down Expand Up @@ -155,9 +178,7 @@ Transform.prototype._write = function (chunk, encoding, callback) {
if (
wState.ended || // Backwards compat.
length === rState.length || // Backwards compat.
rState.length < rState.highWaterMark ||
rState.highWaterMark === 0 ||
rState.length === 0
rState.length < rState.highWaterMark
) {
callback()
} else {
Expand Down

0 comments on commit 8de46b5

Please sign in to comment.