From 623099db4b7d581c7e173d71b662b4da66c3d2b7 Mon Sep 17 00:00:00 2001 From: Stephen Belanger Date: Mon, 7 Sep 2020 19:23:28 -0700 Subject: [PATCH] http: report request start and end with diagnostics_channel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-URL: https://github.com/nodejs/node/pull/34895 Reviewed-By: Bryan English Reviewed-By: Gerhard Stöbich Reviewed-By: Vladimir de Turckheim Reviewed-By: Rich Trott Reviewed-By: Gabriel Schulhof Reviewed-By: Michael Dawson --- benchmark/diagnostics_channel/http.js | 96 +++++++++++++++++++ lib/_http_server.js | 22 +++++ ...t-diagnostics-channel-http-server-start.js | 65 +++++++++++++ 3 files changed, 183 insertions(+) create mode 100644 benchmark/diagnostics_channel/http.js create mode 100644 test/parallel/test-diagnostics-channel-http-server-start.js diff --git a/benchmark/diagnostics_channel/http.js b/benchmark/diagnostics_channel/http.js new file mode 100644 index 00000000000000..55fac8a706df15 --- /dev/null +++ b/benchmark/diagnostics_channel/http.js @@ -0,0 +1,96 @@ +'use strict'; +const common = require('../common.js'); +const dc = require('diagnostics_channel'); +const { AsyncLocalStorage } = require('async_hooks'); +const http = require('http'); + +const bench = common.createBenchmark(main, { + apm: ['none', 'diagnostics_channel', 'patch'], + type: 'buffer', + len: 1024, + chunks: 4, + connections: [50, 500], + chunkedEnc: 1, + duration: 5 +}); + +function main({ apm, connections, duration, type, len, chunks, chunkedEnc }) { + const done = { none, patch, diagnostics_channel }[apm](); + + const server = require('../fixtures/simple-http-server.js') + .listen(common.PORT) + .on('listening', () => { + const path = `/${type}/${len}/${chunks}/normal/${chunkedEnc}`; + bench.http({ + path, + connections, + duration + }, () => { + server.close(); + if (done) done(); + }); + }); +} + +function none() {} + +function patch() { + const als = new AsyncLocalStorage(); + const times = []; + + const { emit } = http.Server.prototype; + function wrappedEmit(...args) { + const [name, req, res] = args; + if (name === 'request') { + als.enterWith({ + url: req.url, + start: process.hrtime.bigint() + }); + + res.on('finish', () => { + times.push({ + ...als.getStore(), + statusCode: res.statusCode, + end: process.hrtime.bigint() + }); + }); + } + return emit.apply(this, args); + } + http.Server.prototype.emit = wrappedEmit; + + return () => { + http.Server.prototype.emit = emit; + }; +} + +function diagnostics_channel() { + const als = new AsyncLocalStorage(); + const times = []; + + const start = dc.channel('http.server.request.start'); + const finish = dc.channel('http.server.response.finish'); + + function onStart(req) { + als.enterWith({ + url: req.url, + start: process.hrtime.bigint() + }); + } + + function onFinish(res) { + times.push({ + ...als.getStore(), + statusCode: res.statusCode, + end: process.hrtime.bigint() + }); + } + + start.subscribe(onStart); + finish.subscribe(onFinish); + + return () => { + start.unsubscribe(onStart); + finish.unsubscribe(onFinish); + }; +} diff --git a/lib/_http_server.js b/lib/_http_server.js index dbcff03ca53e5d..1890e665cb9b55 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -79,6 +79,10 @@ const { observerCounts, constants } = internalBinding('performance'); const { setTimeout, clearTimeout } = require('timers'); const { NODE_PERFORMANCE_ENTRY_TYPE_HTTP } = constants; +const dc = require('diagnostics_channel'); +const onRequestStartChannel = dc.channel('http.server.request.start'); +const onResponseFinishChannel = dc.channel('http.server.response.finish'); + const kServerResponse = Symbol('ServerResponse'); const kServerResponseStatistics = Symbol('ServerResponseStatistics'); @@ -754,6 +758,15 @@ function clearRequestTimeout(req) { } function resOnFinish(req, res, socket, state, server) { + if (onResponseFinishChannel.hasSubscribers) { + onResponseFinishChannel.publish({ + request: req, + response: res, + socket, + server + }); + } + // Usually the first incoming element should be our request. it may // be that in the case abortIncoming() was called that the incoming // array will be empty. @@ -839,6 +852,15 @@ function parserOnIncoming(server, socket, state, req, keepAlive) { res.shouldKeepAlive = keepAlive; DTRACE_HTTP_SERVER_REQUEST(req, socket); + if (onRequestStartChannel.hasSubscribers) { + onRequestStartChannel.publish({ + request: req, + response: res, + socket, + server + }); + } + if (socket._httpMessage) { // There are already pending outgoing res, append. state.outgoing.push(res); diff --git a/test/parallel/test-diagnostics-channel-http-server-start.js b/test/parallel/test-diagnostics-channel-http-server-start.js new file mode 100644 index 00000000000000..9a8136d4cc5839 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-http-server-start.js @@ -0,0 +1,65 @@ +'use strict'; + +const common = require('../common'); +const { AsyncLocalStorage } = require('async_hooks'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); +const http = require('http'); + +const incomingStartChannel = dc.channel('http.server.request.start'); +const outgoingFinishChannel = dc.channel('http.server.response.finish'); + +const als = new AsyncLocalStorage(); +let context; + +// Bind requests to an AsyncLocalStorage context +incomingStartChannel.subscribe(common.mustCall((message) => { + als.enterWith(message); + context = message; +})); + +// When the request ends, verify the context has been maintained +// and that the messages contain the expected data +outgoingFinishChannel.subscribe(common.mustCall((message) => { + const data = { + request, + response, + server, + socket: request.socket + }; + + // Context is maintained + compare(als.getStore(), context); + + compare(context, data); + compare(message, data); +})); + +let request; +let response; + +const server = http.createServer(common.mustCall((req, res) => { + request = req; + response = res; + + setTimeout(() => { + res.end('done'); + }, 1); +})); + +server.listen(() => { + const { port } = server.address(); + http.get(`http://localhost:${port}`, (res) => { + res.resume(); + res.on('end', () => { + server.close(); + }); + }); +}); + +function compare(a, b) { + assert.strictEqual(a.request, b.request); + assert.strictEqual(a.response, b.response); + assert.strictEqual(a.socket, b.socket); + assert.strictEqual(a.server, b.server); +}