Skip to content

Commit

Permalink
Running on Node 16
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Collina <hello@matteocollina.com>
  • Loading branch information
mcollina committed Dec 15, 2023
1 parent cf4c054 commit bf97973
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 8 deletions.
21 changes: 17 additions & 4 deletions build/replacements.mjs
Expand Up @@ -275,11 +275,21 @@ const duplexFromTestWebStreamNeutralizeWritable = [
'makeATestWritableStreamOff(writeFunc) {'
]

const polyfillAddAbortSignal = [
const polyfillAddAbortListener = [
'require\\(\'events\'\\).addAbortListener',
'require(\'../../ours/util\').addAbortListener'
]

const abortSignalAny = [
'AbortSignal.any',
'require(\'../../ours/util\').AbortSignalAny'
]

const asyncDisposeTest = [
'Symbol.asyncDispose',
'require(\'../../lib/ours/primordials\').SymbolAsyncDispose'
]

export const replacements = {
'lib/_stream.+': [legacyStreamsRequireStream],
'lib/internal/streams/duplexify.+': [
Expand All @@ -292,10 +302,11 @@ export const replacements = {
internalStreamsAbortControllerPolyfill,
internalStreamsNoRequireAbortController,
internalStreamsNoRequireAbortController2,
internalStreamsWeakHandler2
internalStreamsWeakHandler2,
abortSignalAny
],
'lib/internal/streams/add-abort-signal.js': [
polyfillAddAbortSignal
polyfillAddAbortListener
],
'lib/internal/streams/readable.js': [
removefromWebReadableMethod,
Expand All @@ -314,7 +325,8 @@ export const replacements = {
internalStreamsRequireWebStream,
internalStreamsRequireInternal,
internalStreamsWeakHandler,
internalStreamsInspectCustom
internalStreamsInspectCustom,
polyfillAddAbortListener
],
'lib/internal/validators.js': [
internalValidatorsRequireAssert,
Expand Down Expand Up @@ -362,6 +374,7 @@ export const replacements = {
removeWebStreamsFromDuplexFromTest
],
'test/parallel/test-stream-finished.js': [testParallelFinishedEvent],
'test/parallel/test-stream-readable-dispose.js': [asyncDisposeTest],
'test/parallel/test-stream-flatMap.js': [testParallelFlatMapWinLineSeparator],
'test/parallel/test-stream-preprocess.js': [testParallelPreprocessWinLineSeparator],
'test/parallel/test-stream-writable-samecb-singletick.js': [
Expand Down
4 changes: 2 additions & 2 deletions lib/internal/streams/end-of-stream.js
Expand Up @@ -213,7 +213,7 @@ function eos(stream, options, callback) {
if (options.signal.aborted) {
process.nextTick(abort)
} else {
addAbortListener ??= require('events').addAbortListener
addAbortListener ??= require('../../ours/util').addAbortListener
const disposable = addAbortListener(options.signal, abort)
const originalCallback = callback
callback = once((...args) => {
Expand All @@ -240,7 +240,7 @@ function eosWeb(stream, options, callback) {
if (options.signal.aborted) {
process.nextTick(abort)
} else {
addAbortListener ??= require('events').addAbortListener
addAbortListener ??= require('../../ours/util').addAbortListener
const disposable = addAbortListener(options.signal, abort)
const originalCallback = callback
callback = once((...args) => {
Expand Down
2 changes: 1 addition & 1 deletion lib/internal/streams/operators.js
Expand Up @@ -66,7 +66,7 @@ function map(fn, options) {
validateInteger(highWaterMark, 'options.highWaterMark', 0)
highWaterMark += concurrency
return async function* map() {
const signal = AbortSignal.any(
const signal = require('../../ours/util').AbortSignalAny(
[options === null || options === undefined ? undefined : options.signal].filter(Boolean)
)
const stream = this
Expand Down
2 changes: 1 addition & 1 deletion lib/internal/streams/pipeline.js
Expand Up @@ -186,7 +186,7 @@ function pipelineImpl(streams, callback, opts) {
function abort() {
finishImpl(new AbortError())
}
addAbortListener ??= require('events').addAbortListener
addAbortListener ??= require('../../ours/util').addAbortListener
let disposable
if (outerSignal) {
disposable = addAbortListener(outerSignal, abort)
Expand Down
10 changes: 10 additions & 0 deletions lib/ours/util.js
Expand Up @@ -2,6 +2,7 @@

const bufferModule = require('buffer')
const { kResistStopPropagation, SymbolDispose } = require('./primordials')
const AbortSignal = global.AbortSignal || require('abort-controller').AbortSignal
const AsyncFunction = Object.getPrototypeOf(async function () {}).constructor
const Blob = globalThis.Blob || bufferModule.Blob
/* eslint-disable indent */
Expand Down Expand Up @@ -159,6 +160,15 @@ module.exports = {
: _removeEventListener()
}
}
},
AbortSignalAny:
AbortSignal.any ||
function AbortSignalAny(signals) {
if (signals[0]) {
return signals[0]
}
const ac = new AbortController()
return ac.signal
}
}
module.exports.promisify.custom = Symbol.for('nodejs.util.promisify.custom')
8 changes: 8 additions & 0 deletions src/util.js
Expand Up @@ -2,6 +2,7 @@

const bufferModule = require('buffer')
const { kResistStopPropagation, SymbolDispose } = require('./primordials')
const AbortSignal = global.AbortSignal || require('abort-controller').AbortSignal

const AsyncFunction = Object.getPrototypeOf(async function () {}).constructor
const Blob = globalThis.Blob || bufferModule.Blob
Expand Down Expand Up @@ -160,6 +161,13 @@ module.exports = {
removeEventListener?.();
},
};
},
AbortSignalAny: AbortSignal.any || function AbortSignalAny(signals) {
if (signals[0]) {
return signals[0]
}
const ac = new AbortController()
return ac.signal
}
}

Expand Down
42 changes: 42 additions & 0 deletions test/parallel/test-stream-pipe-deadlock.js
@@ -0,0 +1,42 @@
'use strict'

const tap = require('tap')
const silentConsole = {
log() {},
error() {}
}
const common = require('../common')
const { Readable, Writable } = require('../../lib/ours/index')

// https://github.com/nodejs/node/issues/48666
;(async () => {
// Prepare src that is internally ended, with buffered data pending
const src = new Readable({
read() {}
})
src.push(Buffer.alloc(100))
src.push(null)
src.pause()

// Give it time to settle
await new Promise((resolve) => setImmediate(resolve))
const dst = new Writable({
highWaterMark: 1000,
write(buf, enc, cb) {
process.nextTick(cb)
}
})
dst.write(Buffer.alloc(1000)) // Fill write buffer
dst.on('finish', common.mustCall())
src.pipe(dst)
})().then(common.mustCall())

/* replacement start */
process.on('beforeExit', (code) => {
if (code === 0) {
tap.pass('test succeeded')
} else {
tap.fail(`test failed - exited code ${code}`)
}
})
/* replacement end */
40 changes: 40 additions & 0 deletions test/parallel/test-stream-readable-dispose.js
@@ -0,0 +1,40 @@
'use strict'

const tap = require('tap')
const silentConsole = {
log() {},
error() {}
}
const common = require('../common')
const { Readable } = require('../../lib/ours/index')
const assert = require('assert')
{
const read = new Readable({
read() {}
})
read.resume()
read.on('end', common.mustNotCall('no end event'))
read.on('close', common.mustCall())
read.on(
'error',
common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError')
})
)
read[require('../../lib/ours/primordials').SymbolAsyncDispose]().then(
common.mustCall(() => {
assert.strictEqual(read.errored.name, 'AbortError')
assert.strictEqual(read.destroyed, true)
})
)
}

/* replacement start */
process.on('beforeExit', (code) => {
if (code === 0) {
tap.pass('test succeeded')
} else {
tap.fail(`test failed - exited code ${code}`)
}
})
/* replacement end */
45 changes: 45 additions & 0 deletions test/parallel/test-stream-set-default-hwm.js
@@ -0,0 +1,45 @@
'use strict'

const tap = require('tap')
const silentConsole = {
log() {},
error() {}
}
require('../common')
const assert = require('node:assert')
const {
setDefaultHighWaterMark,
getDefaultHighWaterMark,
Writable,
Readable,
Transform
} = require('../../lib/ours/index')
assert.notStrictEqual(getDefaultHighWaterMark(false), 32 * 1000)
setDefaultHighWaterMark(false, 32 * 1000)
assert.strictEqual(getDefaultHighWaterMark(false), 32 * 1000)
assert.notStrictEqual(getDefaultHighWaterMark(true), 32)
setDefaultHighWaterMark(true, 32)
assert.strictEqual(getDefaultHighWaterMark(true), 32)
const w = new Writable({
write() {}
})
assert.strictEqual(w.writableHighWaterMark, 32 * 1000)
const r = new Readable({
read() {}
})
assert.strictEqual(r.readableHighWaterMark, 32 * 1000)
const t = new Transform({
transform() {}
})
assert.strictEqual(t.writableHighWaterMark, 32 * 1000)
assert.strictEqual(t.readableHighWaterMark, 32 * 1000)

/* replacement start */
process.on('beforeExit', (code) => {
if (code === 0) {
tap.pass('test succeeded')
} else {
tap.fail(`test failed - exited code ${code}`)
}
})
/* replacement end */

0 comments on commit bf97973

Please sign in to comment.