Skip to content

Commit

Permalink
fix(http): stop listening to request's close event once it has em…
Browse files Browse the repository at this point in the history
…itted `response` (#3625)


Co-authored-by: Chengzhong Wu <chengzhong.wcz@alibaba-inc.com>
Co-authored-by: Marc Pichler <marc.pichler@dynatrace.com>
  • Loading branch information
3 people committed Apr 26, 2023
1 parent 6676414 commit 9347ca6
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -13,6 +13,7 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/

### :bug: (Bug Fix)

* fix(http-instrumentation): stop listening to `request`'s `close` event once it has emitted `response` [#3625](https://github.com/open-telemetry/opentelemetry-js/pull/3625) @SimenB
* fix(sdk-node): fix initialization in bundled environments by not loading @opentelemetry/exporter-jaeger [#3739](https://github.com/open-telemetry/opentelemetry-js/pull/3739) @pichlermarc

### :books: (Refine Doc)
Expand Down
Expand Up @@ -315,6 +315,11 @@ export class HttpInstrumentation extends InstrumentationBase<Http> {
this._callRequestHook(span, request);
}

/**
* Determines if the request has errored or the response has ended/errored.
*/
let responseFinished = false;

/*
* User 'response' event listeners can be added before our listener,
* force our listener to be the first, so response emitter is bound
Expand All @@ -323,6 +328,7 @@ export class HttpInstrumentation extends InstrumentationBase<Http> {
request.prependListener(
'response',
(response: http.IncomingMessage & { aborted?: boolean }) => {
this._diag.debug('outgoingRequest on response()');
const responseAttributes =
utils.getOutgoingRequestAttributesOnResponse(response);
span.setAttributes(responseAttributes);
Expand All @@ -344,9 +350,13 @@ export class HttpInstrumentation extends InstrumentationBase<Http> {
);

context.bind(context.active(), response);
this._diag.debug('outgoingRequest on response()');
response.on('end', () => {

const endHandler = () => {
this._diag.debug('outgoingRequest on end()');
if (responseFinished) {
return;
}
responseFinished = true;
let status: SpanStatus;

if (response.aborted && !response.complete) {
Expand Down Expand Up @@ -381,15 +391,24 @@ export class HttpInstrumentation extends InstrumentationBase<Http> {
startTime,
metricAttributes
);
});
};

response.on('end', endHandler);
// See https://github.com/open-telemetry/opentelemetry-js/pull/3625#issuecomment-1475673533
if (semver.lt(process.version, '16.0.0')) {
response.on('close', endHandler);
}
response.on(errorMonitor, (error: Err) => {
this._diag.debug('outgoingRequest on error()', error);
if (responseFinished) {
return;
}
responseFinished = true;
utils.setSpanWithError(span, error);
const code = utils.parseResponseStatus(
SpanKind.CLIENT,
response.statusCode
);
span.setStatus({ code, message: error.message });
span.setStatus({
code: SpanStatusCode.ERROR,
message: error.message,
});
this._closeHttpSpan(
span,
SpanKind.CLIENT,
Expand All @@ -401,12 +420,18 @@ export class HttpInstrumentation extends InstrumentationBase<Http> {
);
request.on('close', () => {
this._diag.debug('outgoingRequest on request close()');
if (!request.aborted) {
this._closeHttpSpan(span, SpanKind.CLIENT, startTime, metricAttributes);
if (request.aborted || responseFinished) {
return;
}
responseFinished = true;
this._closeHttpSpan(span, SpanKind.CLIENT, startTime, metricAttributes);
});
request.on(errorMonitor, (error: Err) => {
this._diag.debug('outgoingRequest on request error()', error);
if (responseFinished) {
return;
}
responseFinished = true;
utils.setSpanWithError(span, error);
this._closeHttpSpan(span, SpanKind.CLIENT, startTime, metricAttributes);
});
Expand Down
Expand Up @@ -16,11 +16,13 @@
import {
SpanStatusCode,
context,
diag,
propagation,
Span as ISpan,
SpanKind,
trace,
SpanAttributes,
DiagConsoleLogger,
} from '@opentelemetry/api';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import {
Expand Down Expand Up @@ -269,6 +271,14 @@ describe('HttpInstrumentation', () => {
// hang the request.
return;
}
if (request.url?.includes('/destroy-request')) {
// force flush http response header to trigger client response callback
response.write('');
setTimeout(() => {
request.socket.destroy();
}, 100);
return;
}
if (request.url?.includes('/ignored')) {
provider.getTracer('test').startSpan('some-span').end();
}
Expand Down Expand Up @@ -861,19 +871,20 @@ describe('HttpInstrumentation', () => {
});

it('should have 2 ended span when client prematurely close', async () => {
const promise = new Promise<void>((resolve, reject) => {
const promise = new Promise<void>(resolve => {
const req = http.get(
`${protocol}://${hostname}:${serverPort}/hang`,
res => {
res.on('close', () => {});
res.on('error', () => {});
}
);
// close the socket.
setTimeout(() => {
req.destroy();
}, 10);

req.on('error', reject);
req.on('error', () => {});

req.on('close', () => {
// yield to server to end the span.
Expand Down Expand Up @@ -919,6 +930,41 @@ describe('HttpInstrumentation', () => {
assert.strictEqual(clientSpan.status.code, SpanStatusCode.ERROR);
assert.ok(Object.keys(clientSpan.attributes).length >= 6);
});

it('should not end span multiple times if request socket destroyed before response completes', async () => {
const warnMessages: string[] = [];
diag.setLogger({
...new DiagConsoleLogger(),
warn: message => {
warnMessages.push(message);
},
});
const promise = new Promise<void>(resolve => {
const req = http.request(
`${protocol}://${hostname}:${serverPort}/destroy-request`,
{
// Allow `req.write()`.
method: 'POST',
},
res => {
res.on('end', () => {});
res.on('close', () => {
resolve();
});
res.on('error', () => {});
}
);
// force flush http request header to trigger client response callback
req.write('');
req.on('error', () => {});
});

await promise;

diag.disable();

assert.deepStrictEqual(warnMessages, []);
});
});

describe('with require parent span', () => {
Expand Down

0 comments on commit 9347ca6

Please sign in to comment.