Skip to content

Commit

Permalink
fix: use queueMicrotask
Browse files Browse the repository at this point in the history
Fixes: #562
Refs: nodejs/node#37484
  • Loading branch information
ronag committed Feb 24, 2021
1 parent c608ff3 commit 5ca97d6
Show file tree
Hide file tree
Showing 18 changed files with 60 additions and 38 deletions.
10 changes: 6 additions & 4 deletions lib/client-connect.js
Expand Up @@ -55,9 +55,9 @@ class ConnectHandler extends AsyncResource {

if (callback) {
this.callback = null
process.nextTick((self, callback, err, opaque) => {
self.runInAsyncScope(callback, null, err, { opaque })
}, this, callback, err, opaque)
util.queueMicrotask(() => {
this.runInAsyncScope(callback, null, err, { opaque })
})
}
}
}
Expand Down Expand Up @@ -89,7 +89,9 @@ function connect (opts, callback) {
signal
}, connectHandler)
} catch (err) {
process.nextTick(callback, err, { opaque: opts && opts.opaque })
util.queueMicrotask(() => {
callback(err, { opaque: opts && opts.opaque })
})
}
}

Expand Down
8 changes: 5 additions & 3 deletions lib/client-request.js
Expand Up @@ -128,8 +128,8 @@ class RequestHandler extends AsyncResource {

if (callback) {
this.callback = null
process.nextTick((self, callback, err, opaque) => {
self.runInAsyncScope(callback, null, err, { opaque })
util.queueMicrotask(() => {
this.runInAsyncScope(callback, null, err, { opaque })
}, this, callback, err, opaque)
}

Expand Down Expand Up @@ -158,7 +158,9 @@ function request (opts, callback) {
this.dispatch(opts, new RequestHandler(opts, callback))
} catch (err) {
if (typeof callback === 'function') {
process.nextTick(callback, err, { opaque: opts && opts.opaque })
util.queueMicrotask(() => {
callback(err, { opaque: opts && opts.opaque })
})
} else {
throw err
}
Expand Down
10 changes: 6 additions & 4 deletions lib/client-stream.js
Expand Up @@ -146,9 +146,9 @@ class StreamHandler extends AsyncResource {
util.destroy(res, err)
} else if (callback) {
this.callback = null
process.nextTick((self, callback, err, opaque) => {
self.runInAsyncScope(callback, null, err, { opaque })
}, this, callback, err, opaque)
util.queueMicrotask(() => {
this.runInAsyncScope(callback, null, err, { opaque })
})
}

if (body) {
Expand All @@ -171,7 +171,9 @@ function stream (opts, factory, callback) {
this.dispatch(opts, new StreamHandler(opts, factory, callback))
} catch (err) {
if (typeof callback === 'function') {
process.nextTick(callback, err, { opaque: opts && opts.opaque })
util.queueMicrotask(() => {
callback(err, { opaque: opts && opts.opaque })
})
} else {
throw err
}
Expand Down
10 changes: 6 additions & 4 deletions lib/client-upgrade.js
Expand Up @@ -57,9 +57,9 @@ class UpgradeHandler extends AsyncResource {

if (callback) {
this.callback = null
process.nextTick((self, callback, err, opaque) => {
self.runInAsyncScope(callback, null, err, { opaque })
}, this, callback, err, opaque)
util.queueMicrotask(() => {
this.runInAsyncScope(callback, null, err, { opaque })
})
}
}
}
Expand Down Expand Up @@ -94,7 +94,9 @@ function upgrade (opts, callback) {
upgrade: protocol || 'Websocket'
}, upgradeHandler)
} catch (err) {
process.nextTick(callback, err, { opaque: opts && opts.opaque })
util.queueMicrotask(() => {
callback(err, { opaque: opts && opts.opaque })
})
}
}

Expand Down
12 changes: 7 additions & 5 deletions lib/core/client.js
Expand Up @@ -248,7 +248,7 @@ class Client extends EventEmitter {
} else if (util.isStream(request.body)) {
// Wait a tick in case stream is ended in the same tick.
this[kResuming] = 1
process.nextTick(resume, this)
util.queueMicrotask(() => resume(this))
} else {
resume(this, true)
}
Expand All @@ -275,7 +275,9 @@ class Client extends EventEmitter {
}

if (this[kDestroyed]) {
process.nextTick(callback, new ClientDestroyedError(), null)
util.queueMicrotask(() => {
callback(new ClientDestroyedError(), null)
})
return
}

Expand Down Expand Up @@ -310,7 +312,7 @@ class Client extends EventEmitter {
if (this[kOnDestroyed]) {
this[kOnDestroyed].push(callback)
} else {
process.nextTick(callback, null, null)
util.queueMicrotask(() => callback(null, null))
}
return
}
Expand All @@ -336,7 +338,7 @@ class Client extends EventEmitter {
}

if (!this[kSocket]) {
process.nextTick(onDestroyed)
util.queueMicrotask(() => onDestroyed())
} else {
util.destroy(this[kSocket].on('close', onDestroyed), err)
}
Expand Down Expand Up @@ -967,7 +969,7 @@ function _resume (client, sync) {
} else if (client[kNeedDrain] === 2) {
if (sync) {
client[kNeedDrain] = 1
process.nextTick(emitDrain, client)
util.queueMicrotask(() => emitDrain(client))
} else {
emitDrain(client)
}
Expand Down
1 change: 1 addition & 0 deletions lib/core/util.js
Expand Up @@ -105,6 +105,7 @@ function destroy (stream, err) {
stream.destroy(err)
}
} else if (err) {
// Node streams use nextTick.
process.nextTick((stream, err) => {
stream.emit('error', err)
}, stream, err)
Expand Down
3 changes: 2 additions & 1 deletion test/client-connect.js
Expand Up @@ -4,6 +4,7 @@ const { test } = require('tap')
const { Client, errors } = require('..')
const http = require('http')
const EE = require('events')
const util = require('../lib/core/util')

test('basic connect', (t) => {
t.plan(3)
Expand Down Expand Up @@ -161,7 +162,7 @@ test('connect wait for empty pipeline', (t) => {
t.error(err)
})
client.once('connect', () => {
process.nextTick(() => {
util.queueMicrotask(() => {
t.strictEqual(client.busy, false)

client.connect({
Expand Down
5 changes: 3 additions & 2 deletions test/client-errors.js
Expand Up @@ -7,6 +7,7 @@ const net = require('net')
const { Readable } = require('stream')

const { kSocket } = require('../lib/core/symbols')
const util = require('../lib/core/util')

test('GET errors and reconnect with pipelining 1', (t) => {
t.plan(9)
Expand Down Expand Up @@ -639,7 +640,7 @@ test('socket fail while writing request body', (t) => {
body.push('asd')

client.on('connect', () => {
process.nextTick(() => {
util.queueMicrotask(() => {
client[kSocket].destroy('kaboom')
})
})
Expand Down Expand Up @@ -674,7 +675,7 @@ test('socket fail while ending request body', (t) => {

const _err = new Error('kaboom')
client.on('connect', () => {
process.nextTick(() => {
util.queueMicrotask(() => {
client[kSocket].destroy(_err)
})
})
Expand Down
7 changes: 4 additions & 3 deletions test/client-pipeline.js
Expand Up @@ -2,6 +2,7 @@

const { test } = require('tap')
const { Client, errors } = require('..')
const util = require('../lib/core/util')
const EE = require('events')
const { createServer } = require('http')
const {
Expand Down Expand Up @@ -231,7 +232,7 @@ test('pipeline error body', (t) => {
method: 'PUT'
}, ({ body }) => {
const pt = new PassThrough()
process.nextTick(() => {
util.queueMicrotask(() => {
pt.destroy(new Error('asd'))
})
body.on('error', (err) => {
Expand Down Expand Up @@ -271,7 +272,7 @@ test('pipeline destroy body', (t) => {
method: 'PUT'
}, ({ body }) => {
const pt = new PassThrough()
process.nextTick(() => {
util.queueMicrotask(() => {
pt.destroy()
})
body.on('error', (err) => {
Expand Down Expand Up @@ -999,7 +1000,7 @@ test('pipeline abort after headers', (t) => {
method: 'GET',
signal
}, ({ body }) => {
process.nextTick(() => {
util.queueMicrotask(() => {
signal.emit('abort')
})
return body
Expand Down
3 changes: 2 additions & 1 deletion test/client-reconnect.js
Expand Up @@ -4,6 +4,7 @@ const { test } = require('tap')
const { Client } = require('..')
const { createServer } = require('http')
const FakeTimers = require('@sinonjs/fake-timers')
const util = require('../lib/core/util')

test('multiple reconnect', (t) => {
t.plan(5)
Expand Down Expand Up @@ -39,7 +40,7 @@ test('multiple reconnect', (t) => {
if (++n === 1) {
t.pass()
}
process.nextTick(() => {
util.queueMicrotask(() => {
clock.tick(1000)
})
})
Expand Down
2 changes: 1 addition & 1 deletion test/client-request.js
Expand Up @@ -38,7 +38,7 @@ test('request abort before headers', (t) => {
signal
}, (err) => {
t.ok(err instanceof errors.RequestAbortedError)
t.strictEqual(signal.listenerCount('abort'), 1)
t.strictEqual(signal.listenerCount('abort'), 0)
})
t.strictEqual(signal.listenerCount('abort'), 2)
})
Expand Down
3 changes: 2 additions & 1 deletion test/client-stream.js
Expand Up @@ -2,6 +2,7 @@

const { test } = require('tap')
const { Client, errors } = require('..')
const util = require('../lib/core/util')
const { createServer } = require('http')
const { PassThrough, Writable, Readable } = require('stream')
const EE = require('events')
Expand Down Expand Up @@ -623,7 +624,7 @@ test('stream backpressure', (t) => {
highWaterMark: 1,
write (chunk, encoding, callback) {
buf += chunk
process.nextTick(callback)
util.queueMicrotask(callback)
}
}), (err, data) => {
t.error(err)
Expand Down
3 changes: 2 additions & 1 deletion test/client-upgrade.js
Expand Up @@ -5,6 +5,7 @@ const { Client, errors } = require('..')
const net = require('net')
const http = require('http')
const EE = require('events')
const util = require('../lib/core/util')

test('basic upgrade', (t) => {
t.plan(6)
Expand Down Expand Up @@ -242,7 +243,7 @@ test('upgrade wait for empty pipeline', (t) => {
t.error(err)
})
client.once('connect', () => {
process.nextTick(() => {
util.queueMicrotask(() => {
t.strictEqual(client.busy, false)

client.upgrade({
Expand Down
3 changes: 2 additions & 1 deletion test/client-write-max-listeners.js
Expand Up @@ -4,6 +4,7 @@ const { test } = require('tap')
const { Client } = require('..')
const { createServer } = require('http')
const { Readable } = require('stream')
const util = require('../lib/core/util')

test('socket close listener does not leak', (t) => {
t.plan(32)
Expand All @@ -18,7 +19,7 @@ test('socket close listener does not leak', (t) => {
const makeBody = () => {
return new Readable({
read () {
process.nextTick(() => {
util.queueMicrotask(() => {
this.push(null)
})
}
Expand Down
7 changes: 4 additions & 3 deletions test/client.js
Expand Up @@ -7,6 +7,7 @@ const { readFileSync, createReadStream } = require('fs')
const { Readable } = require('stream')
const { kSocket } = require('../lib/core/symbols')
const EE = require('events')
const util = require('../lib/core/util')
const { kConnect } = require('../lib/core/symbols')

test('basic get', (t) => {
Expand Down Expand Up @@ -466,7 +467,7 @@ test('basic POST with empty stream', (t) => {
callback(!this._readableState.endEmitted ? new Error('asd') : err)
}
}).on('end', () => {
process.nextTick(() => {
util.queueMicrotask(() => {
t.strictEqual(body.destroyed, true)
})
})
Expand Down Expand Up @@ -832,7 +833,7 @@ test('increase pipelining', (t) => {
t.strictEqual(client.running, 0)
client.on('connect', () => {
t.strictEqual(client.running, 0)
process.nextTick(() => {
util.queueMicrotask(() => {
t.strictEqual(client.running, 1)
client.pipelining = 3
t.strictEqual(client.running, 2)
Expand Down Expand Up @@ -923,7 +924,7 @@ test('POST empty with error', (t) => {
})
body.push(null)
client.on('connect', () => {
process.nextTick(() => {
util.queueMicrotask(() => {
body.emit('error', new Error('asd'))
})
})
Expand Down
3 changes: 2 additions & 1 deletion test/close-and-destroy.js
Expand Up @@ -4,6 +4,7 @@ const { test } = require('tap')
const { Client, errors } = require('..')
const { createServer } = require('http')
const { kSocket } = require('../lib/core/symbols')
const util = require('../lib/core/util')

test('close waits for queued requests to finish', (t) => {
t.plan(16)
Expand All @@ -29,7 +30,7 @@ test('close waits for queued requests to finish', (t) => {

// needed because the next element in the queue will be called
// after the current function completes
process.nextTick(function () {
util.queueMicrotask(function () {
client.close()
})
})
Expand Down
5 changes: 3 additions & 2 deletions test/pipeline-pipelining.js
Expand Up @@ -4,6 +4,7 @@ const { test } = require('tap')
const { Client } = require('..')
const { createServer } = require('http')
const { kConnect } = require('../lib/core/symbols')
const util = require('../lib/core/util')

test('pipeline pipelining', (t) => {
t.plan(10)
Expand Down Expand Up @@ -41,7 +42,7 @@ test('pipeline pipelining', (t) => {
t.strictEqual(client.busy, true)
t.strictDeepEqual(client.running, 0)
t.strictDeepEqual(client.pending, 2)
process.nextTick(() => {
util.queueMicrotask(() => {
t.strictEqual(client.running, 2)
})
})
Expand Down Expand Up @@ -102,7 +103,7 @@ test('pipeline pipelining retry', (t) => {
t.strictDeepEqual(client.running, 0)
t.strictDeepEqual(client.pending, 3)

process.nextTick(() => {
util.queueMicrotask(() => {
t.strictEqual(client.running, 3)
})

Expand Down
3 changes: 2 additions & 1 deletion test/request-timeout.js
Expand Up @@ -7,6 +7,7 @@ const { createServer } = require('http')
const EventEmitter = require('events')
const FakeTimers = require('@sinonjs/fake-timers')
const { AbortController } = require('abort-controller')
const util = require('../lib/core/util')
const {
pipeline,
Readable,
Expand Down Expand Up @@ -323,7 +324,7 @@ test('client.close should wait for the timeout', (t) => {
})

client.on('connect', () => {
process.nextTick(() => {
util.queueMicrotask(() => {
clock.tick(100)
})
})
Expand Down

0 comments on commit 5ca97d6

Please sign in to comment.