Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to Node v18.9.0 #490

Merged
merged 1 commit into from Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -11,9 +11,9 @@
npm install readable-stream
```

This package is a mirror of the streams implementations in Node.js 18.0.0.
This package is a mirror of the streams implementations in Node.js 18.9.0.

Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.0.0/docs/api/stream.html).
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.9.0/docs/api/stream.html).

If you want to guarantee a stable streams base, regardless of what version of
Node you, or the users of your libraries are using, use **readable-stream** _only_ and avoid the _"stream"_ module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html).
Expand Down
2 changes: 2 additions & 0 deletions build/files.mjs
Expand Up @@ -18,6 +18,8 @@ export const skippedSources = [
'lib/_stream_wrap.js',
'test/parallel/test-stream-consumers.js',
'test/parallel/test-stream-destroy.js',
'test/parallel/test-stream-duplex.js',
'test/parallel/test-stream-readable-strategy-option.js',
'test/parallel/test-stream-map.js',
'test/parallel/test-stream-pipeline.js',
'test/parallel/test-stream-readable-async-iterators.js',
Expand Down
2 changes: 1 addition & 1 deletion lib/internal/streams/destroy.js
Expand Up @@ -318,7 +318,7 @@ function destroyer(stream, err) {
// TODO: Don't lose err?
stream.close()
} else if (err) {
process.nextTick(emitErrorCloseLegacy, stream)
process.nextTick(emitErrorCloseLegacy, stream, err)
} else {
process.nextTick(emitCloseLegacy, stream)
}
Expand Down
47 changes: 38 additions & 9 deletions lib/internal/streams/duplex.js
Expand Up @@ -74,16 +74,45 @@ function Duplex(options) {
}

ObjectDefineProperties(Duplex.prototype, {
writable: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writable'),
writableHighWaterMark: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableHighWaterMark'),
writableObjectMode: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableObjectMode'),
writableBuffer: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableBuffer'),
writableLength: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableLength'),
writableFinished: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableFinished'),
writableCorked: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableCorked'),
writableEnded: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableEnded'),
writableNeedDrain: ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableNeedDrain'),
writable: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writable')
},
writableHighWaterMark: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableHighWaterMark')
},
writableObjectMode: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableObjectMode')
},
writableBuffer: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableBuffer')
},
writableLength: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableLength')
},
writableFinished: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableFinished')
},
writableCorked: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableCorked')
},
writableEnded: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableEnded')
},
writableNeedDrain: {
__proto__: null,
...ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableNeedDrain')
},
destroyed: {
__proto__: null,

get() {
if (this._readableState === undefined || this._writableState === undefined) {
return false
Expand Down
6 changes: 3 additions & 3 deletions lib/internal/streams/end-of-stream.js
Expand Up @@ -10,7 +10,7 @@ const { AbortError, codes } = require('../../ours/errors')

const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PREMATURE_CLOSE } = codes

const { once } = require('../../ours/util')
const { kEmptyObject, once } = require('../../ours/util')

const { validateAbortSignal, validateFunction, validateObject } = require('../validators')

Expand Down Expand Up @@ -41,9 +41,9 @@ function eos(stream, options, callback) {

if (arguments.length === 2) {
callback = options
options = {}
options = kEmptyObject
} else if (options == null) {
options = {}
options = kEmptyObject
} else {
validateObject(options, 'options')
}
Expand Down
3 changes: 3 additions & 0 deletions lib/internal/streams/lazy_transform.js
Expand Up @@ -34,6 +34,7 @@ function makeGetter(name) {
function makeSetter(name) {
return function (val) {
ObjectDefineProperty(this, name, {
__proto__: null,
value: val,
enumerable: true,
configurable: true,
Expand All @@ -44,12 +45,14 @@ function makeSetter(name) {

ObjectDefineProperties(LazyTransform.prototype, {
_readableState: {
__proto__: null,
get: makeGetter('_readableState'),
set: makeSetter('_readableState'),
configurable: true,
enumerable: true
},
_writableState: {
__proto__: null,
get: makeGetter('_writableState'),
set: makeSetter('_writableState'),
configurable: true,
Expand Down
5 changes: 2 additions & 3 deletions lib/internal/streams/operators.js
Expand Up @@ -20,7 +20,7 @@ const {
NumberIsNaN,
Promise,
PromiseReject,
PromisePrototypeCatch,
PromisePrototypeThen,
Symbol
} = require('../../ours/primordials')

Expand Down Expand Up @@ -127,7 +127,7 @@ function map(fn, options) {
queue.push(kEof)
} catch (err) {
const val = PromiseReject(err)
PromisePrototypeCatch(val, onDone)
PromisePrototypeThen(val, undefined, onDone)
queue.push(val)
} finally {
var _options$signal3
Expand Down Expand Up @@ -223,7 +223,6 @@ function asIndexedPairs(options = undefined) {
}

async function some(fn, options = undefined) {
// eslint-disable-next-line no-unused-vars
for await (const unused of filter.call(this, fn, options)) {
return true
}
Expand Down
20 changes: 18 additions & 2 deletions lib/internal/streams/pipeline.js
Expand Up @@ -18,7 +18,13 @@ const Duplex = require('./duplex')

const {
aggregateTwoErrors,
codes: { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE, ERR_MISSING_ARGS, ERR_STREAM_DESTROYED },
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_RETURN_VALUE,
ERR_MISSING_ARGS,
ERR_STREAM_DESTROYED,
ERR_STREAM_PREMATURE_CLOSE
},
AbortError
} = require('../../ours/errors')

Expand Down Expand Up @@ -358,6 +364,13 @@ function pipelineImpl(streams, callback, opts) {
}

function pipe(src, dst, finish, { end }) {
let ended = false
dst.on('close', () => {
if (!ended) {
// Finish if the destination closes before the source has completed.
finish(new ERR_STREAM_PREMATURE_CLOSE())
}
})
src.pipe(dst, {
end
})
Expand All @@ -366,7 +379,10 @@ function pipe(src, dst, finish, { end }) {
// Compat. Before node v10.12.0 stdio used to throw an error so
// pipe() did/does not end() stdio destinations.
// Now they allow it but "secretly" don't close the underlying fd.
src.once('end', () => dst.end())
src.once('end', () => {
ended = true
dst.end()
})
} else {
finish()
}
Expand Down
23 changes: 21 additions & 2 deletions lib/internal/streams/readable.js
Expand Up @@ -1080,6 +1080,8 @@ async function* createAsyncIterator(stream, options) {

ObjectDefineProperties(Readable.prototype, {
readable: {
__proto__: null,

get() {
const r = this._readableState // r.readable === false means that this is part of a Duplex stream
// where the readable side was disabled upon construction.
Expand All @@ -1097,12 +1099,14 @@ ObjectDefineProperties(Readable.prototype, {
}
},
readableDidRead: {
__proto__: null,
enumerable: false,
get: function () {
return this._readableState.dataEmitted
}
},
readableAborted: {
__proto__: null,
enumerable: false,
get: function () {
return !!(
Expand All @@ -1113,18 +1117,21 @@ ObjectDefineProperties(Readable.prototype, {
}
},
readableHighWaterMark: {
__proto__: null,
enumerable: false,
get: function () {
return this._readableState.highWaterMark
}
},
readableBuffer: {
__proto__: null,
enumerable: false,
get: function () {
return this._readableState && this._readableState.buffer
}
},
readableFlowing: {
__proto__: null,
enumerable: false,
get: function () {
return this._readableState.flowing
Expand All @@ -1136,39 +1143,46 @@ ObjectDefineProperties(Readable.prototype, {
}
},
readableLength: {
__proto__: null,
enumerable: false,

get() {
return this._readableState.length
}
},
readableObjectMode: {
__proto__: null,
enumerable: false,

get() {
return this._readableState ? this._readableState.objectMode : false
}
},
readableEncoding: {
__proto__: null,
enumerable: false,

get() {
return this._readableState ? this._readableState.encoding : null
}
},
errored: {
__proto__: null,
enumerable: false,

get() {
return this._readableState ? this._readableState.errored : null
}
},
closed: {
__proto__: null,

get() {
return this._readableState ? this._readableState.closed : false
}
},
destroyed: {
__proto__: null,
enumerable: false,

get() {
Expand All @@ -1187,6 +1201,7 @@ ObjectDefineProperties(Readable.prototype, {
}
},
readableEnded: {
__proto__: null,
enumerable: false,

get() {
Expand All @@ -1197,12 +1212,16 @@ ObjectDefineProperties(Readable.prototype, {
ObjectDefineProperties(ReadableState.prototype, {
// Legacy getter for `pipesCount`.
pipesCount: {
__proto__: null,

get() {
return this.pipes.length
}
},
// Legacy property for `paused`.
paused: {
__proto__: null,

get() {
return this[kPaused] !== false
},
Expand Down Expand Up @@ -1295,8 +1314,8 @@ Readable.fromWeb = function (readableStream, options) {
return lazyWebStreams().newStreamReadableFromReadableStream(readableStream, options)
}

Readable.toWeb = function (streamReadable) {
return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable)
Readable.toWeb = function (streamReadable, options) {
return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable, options)
}

Readable.wrap = function (src, options) {
Expand Down
29 changes: 25 additions & 4 deletions lib/internal/streams/transform.js
Expand Up @@ -69,12 +69,35 @@ const { ERR_METHOD_NOT_IMPLEMENTED } = require('../../ours/errors').codes

const Duplex = require('./duplex')

const { getHighWaterMark } = require('./state')

ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype)
ObjectSetPrototypeOf(Transform, Duplex)
const kCallback = Symbol('kCallback')

function Transform(options) {
if (!(this instanceof Transform)) return new Transform(options)
if (!(this instanceof Transform)) return new Transform(options) // TODO (ronag): This should preferably always be
// applied but would be semver-major. Or even better;
// make Transform a Readable with the Writable interface.

const readableHighWaterMark = options ? getHighWaterMark(this, options, 'readableHighWaterMark', true) : null

if (readableHighWaterMark === 0) {
// A Duplex will buffer both on the writable and readable side while
// a Transform just wants to buffer hwm number of elements. To avoid
// buffering twice we disable buffering on the writable side.
options = {
...options,
highWaterMark: null,
readableHighWaterMark,
// TODO (ronag): 0 is not optimal since we have
// a "bug" where we check needDrain before calling _write and not after.
// Refs: https://github.com/nodejs/node/pull/32887
// Refs: https://github.com/nodejs/node/pull/35941
writableHighWaterMark: options.writableHighWaterMark || 0
}
}

Duplex.call(this, options) // We have implemented the _read method, and done the other things
// that Readable wants before the first _read call, so unset the
// sync guard flag.
Expand Down Expand Up @@ -155,9 +178,7 @@ Transform.prototype._write = function (chunk, encoding, callback) {
if (
wState.ended || // Backwards compat.
length === rState.length || // Backwards compat.
rState.length < rState.highWaterMark ||
rState.highWaterMark === 0 ||
rState.length === 0
rState.length < rState.highWaterMark
) {
callback()
} else {
Expand Down