Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to Node v18.16.0 #515

Merged
merged 6 commits into from May 8, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -11,9 +11,9 @@
npm install readable-stream
```

This package is a mirror of the streams implementations in Node.js 18.9.0.
This package is a mirror of the streams implementations in Node.js 18.16.0.

Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.9.0/docs/api/stream.html).
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.16.0/docs/api/stream.html).

If you want to guarantee a stable streams base, regardless of what version of
Node you, or the users of your libraries are using, use **readable-stream** _only_ and avoid the _"stream"_ module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html).
Expand Down
4 changes: 3 additions & 1 deletion build/files.mjs
Expand Up @@ -25,7 +25,9 @@ export const skippedSources = [
'test/parallel/test-stream-readable-async-iterators.js',
'test/parallel/test-stream-wrap-drain.js',
'test/parallel/test-stream-wrap-encoding.js',
'test/parallel/test-stream-wrap.js'
'test/parallel/test-stream-wrap.js',
'test/parallel/test-stream-toWeb-allows-server-response.js',
'test/parallel/test-readable-from-web-enqueue-then-close.js'
]

// Build-time map of source-path aliases (original path -> replacement path);
// currently empty, i.e. no files are aliased in this release.
export const aliases = {}
19 changes: 19 additions & 0 deletions build/replacements.mjs
Expand Up @@ -17,6 +17,16 @@ const internalStreamsAbortControllerPolyfill = [
`
]

// [pattern, replacement] pair used by the build's replacement machinery:
// matches the 'use strict' directive and re-emits it followed by an
// AbortController polyfill (falling back to the abort-controller package
// when globalThis.AbortController is absent on older Node versions).
// NOTE(review): the \' escapes inside the template literal are redundant —
// they evaluate to a plain ' — but are behaviorally harmless.
const internalStreamsAbortControllerPolyfill2 = [
"'use strict'",
`
'use strict'

const AbortController = globalThis.AbortController || require(\'abort-controller\').AbortController;

`
]

const internalStreamsNoRequireBlob = [
"const \\{\\n isBlob,\\n\\} = require\\('internal/blob'\\);",
`
Expand Down Expand Up @@ -51,6 +61,8 @@ const internalStreamsRequireStream = ["require\\('stream'\\)", "require('../../s

const internalStreamsRequireStreams = ["require\\('internal/streams/([^']+)'\\)", "require('./$1')"]

// [pattern, replacement] pair: rewrites require('stream/promises') or
// require('node:stream/promises') to this package's bundled
// `.promises` namespace on lib/stream.
const streamSlashPromisesToStreamDotPromises = [
  "require\\('(node:)?stream/promises'\\)",
  "require('../../lib/stream').promises"
]

const internalStreamsRequireUtil = [
"require\\('internal/util(?:/(?:debuglog|inspect))?'\\)",
"require('../../ours/util')"
Expand Down Expand Up @@ -301,6 +313,13 @@ export const replacements = {
testParallelTicksReenableConsoleLog,
testParallelTickSaveHook
],
'test/parallel/test-stream3-pipeline-async-iterator.js': [
internalStreamsAbortControllerPolyfill2,
streamSlashPromisesToStreamDotPromises
],
'test/parallel/test-stream-compose-operator.js': [
internalStreamsAbortControllerPolyfill2
],
'test/parallel/test-stream2-readable-from-list.js': [testParallelReadableBufferListInspect],
'README.md': [readmeInfo, readmeLink]
}
30 changes: 18 additions & 12 deletions lib/internal/streams/add-abort-signal.js
@@ -1,6 +1,7 @@
'use strict'

const { AbortError, codes } = require('../../ours/errors')
const { isNodeStream, isWebStream, kControllerErrorFunction } = require('./utils')
const eos = require('./end-of-stream')
const { ERR_INVALID_ARG_TYPE } = codes

Expand All @@ -12,27 +13,32 @@ const validateAbortSignal = (signal, name) => {
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal)
}
}
function isNodeStream(obj) {
return !!(obj && typeof obj.pipe === 'function')
}
module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
validateAbortSignal(signal, 'signal')
if (!isNodeStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream)
if (!isNodeStream(stream) && !isWebStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream)
}
return module.exports.addAbortSignalNoValidate(signal, stream)
}
module.exports.addAbortSignalNoValidate = function (signal, stream) {
if (typeof signal !== 'object' || !('aborted' in signal)) {
return stream
}
const onAbort = () => {
stream.destroy(
new AbortError(undefined, {
cause: signal.reason
})
)
}
const onAbort = isNodeStream(stream)
? () => {
stream.destroy(
new AbortError(undefined, {
cause: signal.reason
})
)
}
: () => {
stream[kControllerErrorFunction](
new AbortError(undefined, {
cause: signal.reason
})
)
}
if (signal.aborted) {
onAbort()
} else {
Expand Down
143 changes: 101 additions & 42 deletions lib/internal/streams/compose.js
Expand Up @@ -3,11 +3,20 @@
const { pipeline } = require('./pipeline')
const Duplex = require('./duplex')
const { destroyer } = require('./destroy')
const { isNodeStream, isReadable, isWritable } = require('./utils')
const {
isNodeStream,
isReadable,
isWritable,
isWebStream,
isTransformStream,
isWritableStream,
isReadableStream
} = require('./utils')
const {
AbortError,
codes: { ERR_INVALID_ARG_VALUE, ERR_MISSING_ARGS }
} = require('../../ours/errors')
const eos = require('./end-of-stream')
module.exports = function compose(...streams) {
if (streams.length === 0) {
throw new ERR_MISSING_ARGS('streams')
Expand All @@ -24,14 +33,17 @@ module.exports = function compose(...streams) {
streams[idx] = Duplex.from(streams[idx])
}
for (let n = 0; n < streams.length; ++n) {
if (!isNodeStream(streams[n])) {
if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) {
// TODO(ronag): Add checks for non streams.
continue
}
if (n < streams.length - 1 && !isReadable(streams[n])) {
if (
n < streams.length - 1 &&
!(isReadable(streams[n]) || isReadableStream(streams[n]) || isTransformStream(streams[n]))
) {
throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be readable')
}
if (n > 0 && !isWritable(streams[n])) {
if (n > 0 && !(isWritable(streams[n]) || isWritableStream(streams[n]) || isTransformStream(streams[n]))) {
throw new ERR_INVALID_ARG_VALUE(`streams[${n}]`, orgStreams[n], 'must be writable')
}
}
Expand All @@ -53,8 +65,8 @@ module.exports = function compose(...streams) {
}
const head = streams[0]
const tail = pipeline(streams, onfinished)
const writable = !!isWritable(head)
const readable = !!isReadable(tail)
const writable = !!(isWritable(head) || isWritableStream(head) || isTransformStream(head))
const readable = !!(isReadable(tail) || isReadableStream(tail) || isTransformStream(tail))

// TODO(ronag): Avoid double buffering.
// Implement Writable/Readable/Duplex traits.
Expand All @@ -67,25 +79,49 @@ module.exports = function compose(...streams) {
readable
})
if (writable) {
d._write = function (chunk, encoding, callback) {
if (head.write(chunk, encoding)) {
callback()
} else {
ondrain = callback
if (isNodeStream(head)) {
d._write = function (chunk, encoding, callback) {
if (head.write(chunk, encoding)) {
callback()
} else {
ondrain = callback
}
}
}
d._final = function (callback) {
head.end()
onfinish = callback
}
head.on('drain', function () {
if (ondrain) {
const cb = ondrain
ondrain = null
cb()
d._final = function (callback) {
head.end()
onfinish = callback
}
})
tail.on('finish', function () {
head.on('drain', function () {
if (ondrain) {
const cb = ondrain
ondrain = null
cb()
}
})
} else if (isWebStream(head)) {
const writable = isTransformStream(head) ? head.writable : head
const writer = writable.getWriter()
d._write = async function (chunk, encoding, callback) {
try {
await writer.ready
writer.write(chunk).catch(() => {})
callback()
} catch (err) {
callback(err)
}
}
d._final = async function (callback) {
try {
await writer.ready
writer.close().catch(() => {})
onfinish = callback
} catch (err) {
callback(err)
}
}
}
const toRead = isTransformStream(tail) ? tail.readable : tail
eos(toRead, () => {
if (onfinish) {
const cb = onfinish
onfinish = null
Expand All @@ -94,25 +130,46 @@ module.exports = function compose(...streams) {
})
}
if (readable) {
tail.on('readable', function () {
if (onreadable) {
const cb = onreadable
onreadable = null
cb()
}
})
tail.on('end', function () {
d.push(null)
})
d._read = function () {
while (true) {
const buf = tail.read()
if (buf === null) {
onreadable = d._read
return
if (isNodeStream(tail)) {
tail.on('readable', function () {
if (onreadable) {
const cb = onreadable
onreadable = null
cb()
}
})
tail.on('end', function () {
d.push(null)
})
d._read = function () {
while (true) {
const buf = tail.read()
if (buf === null) {
onreadable = d._read
return
}
if (!d.push(buf)) {
return
}
}
if (!d.push(buf)) {
return
}
} else if (isWebStream(tail)) {
const readable = isTransformStream(tail) ? tail.readable : tail
const reader = readable.getReader()
d._read = async function () {
while (true) {
try {
const { value, done } = await reader.read()
if (!d.push(value)) {
return
}
if (done) {
d.push(null)
return
}
} catch {
return
}
}
}
}
Expand All @@ -128,7 +185,9 @@ module.exports = function compose(...streams) {
callback(err)
} else {
onclose = callback
destroyer(tail, err)
if (isNodeStream(tail)) {
destroyer(tail, err)
}
}
}
return d
Expand Down
19 changes: 11 additions & 8 deletions lib/internal/streams/destroy.js
Expand Up @@ -36,7 +36,7 @@ function destroy(err, cb) {
const w = this._writableState
// With duplex streams we use the writable side for state.
const s = w || r
if ((w && w.destroyed) || (r && r.destroyed)) {
if ((w !== null && w !== undefined && w.destroyed) || (r !== null && r !== undefined && r.destroyed)) {
if (typeof cb === 'function') {
cb()
}
Expand Down Expand Up @@ -107,14 +107,14 @@ function emitCloseNT(self) {
if (r) {
r.closeEmitted = true
}
if ((w && w.emitClose) || (r && r.emitClose)) {
if ((w !== null && w !== undefined && w.emitClose) || (r !== null && r !== undefined && r.emitClose)) {
self.emit('close')
}
}
function emitErrorNT(self, err) {
const r = self._readableState
const w = self._writableState
if ((w && w.errorEmitted) || (r && r.errorEmitted)) {
if ((w !== null && w !== undefined && w.errorEmitted) || (r !== null && r !== undefined && r.errorEmitted)) {
return
}
if (w) {
Expand Down Expand Up @@ -162,10 +162,11 @@ function errorOrDestroy(stream, err, sync) {

const r = stream._readableState
const w = stream._writableState
if ((w && w.destroyed) || (r && r.destroyed)) {
if ((w !== null && w !== undefined && w.destroyed) || (r !== null && r !== undefined && r.destroyed)) {
return this
}
if ((r && r.autoDestroy) || (w && w.autoDestroy)) stream.destroy(err)
if ((r !== null && r !== undefined && r.autoDestroy) || (w !== null && w !== undefined && w.autoDestroy))
stream.destroy(err)
else if (err) {
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
err.stack // eslint-disable-line no-unused-expressions
Expand Down Expand Up @@ -228,16 +229,18 @@ function constructNT(stream) {
}
}
try {
stream._construct(onConstruct)
stream._construct((err) => {
process.nextTick(onConstruct, err)
})
} catch (err) {
onConstruct(err)
process.nextTick(onConstruct, err)
}
}
// Emits the internal kConstruct event on `stream`; the NT suffix indicates
// it is scheduled for a later tick by its caller rather than invoked inline.
function emitConstructNT(stream) {
stream.emit(kConstruct)
}
function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function'
return (stream === null || stream === undefined ? undefined : stream.setHeader) && typeof stream.abort === 'function'
}
function emitCloseLegacy(stream) {
stream.emit('close')
Expand Down
2 changes: 0 additions & 2 deletions lib/internal/streams/duplexify.js
Expand Up @@ -282,8 +282,6 @@ function _duplexify(pair) {
cb(err)
} else if (err) {
d.destroy(err)
} else if (!readable && !writable) {
d.destroy()
}
}

Expand Down