Skip to content

Commit

Permalink
implement diagnostics channel (#1000)
Browse files Browse the repository at this point in the history
  • Loading branch information
mcollina committed Sep 12, 2021
1 parent 80afc58 commit 20ce79f
Show file tree
Hide file tree
Showing 8 changed files with 480 additions and 5 deletions.
81 changes: 81 additions & 0 deletions docs/api/DiagnosticsChannel.md
@@ -0,0 +1,81 @@
# Diagnostics Channel Support

Stability: Experimental.

Undici support the [`diagnostics_channel`](https://nodejs.org/api/diagnostics_channel.html) (currently available only on Node.js v16+).
It is the preferred way to instrument Undici and retrieve internal informations.

The channels available are the following.

## `undici:request:create`

This message is published when a new outgoing request is created.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:request:create').subscribe(({ request }) => {
console.log('completed', request.completed)
console.log('method', request.method)
console.log('path', request.path)
console.log('headers') // raw text, e.g: 'bar: bar\r\n'
request.addHeader('hello', 'world')
console.log('headers', request.headers) // e.g. 'bar: bar\r\nhello: world\r\n'
})
```

Note: a request is only loosely completed to a given socket.


## `undici:request:bodySent`

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:request:bodySent').subscribe(({ request }) => {
// request is the same object undici:request:create
})
```

## `undici:request:headers`

This message is published after the response headers have been received, i.e. the reponse has been completed.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:request:headers').subscribe(({ request, response }) => {
// request is the same object undici:request:create
console.log('statusCode', response.statusCode)
console.log(response.statusText)
// response.headers are buffers.
console.log(response.headers.map((x) => x.toString()))
})
```

## `undici:request:trailers`

This message is published after the response body and trailers have been received, i.e. the reponse has been completed.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:request:trailers').subscribe(({ request, trailers }) => {
// request is the same object undici:request:create
console.log('completed', request.completed)
// trailers are buffers.
console.log(trailers.map((x) => x.toString()))
})
```

## `undici:request:error`

This message is published if the request is going to error, but it has not errored yet.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:request:error').subscribe(({ request, error }) => {
// request is the same object undici:request:create
})
```
6 changes: 5 additions & 1 deletion lib/client.js
Expand Up @@ -1439,6 +1439,7 @@ function write (client, request) {
assert(contentLength === null, 'no body must not have content length')
socket.write(`${header}\r\n`, 'ascii')
}
request.onRequestSent()
} else if (util.isBuffer(body)) {
assert(contentLength === body.byteLength, 'buffer body must have content length')

Expand All @@ -1447,6 +1448,7 @@ function write (client, request) {
socket.write(body)
socket.uncork()
request.onBodySent(body)
request.onRequestSent()
if (!expectsPayload) {
socket[kReset] = true
}
Expand Down Expand Up @@ -1563,6 +1565,7 @@ async function writeBlob ({ client, request, socket, contentLength, header, expe
socket.uncork()

request.onBodySent(buffer)
request.onRequestSent()

if (!expectsPayload) {
socket[kReset] = true
Expand Down Expand Up @@ -1689,7 +1692,8 @@ class AsyncWriter {
}

end () {
const { socket, contentLength, client, bytesWritten, expectsPayload, header } = this
const { socket, contentLength, client, bytesWritten, expectsPayload, header, request } = this
request.onRequestSent()

socket[kWriting] = false

Expand Down
47 changes: 45 additions & 2 deletions lib/core/request.js
Expand Up @@ -9,6 +9,23 @@ const assert = require('assert')

const kHandler = Symbol('handler')

const channels = {}

try {
const diagnosticsChannel = require('diagnostics_channel')
channels.create = diagnosticsChannel.channel('undici:request:create')
channels.bodySent = diagnosticsChannel.channel('undici:request:bodySent')
channels.headers = diagnosticsChannel.channel('undici:request:headers')
channels.trailers = diagnosticsChannel.channel('undici:request:trailers')
channels.error = diagnosticsChannel.channel('undici:request:error')
} catch {
channels.create = { hasSubscribers: false }
channels.bodySent = { hasSubscribers: false }
channels.headers = { hasSubscribers: false }
channels.trailers = { hasSubscribers: false }
channels.error = { hasSubscribers: false }
}

class Request {
constructor ({
path,
Expand Down Expand Up @@ -114,6 +131,10 @@ class Request {
this.servername = util.getServerName(this.host)

this[kHandler] = handler

if (channels.create.hasSubscribers) {
channels.create.publish({ request: this })
}
}

onBodySent (chunk) {
Expand All @@ -126,6 +147,12 @@ class Request {
}
}

onRequestSent () {
if (channels.bodySent.hasSubscribers) {
channels.bodySent.publish({ request: this })
}
}

onConnect (abort) {
assert(!this.aborted)
assert(!this.completed)
Expand All @@ -137,6 +164,10 @@ class Request {
assert(!this.aborted)
assert(!this.completed)

if (channels.headers.hasSubscribers) {
channels.headers.publish({ request: this, response: { statusCode, headers, statusText } })
}

return this[kHandler].onHeaders(statusCode, headers, resume, statusText)
}

Expand All @@ -158,15 +189,27 @@ class Request {
assert(!this.aborted)

this.completed = true
if (channels.trailers.hasSubscribers) {
channels.trailers.publish({ request: this, trailers })
}
return this[kHandler].onComplete(trailers)
}

onError (err) {
onError (error) {
if (channels.error.hasSubscribers) {
channels.error.publish({ request: this, error })
}

if (this.aborted) {
return
}
this.aborted = true
return this[kHandler].onError(err)
return this[kHandler].onError(error)
}

addHeader (key, value) {
processHeader(this, key, value)
return this
}
}

Expand Down
4 changes: 2 additions & 2 deletions package.json
Expand Up @@ -37,8 +37,8 @@
"test:node-fetch": "node scripts/verifyVersion.js 16 && mocha test/node-fetch || echo Skipping",
"test:fetch": "node scripts/verifyVersion.js 16 && tap test/fetch/*.js || echo Skipping",
"test:jest": "jest test/jest/test",
"test:tap": "tap test/*.js",
"test:tdd": "tap test/*.js -w",
"test:tap": "tap test/*.js test/diagnostics-channel/*.js",
"test:tdd": "tap test/*.js test/diagnostics-channel/*.js -w",
"test:typescript": "tsd",
"coverage": "nyc npm run test",
"coverage:ci": "nyc --reporter=lcov npm run test",
Expand Down
52 changes: 52 additions & 0 deletions test/diagnostics-channel/error.js
@@ -0,0 +1,52 @@
'use strict'

const t = require('tap')

let diagnosticsChannel

try {
diagnosticsChannel = require('diagnostics_channel')
} catch {
t.skip('missing diagnostics_channel')
process.exit(0)
}

const { Client } = require('../..')
const { createServer, request } = require('http')

t.plan(3)

const server = createServer((req, res) => {
res.destroy()
})
t.teardown(server.close.bind(server))

const reqHeaders = {
foo: undefined,
bar: 'bar'
}

let _req
diagnosticsChannel.channel('undici:request:create').subscribe(({ request }) => {
_req = request
})

diagnosticsChannel.channel('undici:request:error').subscribe(({ request, error }) => {
t.equal(_req, request)
t.equal(error.code, 'UND_ERR_SOCKET')
})

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
keepAliveTimeout: 300e3
})
t.teardown(client.close.bind(client))

client.request({
path: '/',
method: 'GET',
headers: reqHeaders
}, (err, data) => {
t.equal(err.code, 'UND_ERR_SOCKET')
})
})
93 changes: 93 additions & 0 deletions test/diagnostics-channel/get.js
@@ -0,0 +1,93 @@
'use strict'

const t = require('tap')

let diagnosticsChannel

try {
diagnosticsChannel = require('diagnostics_channel')
} catch {
t.skip('missing diagnostics_channel')
process.exit(0)
}

const { Client } = require('../..')
const { createServer, request } = require('http')

t.plan(14)

const server = createServer((req, res) => {
res.setHeader('Content-Type', 'text/plain')
res.setHeader('trailer', 'foo')
res.write('hello')
res.addTrailers({
foo: 'oof'
})
res.end()
})
t.teardown(server.close.bind(server))

const reqHeaders = {
foo: undefined,
bar: 'bar'
}

let _req
diagnosticsChannel.channel('undici:request:create').subscribe(({ request }) => {
_req = request
t.equal(request.completed, false)
t.equal(request.method, 'GET')
t.equal(request.path, '/')
t.equal(request.headers, 'bar: bar\r\n')
request.addHeader('hello', 'world')
t.equal(request.headers, 'bar: bar\r\nhello: world\r\n')
})

diagnosticsChannel.channel('undici:request:headers').subscribe(({ request, response }) => {
t.equal(_req, request)
t.equal(response.statusCode, 200)
const expectedHeaders = [
Buffer.from('Content-Type'),
Buffer.from('text/plain'),
Buffer.from('trailer'),
Buffer.from('foo'),
Buffer.from('Date'),
response.headers[5], // This is a date
Buffer.from('Connection'),
Buffer.from('keep-alive'),
Buffer.from('Keep-Alive'),
Buffer.from('timeout=5'),
Buffer.from('Transfer-Encoding'),
Buffer.from('chunked')
]
t.same(response.headers, expectedHeaders)
t.equal(response.statusText, 'OK')
})

let endEmitted = false
diagnosticsChannel.channel('undici:request:trailers').subscribe(({ request, trailers }) => {
t.equal(request.completed, true)
t.equal(_req, request)
// This event is emitted after the last chunk has been added to the body stream,
// not when it was consumed by the application
t.equal(endEmitted, false)
t.same(trailers, [Buffer.from('foo'), Buffer.from('oof')])
})

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`, {
keepAliveTimeout: 300e3
})
t.teardown(client.close.bind(client))

client.request({
path: '/',
method: 'GET',
headers: reqHeaders
}, (err, data) => {
t.error(err)
data.body.on('end', function () {
endEmitted = true
})
})
})

0 comments on commit 20ce79f

Please sign in to comment.