Skip to content

Commit

Permalink
don't propagate async scope across events (#301)
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Aug 8, 2020
1 parent 3faf438 commit 0cd575b
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 51 deletions.
34 changes: 8 additions & 26 deletions lib/client-pipeline.js
Expand Up @@ -11,16 +11,13 @@ const {
RequestAbortedError
} = require('./errors')
const util = require('./util')
const { AsyncResource } = require('async_hooks')

// TODO: Refactor

const kResume = Symbol('resume')

class PipelineRequest extends AsyncResource {
class PipelineRequest {
constructor (client, opts, callback) {
super('UNDICI_PIPELINE')

if (opts.onInfo && typeof opts.onInfo !== 'function') {
throw new InvalidArgumentError('invalid opts.onInfo')
}
Expand All @@ -42,7 +39,7 @@ class PipelineRequest extends AsyncResource {
}

this.callback = null
this.res = this.runInAsyncScope(callback, this, null, {
this.res = callback(null, {
statusCode,
headers,
opaque,
Expand All @@ -52,8 +49,7 @@ class PipelineRequest extends AsyncResource {

_onData (chunk) {
const { res } = this

return this.runInAsyncScope(res, null, null, chunk)
return res(null, chunk)
}

_onComplete (trailers) {
Expand All @@ -66,12 +62,12 @@ class PipelineRequest extends AsyncResource {

if (res) {
this.res = null
this.runInAsyncScope(res, null, err, null)
res(err, null)
}

if (callback) {
this.callback = null
this.runInAsyncScope(callback, null, err, null)
callback(err, null)
}
}
}
Expand Down Expand Up @@ -110,7 +106,7 @@ module.exports = function (client, opts, handler) {
ret.end()
}

request.runInAsyncScope(callback, null, err, null)
callback(err)
}
})

Expand Down Expand Up @@ -140,11 +136,7 @@ module.exports = function (client, opts, handler) {
util.destroy(req, err)
util.destroy(res, err)

request.runInAsyncScope(
callback,
null,
err
)
callback(err)
}
}).on('prefinish', () => {
// Node < 15 does not call _final in same tick.
Expand All @@ -167,7 +159,6 @@ module.exports = function (client, opts, handler) {
resume
} = data

const request = this
res = new Readable({
autoDestroy: true,
read: resume,
Expand All @@ -182,12 +173,7 @@ module.exports = function (client, opts, handler) {
util.destroy(ret, err)
}

request.runInAsyncScope(
callback,
null,
err,
null
)
callback(err)
}
})

Expand Down Expand Up @@ -234,10 +220,6 @@ module.exports = function (client, opts, handler) {
}
})

if (typeof body._destroy === 'function') {
body._destroy = this.runInAsyncScope.bind(this, body._destroy, body)
}

return function (err, chunk) {
if (res.destroyed) {
return null
Expand Down
14 changes: 5 additions & 9 deletions lib/client-request.js
Expand Up @@ -8,21 +8,17 @@ const {
const util = require('./util')
const { AsyncResource } = require('async_hooks')

const kRequest = Symbol('request')

class RequestResponse extends Readable {
constructor (request, resume) {
constructor (resume) {
super({ autoDestroy: true, read: resume })

this[kRequest] = request
}

_destroy (err, callback) {
if (!err && !this._readableState.endEmitted) {
err = new RequestAbortedError()
}

this[kRequest].runInAsyncScope(callback, null, err, null)
callback(err)
}
}

Expand Down Expand Up @@ -58,7 +54,7 @@ class RequestRequest extends AsyncResource {
return
}

const body = new RequestResponse(this, resume)
const body = new RequestResponse(resume)

this.callback = null
this.res = body
Expand All @@ -69,7 +65,7 @@ class RequestRequest extends AsyncResource {
_onData (chunk) {
const { res } = this

if (this.runInAsyncScope(res.push, res, chunk)) {
if (res.push(chunk)) {
return true
} else if (!res._readableState.destroyed) {
return false
Expand All @@ -85,7 +81,7 @@ class RequestRequest extends AsyncResource {
return
}

this.runInAsyncScope(res.push, res, null)
res.push(null)
}

_onError (err) {
Expand Down
10 changes: 3 additions & 7 deletions lib/client-stream.js
Expand Up @@ -75,20 +75,16 @@ class StreamRequest extends AsyncResource {
}

this.callback = null
callback(err, err ? null : { opaque, trailers })
this.runInAsyncScope(callback, null, err, err ? null : { opaque, trailers })
})

if (typeof res._destroy === 'function') {
res._destroy = this.runInAsyncScope.bind(this, res._destroy, res)
}

this.res = res
}

_onData (chunk) {
const { res } = this

if (this.runInAsyncScope(res.write, res, chunk)) {
if (res.write(chunk)) {
return true
} else if (!util.isDestroyed(res)) {
return false
Expand All @@ -105,7 +101,7 @@ class StreamRequest extends AsyncResource {
}

this.trailers = trailers || {}
this.runInAsyncScope(res.end, res)
res.end()
}

_onError (err) {
Expand Down
18 changes: 9 additions & 9 deletions test/async_hooks.js
Expand Up @@ -68,12 +68,12 @@ test('async hooks', (t) => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' })

body.once('data', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' })
t.pass()
body.resume()
})

body.on('end', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' })
t.pass()
})
})
})
Expand All @@ -90,12 +90,12 @@ test('async hooks', (t) => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })

body.once('data', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })
t.pass()
body.resume()
})

body.on('end', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })
t.pass()
})
})
})
Expand All @@ -112,12 +112,12 @@ test('async hooks', (t) => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })

body.once('data', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })
t.pass()
body.resume()
})

body.on('end', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world' })
t.pass()
})
})
})
Expand Down Expand Up @@ -207,10 +207,10 @@ test('async hooks error and close', (t) => {
client.request({ path: '/', method: 'GET' }, (err, data) => {
t.error(err)
data.body.on('error', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' })
t.pass()
})
data.body.on('close', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' })
t.pass()
})
})
})
Expand All @@ -236,7 +236,7 @@ test('async hooks pipeline close', (t) => {
return body
})
.on('close', () => {
t.strictDeepEqual(getCurrentTransaction(), { hello: 'world2' })
t.pass()
})
.on('error', (err) => {
t.ok(err)
Expand Down

0 comments on commit 0cd575b

Please sign in to comment.