Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to Node.js 18.19.0, add Node 21.x to CI #528

Merged
merged 8 commits into from Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/node.yml
Expand Up @@ -13,7 +13,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
node-version: [12.x, 14.x, 16.x, 18.x, 20.x]
node-version: [12.x, 14.x, 16.x, 18.x, 20.x, 21.x]
exclude:
- os: windows-latest
node-version: 12.x
Expand Down
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -11,9 +11,9 @@
npm install readable-stream
```

This package is a mirror of the streams implementations in Node.js 18.16.0.
This package is a mirror of the streams implementations in Node.js 18.19.0.

Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.16.0/docs/api/stream.html).
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v18.19.0/docs/api/stream.html).

If you want to guarantee a stable streams base, regardless of what version of
Node you, or the users of your libraries are using, use **readable-stream** _only_ and avoid the _"stream"_ module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html).
Expand Down
1 change: 1 addition & 0 deletions build/build.mjs
Expand Up @@ -69,6 +69,7 @@ async function extract(nodeVersion, tarFile) {
})

await finished(tarFile.pipe(parser))
info('extraction done')
return contents
}

Expand Down
51 changes: 48 additions & 3 deletions build/replacements.mjs
Expand Up @@ -42,6 +42,11 @@ const internalStreamsNoRequireAbortController = [
'const AbortController = globalThis.AbortController || require(\'abort-controller\').AbortController;'
]

const internalStreamsNoRequireAbortController2 = [
'const \\{ AbortController, AbortSignal \\} = .+',
'const AbortController = globalThis.AbortController || require(\'abort-controller\').AbortController;'
]

const internalStreamsRequireInternal = ["require\\('internal/([^']+)'\\)", "require('../$1')"]

const internalStreamsRequireErrors = ["require\\('internal/errors'\\)", "require('../../ours/errors')"]
Expand Down Expand Up @@ -74,7 +79,12 @@ const internalStreamsRequireWebStream = ["require\\('internal/webstreams/adapter

const internalStreamsWeakHandler = [
"const \\{ kWeakHandler \\} = require\\('../event_target'\\);",
"const kWeakHandler = require('../../ours/primordials').Symbol('kWeak');"
"require\\('../event_target'\\);const kWeakHandler = require('../../ours/primordials').Symbol('kWeak');"
]

const internalStreamsWeakHandler2 = [
"const \\{ kWeakHandler, kResistStopPropagation \\} = .*;",
"const kWeakHandler = require('../../ours/primordials').Symbol('kWeak');\nconst kResistStopPropagation = require('../../ours/primordials').Symbol('kResistStopPropagation');"
]

const internalValidatorsNoCoalesceAssignment = [
Expand Down Expand Up @@ -142,6 +152,7 @@ const testCommonKnownGlobals = [
typeof AbortController !== 'undefined' ? AbortController : require('abort-controller').AbortController,
typeof AbortSignal !== 'undefined' ? AbortSignal : require('abort-controller').AbortSignal,
typeof EventTarget !== 'undefined' ? EventTarget : require('event-target-shim').EventTarget,
typeof navigator !== 'undefined' ? navigator : {},
`
]

Expand Down Expand Up @@ -238,6 +249,32 @@ const readmeLink = ['(\\[Node.js website\\]\\(https://nodejs.org/dist/v)(\\d+.\\

const streamRequire = [ "require\\('stream'\\)", "require('../../lib/stream.js')" ]

const removeWebStreamsFromDuplexFromTest= [
'const { ReadableStream, WritableStream } = .+;',
`function makeATestReadableStream(value) {
return Readable.from([value])
}
function makeATestWritableStream(writeFunc) {
return new Writable({
write(chunk, enc, cb) {
writeFunc(chunk)
cb()
}
})
}
`
]

const duplexFromTestWebStreamNeutralizeReadable = [
'makeATestReadableStream\\(value\\) {',
'makeATestReadableStreamOff(value) {'
]

const duplexFromTestWebStreamNeutralizeWritable = [
'makeATestWritableStream\\(writeFunc\\) {',
'makeATestWritableStreamOff(writeFunc) {'
]

export const replacements = {
'lib/_stream.+': [legacyStreamsRequireStream],
'lib/internal/streams/duplexify.+': [
Expand All @@ -248,7 +285,9 @@ export const replacements = {
],
'lib/internal/streams/(operators|pipeline).+': [
internalStreamsAbortControllerPolyfill,
internalStreamsNoRequireAbortController
internalStreamsNoRequireAbortController,
internalStreamsNoRequireAbortController2,
internalStreamsWeakHandler2
],
'lib/internal/streams/readable.js': [
removefromWebReadableMethod,
Expand Down Expand Up @@ -307,7 +346,13 @@ export const replacements = {
testParallelSilentConsole,
testParallelTimersPromises
],
'test/parallel/test-stream-duplex-from.js': [testParallelDuplexFromBlob, testParallelDuplexSkipWithoutBlob],
'test/parallel/test-stream-duplex-from.js': [
testParallelDuplexFromBlob,
testParallelDuplexSkipWithoutBlob,
duplexFromTestWebStreamNeutralizeReadable,
duplexFromTestWebStreamNeutralizeWritable,
removeWebStreamsFromDuplexFromTest
],
'test/parallel/test-stream-finished.js': [testParallelFinishedEvent],
'test/parallel/test-stream-flatMap.js': [testParallelFlatMapWinLineSeparator],
'test/parallel/test-stream-preprocess.js': [testParallelPreprocessWinLineSeparator],
Expand Down
7 changes: 5 additions & 2 deletions lib/internal/streams/add-abort-signal.js
@@ -1,9 +1,11 @@
'use strict'

const { SymbolDispose } = require('../../ours/primordials')
const { AbortError, codes } = require('../../ours/errors')
const { isNodeStream, isWebStream, kControllerErrorFunction } = require('./utils')
const eos = require('./end-of-stream')
const { ERR_INVALID_ARG_TYPE } = codes
let addAbortListener

// This method is inlined here for readable-stream
// It also does not allow for signal to not exist on the stream
Expand Down Expand Up @@ -42,8 +44,9 @@ module.exports.addAbortSignalNoValidate = function (signal, stream) {
if (signal.aborted) {
onAbort()
} else {
signal.addEventListener('abort', onAbort)
eos(stream, () => signal.removeEventListener('abort', onAbort))
addAbortListener ??= require('events').addAbortListener
const disposable = addAbortListener(signal, onAbort)
eos(stream, disposable[SymbolDispose])
}
return stream
}
2 changes: 1 addition & 1 deletion lib/internal/streams/compose.js
Expand Up @@ -74,7 +74,7 @@ module.exports = function compose(...streams) {
d = new Duplex({
// TODO (ronag): highWaterMark?
writableObjectMode: !!(head !== null && head !== undefined && head.writableObjectMode),
readableObjectMode: !!(tail !== null && tail !== undefined && tail.writableObjectMode),
readableObjectMode: !!(tail !== null && tail !== undefined && tail.readableObjectMode),
writable,
readable
})
Expand Down
4 changes: 2 additions & 2 deletions lib/internal/streams/destroy.js
Expand Up @@ -12,7 +12,7 @@ const {
AbortError
} = require('../../ours/errors')
const { Symbol } = require('../../ours/primordials')
const { kDestroyed, isDestroyed, isFinished, isServerRequest } = require('./utils')
const { kIsDestroyed, isDestroyed, isFinished, isServerRequest } = require('./utils')
const kDestroy = Symbol('kDestroy')
const kConstruct = Symbol('kConstruct')
function checkError(err, w, r) {
Expand Down Expand Up @@ -278,7 +278,7 @@ function destroyer(stream, err) {
process.nextTick(emitCloseLegacy, stream)
}
if (!stream.destroyed) {
stream[kDestroyed] = true
stream[kIsDestroyed] = true
}
}
module.exports = {
Expand Down
41 changes: 20 additions & 21 deletions lib/internal/streams/duplexify.js
Expand Up @@ -13,7 +13,9 @@ const {
isNodeStream,
isReadableNodeStream,
isWritableNodeStream,
isDuplexNodeStream
isDuplexNodeStream,
isReadableStream,
isWritableStream
} = require('./utils')
const eos = require('./end-of-stream')
const {
Expand All @@ -23,6 +25,7 @@ const {
const { destroyer } = require('./destroy')
const Duplex = require('./duplex')
const Readable = require('./readable')
const Writable = require('./writable')
const { createDeferredPromise } = require('../../ours/util')
const from = require('./from')
const Blob = globalThis.Blob || bufferModule.Blob
Expand Down Expand Up @@ -77,17 +80,16 @@ module.exports = function duplexify(body, name) {
readable: false
})
}

// TODO: Webstreams
// if (isReadableStream(body)) {
// return _duplexify({ readable: Readable.fromWeb(body) });
// }

// TODO: Webstreams
// if (isWritableStream(body)) {
// return _duplexify({ writable: Writable.fromWeb(body) });
// }

if (isReadableStream(body)) {
return _duplexify({
readable: Readable.fromWeb(body)
})
}
if (isWritableStream(body)) {
return _duplexify({
writable: Writable.fromWeb(body)
})
}
if (typeof body === 'function') {
const { value, write, final, destroy } = fromAsyncGen(body)
if (isIterable(value)) {
Expand Down Expand Up @@ -144,15 +146,12 @@ module.exports = function duplexify(body, name) {
writable: false
})
}

// TODO: Webstreams.
// if (
// isReadableStream(body?.readable) &&
// isWritableStream(body?.writable)
// ) {
// return Duplexify.fromWeb(body);
// }

if (
isReadableStream(body === null || body === undefined ? undefined : body.readable) &&
isWritableStream(body === null || body === undefined ? undefined : body.writable)
) {
return Duplexify.fromWeb(body)
}
if (
typeof (body === null || body === undefined ? undefined : body.writable) === 'object' ||
typeof (body === null || body === undefined ? undefined : body.readable) === 'object'
Expand Down
13 changes: 8 additions & 5 deletions lib/internal/streams/end-of-stream.js
Expand Up @@ -11,7 +11,7 @@ const { AbortError, codes } = require('../../ours/errors')
const { ERR_INVALID_ARG_TYPE, ERR_STREAM_PREMATURE_CLOSE } = codes
const { kEmptyObject, once } = require('../../ours/util')
const { validateAbortSignal, validateFunction, validateObject, validateBoolean } = require('../validators')
const { Promise, PromisePrototypeThen } = require('../../ours/primordials')
const { Promise, PromisePrototypeThen, SymbolDispose } = require('../../ours/primordials')
const {
isClosed,
isReadable,
Expand All @@ -28,6 +28,7 @@ const {
willEmitClose: _willEmitClose,
kIsClosedPromise
} = require('./utils')
let addAbortListener
function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function'
}
Expand Down Expand Up @@ -212,12 +213,13 @@ function eos(stream, options, callback) {
if (options.signal.aborted) {
process.nextTick(abort)
} else {
addAbortListener ??= require('events').addAbortListener
const disposable = addAbortListener(options.signal, abort)
const originalCallback = callback
callback = once((...args) => {
options.signal.removeEventListener('abort', abort)
disposable[SymbolDispose]()
originalCallback.apply(stream, args)
})
options.signal.addEventListener('abort', abort)
}
}
return cleanup
Expand All @@ -238,12 +240,13 @@ function eosWeb(stream, options, callback) {
if (options.signal.aborted) {
process.nextTick(abort)
} else {
addAbortListener ??= require('events').addAbortListener
const disposable = addAbortListener(options.signal, abort)
const originalCallback = callback
callback = once((...args) => {
options.signal.removeEventListener('abort', abort)
disposable[SymbolDispose]()
originalCallback.apply(stream, args)
})
options.signal.addEventListener('abort', abort)
}
}
const resolverFn = (...args) => {
Expand Down