Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(http): stop listening to request's close event once it has emitted response #3625

Merged
merged 13 commits into from Apr 26, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -13,6 +13,8 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/

### :bug: (Bug Fix)

* fix(http-instrumentation): stop listening to `request`'s `close` event once it has emitted `response` [#3625](https://github.com/open-telemetry/opentelemetry-js/pull/3625) @SimenB

### :books: (Refine Doc)

### :house: (Internal)
Expand Down
Expand Up @@ -315,6 +315,11 @@ export class HttpInstrumentation extends InstrumentationBase<Http> {
this._callRequestHook(span, request);
}

/**
* Determines if the request has errored or the response has ended/errored.
*/
let responseFinished = false;

/*
* User 'response' event listeners can be added before our listener,
* force our listener to be the first, so response emitter is bound
Expand All @@ -323,6 +328,7 @@ export class HttpInstrumentation extends InstrumentationBase<Http> {
request.prependListener(
'response',
(response: http.IncomingMessage & { aborted?: boolean }) => {
this._diag.debug('outgoingRequest on response()');
const responseAttributes =
utils.getOutgoingRequestAttributesOnResponse(response);
span.setAttributes(responseAttributes);
Expand All @@ -344,9 +350,13 @@ export class HttpInstrumentation extends InstrumentationBase<Http> {
);

context.bind(context.active(), response);
this._diag.debug('outgoingRequest on response()');
response.on('end', () => {

const endHandler = () => {
this._diag.debug('outgoingRequest on end()');
if (responseFinished) {
return;
}
responseFinished = true;
let status: SpanStatus;

if (response.aborted && !response.complete) {
Expand Down Expand Up @@ -381,15 +391,24 @@ export class HttpInstrumentation extends InstrumentationBase<Http> {
startTime,
metricAttributes
);
});
};

response.on('end', endHandler);
// See https://github.com/open-telemetry/opentelemetry-js/pull/3625#issuecomment-1475673533
if (semver.lt(process.version, '16.0.0')) {
response.on('close', endHandler);
}
response.on(errorMonitor, (error: Err) => {
this._diag.debug('outgoingRequest on error()', error);
if (responseFinished) {
return;
}
responseFinished = true;
utils.setSpanWithError(span, error);
const code = utils.parseResponseStatus(
SpanKind.CLIENT,
response.statusCode
);
span.setStatus({ code, message: error.message });
span.setStatus({
code: SpanStatusCode.ERROR,
message: error.message,
});
this._closeHttpSpan(
span,
SpanKind.CLIENT,
Expand All @@ -401,12 +420,18 @@ export class HttpInstrumentation extends InstrumentationBase<Http> {
);
request.on('close', () => {
this._diag.debug('outgoingRequest on request close()');
if (!request.aborted) {
this._closeHttpSpan(span, SpanKind.CLIENT, startTime, metricAttributes);
if (request.aborted || responseFinished) {
return;
}
responseFinished = true;
this._closeHttpSpan(span, SpanKind.CLIENT, startTime, metricAttributes);
});
request.on(errorMonitor, (error: Err) => {
this._diag.debug('outgoingRequest on request error()', error);
if (responseFinished) {
return;
}
responseFinished = true;
utils.setSpanWithError(span, error);
this._closeHttpSpan(span, SpanKind.CLIENT, startTime, metricAttributes);
});
Expand Down
Expand Up @@ -16,11 +16,13 @@
import {
SpanStatusCode,
context,
diag,
propagation,
Span as ISpan,
SpanKind,
trace,
SpanAttributes,
DiagConsoleLogger,
} from '@opentelemetry/api';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import {
Expand Down Expand Up @@ -269,6 +271,14 @@ describe('HttpInstrumentation', () => {
// hang the request.
return;
}
if (request.url?.includes('/destroy-request')) {
// force flush http response header to trigger client response callback
response.write('');
setTimeout(() => {
request.socket.destroy();
}, 100);
return;
}
if (request.url?.includes('/ignored')) {
provider.getTracer('test').startSpan('some-span').end();
}
Expand Down Expand Up @@ -861,19 +871,20 @@ describe('HttpInstrumentation', () => {
});

it('should have 2 ended span when client prematurely close', async () => {
const promise = new Promise<void>((resolve, reject) => {
const promise = new Promise<void>(resolve => {
const req = http.get(
`${protocol}://${hostname}:${serverPort}/hang`,
res => {
res.on('close', () => {});
res.on('error', () => {});
}
);
// close the socket.
setTimeout(() => {
req.destroy();
}, 10);

req.on('error', reject);
req.on('error', () => {});

req.on('close', () => {
// yield to server to end the span.
Expand Down Expand Up @@ -919,6 +930,41 @@ describe('HttpInstrumentation', () => {
assert.strictEqual(clientSpan.status.code, SpanStatusCode.ERROR);
assert.ok(Object.keys(clientSpan.attributes).length >= 6);
});

it('should not end span multiple times if request socket destroyed before response completes', async () => {
const warnMessages: string[] = [];
diag.setLogger({
...new DiagConsoleLogger(),
warn: message => {
warnMessages.push(message);
},
});
const promise = new Promise<void>(resolve => {
const req = http.request(
`${protocol}://${hostname}:${serverPort}/destroy-request`,
{
// Allow `req.write()`.
method: 'POST',
},
res => {
res.on('end', () => {});
res.on('close', () => {
resolve();
});
res.on('error', () => {});
}
);
// force flush http request header to trigger client response callback
req.write('');
req.on('error', () => {});
});

await promise;

diag.disable();

assert.deepStrictEqual(warnMessages, []);
});
});

describe('with require parent span', () => {
Expand Down