diff --git a/lib/client-connect.js b/lib/client-connect.js index 2b6fcad5dd7..bba455455e9 100644 --- a/lib/client-connect.js +++ b/lib/client-connect.js @@ -55,9 +55,9 @@ class ConnectHandler extends AsyncResource { if (callback) { this.callback = null - process.nextTick((self, callback, err, opaque) => { - self.runInAsyncScope(callback, null, err, { opaque }) - }, this, callback, err, opaque) + util.queueMicrotask(() => { + this.runInAsyncScope(callback, null, err, { opaque }) + }) } } } @@ -89,7 +89,9 @@ function connect (opts, callback) { signal }, connectHandler) } catch (err) { - process.nextTick(callback, err, { opaque: opts && opts.opaque }) + util.queueMicrotask(() => { + callback(err, { opaque: opts && opts.opaque }) + }) } } diff --git a/lib/client-request.js b/lib/client-request.js index 7c65b76cb37..780c8c2f79d 100644 --- a/lib/client-request.js +++ b/lib/client-request.js @@ -128,8 +128,8 @@ class RequestHandler extends AsyncResource { if (callback) { this.callback = null - process.nextTick((self, callback, err, opaque) => { - self.runInAsyncScope(callback, null, err, { opaque }) + util.queueMicrotask(() => { + this.runInAsyncScope(callback, null, err, { opaque }) }, this, callback, err, opaque) } @@ -158,7 +158,9 @@ function request (opts, callback) { this.dispatch(opts, new RequestHandler(opts, callback)) } catch (err) { if (typeof callback === 'function') { - process.nextTick(callback, err, { opaque: opts && opts.opaque }) + util.queueMicrotask(() => { + callback(err, { opaque: opts && opts.opaque }) + }) } else { throw err } diff --git a/lib/client-stream.js b/lib/client-stream.js index e9ddc86375c..5168e00d267 100644 --- a/lib/client-stream.js +++ b/lib/client-stream.js @@ -146,9 +146,9 @@ class StreamHandler extends AsyncResource { util.destroy(res, err) } else if (callback) { this.callback = null - process.nextTick((self, callback, err, opaque) => { - self.runInAsyncScope(callback, null, err, { opaque }) - }, this, callback, err, opaque) + util.queueMicrotask(() => { + this.runInAsyncScope(callback, null, err, { opaque }) + }) } if (body) { @@ -171,7 +171,9 @@ function stream (opts, factory, callback) { this.dispatch(opts, new StreamHandler(opts, factory, callback)) } catch (err) { if (typeof callback === 'function') { - process.nextTick(callback, err, { opaque: opts && opts.opaque }) + util.queueMicrotask(() => { + callback(err, { opaque: opts && opts.opaque }) + }) } else { throw err } diff --git a/lib/client-upgrade.js b/lib/client-upgrade.js index 38b1876c4d1..1aca611fce6 100644 --- a/lib/client-upgrade.js +++ b/lib/client-upgrade.js @@ -57,9 +57,9 @@ class UpgradeHandler extends AsyncResource { if (callback) { this.callback = null - process.nextTick((self, callback, err, opaque) => { - self.runInAsyncScope(callback, null, err, { opaque }) - }, this, callback, err, opaque) + util.queueMicrotask(() => { + this.runInAsyncScope(callback, null, err, { opaque }) + }) } } } @@ -94,7 +94,9 @@ function upgrade (opts, callback) { upgrade: protocol || 'Websocket' }, upgradeHandler) } catch (err) { - process.nextTick(callback, err, { opaque: opts && opts.opaque }) + util.queueMicrotask(() => { + callback(err, { opaque: opts && opts.opaque }) + }) } } diff --git a/lib/core/client.js b/lib/core/client.js index f21950b59c6..56b0822377a 100644 --- a/lib/core/client.js +++ b/lib/core/client.js @@ -248,7 +248,7 @@ class Client extends EventEmitter { } else if (util.isStream(request.body)) { // Wait a tick in case stream is ended in the same tick. this[kResuming] = 1 - process.nextTick(resume, this) + util.queueMicrotask(() => resume(this)) } else { resume(this, true) } @@ -275,7 +275,9 @@ class Client extends EventEmitter { } if (this[kDestroyed]) { - process.nextTick(callback, new ClientDestroyedError(), null) + util.queueMicrotask(() => { + callback(new ClientDestroyedError(), null) + }) return } @@ -310,7 +312,7 @@ class Client extends EventEmitter { if (this[kOnDestroyed]) { this[kOnDestroyed].push(callback) } else { - process.nextTick(callback, null, null) + util.queueMicrotask(() => callback(null, null)) } return } @@ -336,7 +338,7 @@ class Client extends EventEmitter { } if (!this[kSocket]) { - process.nextTick(onDestroyed) + util.queueMicrotask(() => onDestroyed()) } else { util.destroy(this[kSocket].on('close', onDestroyed), err) } @@ -967,7 +969,7 @@ function _resume (client, sync) { } else if (client[kNeedDrain] === 2) { if (sync) { client[kNeedDrain] = 1 - process.nextTick(emitDrain, client) + util.queueMicrotask(() => emitDrain(client)) } else { emitDrain(client) } diff --git a/lib/core/util.js b/lib/core/util.js index fd668e07647..09914874120 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -105,6 +105,7 @@ function destroy (stream, err) { stream.destroy(err) } } else if (err) { + // Node streams use nextTick. process.nextTick((stream, err) => { stream.emit('error', err) }, stream, err) diff --git a/test/client-connect.js b/test/client-connect.js index e007693cab6..b89ab0ada56 100644 --- a/test/client-connect.js +++ b/test/client-connect.js @@ -4,6 +4,7 @@ const { test } = require('tap') const { Client, errors } = require('..') const http = require('http') const EE = require('events') +const util = require('../lib/core/util') test('basic connect', (t) => { t.plan(3) @@ -161,7 +162,7 @@ test('connect wait for empty pipeline', (t) => { t.error(err) }) client.once('connect', () => { - process.nextTick(() => { + util.queueMicrotask(() => { t.strictEqual(client.busy, false) client.connect({ diff --git a/test/client-errors.js b/test/client-errors.js index 24119ef2f40..66d7ecfb526 100644 --- a/test/client-errors.js +++ b/test/client-errors.js @@ -7,6 +7,7 @@ const net = require('net') const { Readable } = require('stream') const { kSocket } = require('../lib/core/symbols') +const util = require('../lib/core/util') test('GET errors and reconnect with pipelining 1', (t) => { t.plan(9) @@ -639,7 +640,7 @@ test('socket fail while writing request body', (t) => { body.push('asd') client.on('connect', () => { - process.nextTick(() => { + util.queueMicrotask(() => { client[kSocket].destroy('kaboom') }) }) @@ -674,7 +675,7 @@ test('socket fail while ending request body', (t) => { const _err = new Error('kaboom') client.on('connect', () => { - process.nextTick(() => { + util.queueMicrotask(() => { client[kSocket].destroy(_err) }) }) diff --git a/test/client-pipeline.js b/test/client-pipeline.js index 4b6142b6c88..d90add58769 100644 --- a/test/client-pipeline.js +++ b/test/client-pipeline.js @@ -2,6 +2,7 @@ const { test } = require('tap') const { Client, errors } = require('..') +const util = require('../lib/core/util') const EE = require('events') const { createServer } = require('http') const { @@ -231,7 +232,7 @@ test('pipeline error body', (t) => { method: 'PUT' }, ({ body }) => { const pt = new PassThrough() - process.nextTick(() => { + util.queueMicrotask(() => { pt.destroy(new Error('asd')) }) body.on('error', (err) => { @@ -271,7 +272,7 @@ test('pipeline destroy body', (t) => { method: 'PUT' }, ({ body }) => { const pt = new PassThrough() - process.nextTick(() => { + util.queueMicrotask(() => { pt.destroy() }) body.on('error', (err) => { @@ -999,7 +1000,7 @@ test('pipeline abort after headers', (t) => { method: 'GET', signal }, ({ body }) => { - process.nextTick(() => { + util.queueMicrotask(() => { signal.emit('abort') }) return body diff --git a/test/client-reconnect.js b/test/client-reconnect.js index 000dedd6957..2f839a63b27 100644 --- a/test/client-reconnect.js +++ b/test/client-reconnect.js @@ -4,6 +4,7 @@ const { test } = require('tap') const { Client } = require('..') const { createServer } = require('http') const FakeTimers = require('@sinonjs/fake-timers') +const util = require('../lib/core/util') test('multiple reconnect', (t) => { t.plan(5) @@ -39,7 +40,7 @@ test('multiple reconnect', (t) => { if (++n === 1) { t.pass() } - process.nextTick(() => { + util.queueMicrotask(() => { clock.tick(1000) }) }) diff --git a/test/client-request.js b/test/client-request.js index 0541e06a9e0..060e0933533 100644 --- a/test/client-request.js +++ b/test/client-request.js @@ -38,7 +38,7 @@ test('request abort before headers', (t) => { signal }, (err) => { t.ok(err instanceof errors.RequestAbortedError) - t.strictEqual(signal.listenerCount('abort'), 1) + t.strictEqual(signal.listenerCount('abort'), 0) }) t.strictEqual(signal.listenerCount('abort'), 2) }) diff --git a/test/client-stream.js b/test/client-stream.js index 51c60194f35..42a51b1937d 100644 --- a/test/client-stream.js +++ b/test/client-stream.js @@ -2,6 +2,7 @@ const { test } = require('tap') const { Client, errors } = require('..') +const util = require('../lib/core/util') const { createServer } = require('http') const { PassThrough, Writable, Readable } = require('stream') const EE = require('events') @@ -623,7 +624,7 @@ test('stream backpressure', (t) => { highWaterMark: 1, write (chunk, encoding, callback) { buf += chunk - process.nextTick(callback) + util.queueMicrotask(callback) } }), (err, data) => { t.error(err) diff --git a/test/client-upgrade.js b/test/client-upgrade.js index 77010836a3a..3a6d63edb75 100644 --- a/test/client-upgrade.js +++ b/test/client-upgrade.js @@ -5,6 +5,7 @@ const { Client, errors } = require('..') const net = require('net') const http = require('http') const EE = require('events') +const util = require('../lib/core/util') test('basic upgrade', (t) => { t.plan(6) @@ -242,7 +243,7 @@ test('upgrade wait for empty pipeline', (t) => { t.error(err) }) client.once('connect', () => { - process.nextTick(() => { + util.queueMicrotask(() => { t.strictEqual(client.busy, false) client.upgrade({ diff --git a/test/client-write-max-listeners.js b/test/client-write-max-listeners.js index 31e61d01767..5b770301231 100644 --- a/test/client-write-max-listeners.js +++ b/test/client-write-max-listeners.js @@ -4,6 +4,7 @@ const { test } = require('tap') const { Client } = require('..') const { createServer } = require('http') const { Readable } = require('stream') +const util = require('../lib/core/util') test('socket close listener does not leak', (t) => { t.plan(32) @@ -18,7 +19,7 @@ test('socket close listener does not leak', (t) => { const makeBody = () => { return new Readable({ read () { - process.nextTick(() => { + util.queueMicrotask(() => { this.push(null) }) } diff --git a/test/client.js b/test/client.js index d4c738a219c..f130ac771c8 100644 --- a/test/client.js +++ b/test/client.js @@ -7,6 +7,7 @@ const { readFileSync, createReadStream } = require('fs') const { Readable } = require('stream') const { kSocket } = require('../lib/core/symbols') const EE = require('events') +const util = require('../lib/core/util') const { kConnect } = require('../lib/core/symbols') test('basic get', (t) => { @@ -466,7 +467,7 @@ test('basic POST with empty stream', (t) => { callback(!this._readableState.endEmitted ? new Error('asd') : err) } }).on('end', () => { - process.nextTick(() => { + util.queueMicrotask(() => { t.strictEqual(body.destroyed, true) }) }) @@ -832,7 +833,7 @@ test('increase pipelining', (t) => { t.strictEqual(client.running, 0) client.on('connect', () => { t.strictEqual(client.running, 0) - process.nextTick(() => { + util.queueMicrotask(() => { t.strictEqual(client.running, 1) client.pipelining = 3 t.strictEqual(client.running, 2) @@ -923,7 +924,7 @@ test('POST empty with error', (t) => { }) body.push(null) client.on('connect', () => { - process.nextTick(() => { + util.queueMicrotask(() => { body.emit('error', new Error('asd')) }) }) diff --git a/test/close-and-destroy.js b/test/close-and-destroy.js index 59101e9f83b..74b97fc5b15 100644 --- a/test/close-and-destroy.js +++ b/test/close-and-destroy.js @@ -4,6 +4,7 @@ const { test } = require('tap') const { Client, errors } = require('..') const { createServer } = require('http') const { kSocket } = require('../lib/core/symbols') +const util = require('../lib/core/util') test('close waits for queued requests to finish', (t) => { t.plan(16) @@ -29,7 +30,7 @@ test('close waits for queued requests to finish', (t) => { // needed because the next element in the queue will be called // after the current function completes - process.nextTick(function () { + util.queueMicrotask(function () { client.close() }) }) diff --git a/test/pipeline-pipelining.js b/test/pipeline-pipelining.js index b3d94d10d04..c0cd58c109c 100644 --- a/test/pipeline-pipelining.js +++ b/test/pipeline-pipelining.js @@ -4,6 +4,7 @@ const { test } = require('tap') const { Client } = require('..') const { createServer } = require('http') const { kConnect } = require('../lib/core/symbols') +const util = require('../lib/core/util') test('pipeline pipelining', (t) => { t.plan(10) @@ -41,7 +42,7 @@ test('pipeline pipelining', (t) => { t.strictEqual(client.busy, true) t.strictDeepEqual(client.running, 0) t.strictDeepEqual(client.pending, 2) - process.nextTick(() => { + util.queueMicrotask(() => { t.strictEqual(client.running, 2) }) }) @@ -102,7 +103,7 @@ test('pipeline pipelining retry', (t) => { t.strictDeepEqual(client.running, 0) t.strictDeepEqual(client.pending, 3) - process.nextTick(() => { + util.queueMicrotask(() => { t.strictEqual(client.running, 3) }) diff --git a/test/request-timeout.js b/test/request-timeout.js index 153713271ae..351e2eca464 100644 --- a/test/request-timeout.js +++ b/test/request-timeout.js @@ -7,6 +7,7 @@ const { createServer } = require('http') const EventEmitter = require('events') const FakeTimers = require('@sinonjs/fake-timers') const { AbortController } = require('abort-controller') +const util = require('../lib/core/util') const { pipeline, Readable, @@ -323,7 +324,7 @@ test('client.close should wait for the timeout', (t) => { }) client.on('connect', () => { - process.nextTick(() => { + util.queueMicrotask(() => { clock.tick(100) }) })