-
Notifications
You must be signed in to change notification settings - Fork 38
/
express.ts
337 lines (295 loc) · 13.3 KB
/
express.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
import { getRPCMetadata } from '@opentelemetry/core';
import { SpanKind, diag, context } from '@opentelemetry/api';
import {
LayerPath,
ExpressLayer,
PatchedRequest,
PATH_STORE,
REQ_SPAN,
EXCEPTION_RECORDED,
ExpressInstrumentationConfig,
CONSUMED_ROUTE_STATE,
ExpressConsumedRouteState,
} from './types';
import { VERSION } from './version';
import {
InstrumentationBase,
InstrumentationModuleDefinition,
InstrumentationNodeModuleDefinition,
InstrumentationNodeModuleFile,
safeExecuteInTheMiddle,
isWrapped,
} from '@opentelemetry/instrumentation';
import type express from 'express';
import {
getRouteAttributes,
getHttpSpanAttributeFromRes,
getHttpSpanAttributesFromReq,
getSpanInitialName,
getSpanNameOnResEnd,
parseResponseStatus,
} from './utils/attributes';
import { consumeLayerPathAndUpdateState, createInitialRouteState } from './utils/route-context';
import { getLayerPathFromFirstArg } from './utils/layer-path';
import { SemanticAttributes } from '@opentelemetry/semantic-conventions';
const originalLayerStore = Symbol('otel.express-plugins.orig-layer-export');
export class ExpressInstrumentation extends InstrumentationBase<typeof express> {
static readonly supportedVersions = ['^4.9.0'];
protected override _config: ExpressInstrumentationConfig;
constructor(config: ExpressInstrumentationConfig = {}) {
super('opentelemetry-instrumentation-express', VERSION, Object.assign({}, config));
}
override setConfig(config: ExpressInstrumentationConfig = {}) {
this._config = Object.assign({}, config);
}
protected init(): InstrumentationModuleDefinition<typeof express> {
const layerModule = new InstrumentationNodeModuleFile<ExpressLayer>(
'express/lib/router/layer.js',
ExpressInstrumentation.supportedVersions,
this._patchExpressLayer.bind(this),
this._unpatchExpressLayer.bind(this)
);
const module = new InstrumentationNodeModuleDefinition<typeof express>(
'express',
ExpressInstrumentation.supportedVersions,
this.patch.bind(this),
this.unpatch.bind(this),
[layerModule]
);
return module;
}
protected patch(moduleExports: typeof express, moduleVersion?: string) {
if (moduleExports === undefined || moduleExports === null) {
return moduleExports;
}
diag.debug('opentelemetry.express instrumentation: patching express application lazyrouter');
// convert to any so we don't get errors because lazyrouter is not public
const application = moduleExports?.application as any;
if (isWrapped(application?.lazyrouter)) {
this._unwrap(application, 'lazyrouter');
}
this._wrap(application, 'lazyrouter', this.getApplicationLazyRouterPatch.bind(this, moduleVersion));
return moduleExports;
}
protected unpatch(moduleExports: typeof express): void {
diag.debug('opentelemetry.express instrumentation: unpatching Express application lazyrouter');
this._unwrap(moduleExports?.application as any, 'lazyrouter');
}
private _patchExpressLayer(moduleExports: ExpressLayer, version: string) {
const self = this;
if (moduleExports === undefined || moduleExports === null) {
return moduleExports;
}
const origLayerConstructor = moduleExports as unknown as Function;
const LayerPrototype = moduleExports.prototype;
diag.debug('opentelemetry.express instrumentation: patching Express Layer handle_request and handle_error');
this._wrap(LayerPrototype, 'handle_request', this._getLayerHandleRequestPatch.bind(this));
this._wrap(LayerPrototype, 'handle_error', this._getLayerHandleErrorPatch.bind(this));
// patch the Layer constructor to collect the 'path'
function OtelPatchedLayer(path, options, fn) {
if (!(this instanceof OtelPatchedLayer)) {
return new (OtelPatchedLayer as any)(path, options, fn);
}
this[PATH_STORE] = getLayerPathFromFirstArg(path, options ?? {});
return origLayerConstructor.call(this, path, options, fn);
}
OtelPatchedLayer.prototype = LayerPrototype;
OtelPatchedLayer[originalLayerStore] = moduleExports;
return OtelPatchedLayer;
}
private _unpatchExpressLayer(moduleExports: ExpressLayer) {
diag.debug('opentelemetry.express instrumentation: unpatching Express Layer');
const originalLayerExport = moduleExports[originalLayerStore] ?? moduleExports;
if (isWrapped(originalLayerExport.prototype.handle_request)) {
this._unwrap(originalLayerExport.prototype, 'handle_request');
}
if (isWrapped(originalLayerExport.prototype.handle_error)) {
this._unwrap(originalLayerExport.prototype, 'handle_error');
}
return originalLayerExport;
}
private _getLayerHandleRequestPatch(original: express.RequestHandler) {
const self = this;
return function (this: ExpressLayer, req: PatchedRequest, res: express.Response, next: express.NextFunction) {
// this is what express is doing to check if layer should be invoke
if (this.handle.length > 3) {
return original.apply(this, arguments);
}
const { origState, newState } = self.getRoutingStateOnConsumingPath(req, this);
const pluginNext: express.NextFunction = function errorHandlingNext(err?: any): any {
if (err && err !== 'route' && err !== 'router') {
self._recordException(req, err);
}
self.runMiddlewareWithContext(origState, req, () => next(err));
};
return self.runMiddlewareWithContext(newState, req, () => original.call(this, req, res, pluginNext));
};
}
private _getLayerHandleErrorPatch(original: express.ErrorRequestHandler) {
const self = this;
return function (
this: ExpressLayer,
err: any,
req: PatchedRequest,
res: express.Response,
next: express.NextFunction
) {
// this is what express is doing to check if layer should be invoke
if (this.handle.length !== 4) {
return original.apply(this, arguments);
}
const { origState, newState } = self.getRoutingStateOnConsumingPath(req, this);
const pluginNext: express.NextFunction = function errorHandlingNext(err?: any): any {
if (err !== 'route' && err !== 'router') {
self._recordException(req, err);
}
self.runMiddlewareWithContext(origState, req, () => next(err));
};
return self.runMiddlewareWithContext(newState, req, () => original.call(this, err, req, res, pluginNext));
};
}
private _recordException(req: PatchedRequest, err: Error) {
try {
if (!err || err[EXCEPTION_RECORDED]) {
return;
}
const span = req[REQ_SPAN];
if (!span) {
return;
}
span.recordException(err);
// mark as recorded to avoid duplicates
Object.defineProperty(err, EXCEPTION_RECORDED, {
enumerable: false,
value: true,
});
} catch {}
}
private registerInstrumentationMiddleware(app: express.Application, moduleVersion?: string) {
const plugin = this;
app.use((req: PatchedRequest, res: express.Response, next: express.NextFunction) => {
// check if this app was mounted in another express app.
// we want the logic to run just once per request
if (req.hasOwnProperty(REQ_SPAN)) {
next();
return;
}
const spanName = getSpanInitialName(req);
const span = plugin.tracer.startSpan(spanName, {
kind: SpanKind.INTERNAL,
attributes: plugin._config.includeHttpAttributes ? getHttpSpanAttributesFromReq(req) : {},
});
if (plugin._config.requestHook) {
safeExecuteInTheMiddle(
() => plugin._config.requestHook(span, { moduleVersion, req, res }),
(e) => {
if (e) diag.error(`opentelemetry.express instrumentation: requestHook error`, e);
},
true
);
}
Object.defineProperty(req, REQ_SPAN, {
enumerable: false,
value: span,
});
const oldResEnd = res.end;
res.end = function () {
const routeState = plugin.getCurrentRouteState(req);
const routeAttributes = getRouteAttributes(routeState);
const route = routeAttributes[SemanticAttributes.HTTP_ROUTE] as string;
if (route) {
const rpcMetadata = getRPCMetadata(context.active());
if (rpcMetadata) {
rpcMetadata.route = route;
}
}
oldResEnd.apply(res, arguments);
span.setAttributes(routeAttributes);
if (plugin._config.includeHttpAttributes) {
span.setAttributes(getHttpSpanAttributeFromRes(res));
}
span.setStatus(parseResponseStatus(res.statusCode!));
const newSpanName = getSpanNameOnResEnd(req, routeState);
if (newSpanName) {
span.updateName(newSpanName);
}
span.end();
};
next();
});
}
private getApplicationLazyRouterPatch(moduleVersion: string, original: () => void) {
const self = this;
return function patchedLazyRouter() {
const origRes = original.apply(this, arguments);
if (!isWrapped(this._router.handle)) {
self._wrap(this._router, 'handle', self.getAppRouterHandlerPatch.bind(self));
self.registerInstrumentationMiddleware(this, moduleVersion);
}
return origRes;
};
}
private getAppRouterHandlerPatch(
original: (req: express.Request, res: express.Response, next: express.NextFunction) => void
) {
const self = this;
return function patchedAppRouterHandle(
req: express.Request,
res: express.Response,
next: express.NextFunction
) {
// check that this is indeed the entry point to the app
// we will hit this if on mounted app cases
const currentState = self.getCurrentRouteState(req);
if (currentState) {
return original.apply(this, arguments);
}
const initialState: ExpressConsumedRouteState = createInitialRouteState(req);
const patchedNext = function (err?: any) {
return self.runMiddlewareWithContext({ ...initialState, isUnhandled: true }, req, () => next(err));
};
return self.runMiddlewareWithContext(initialState, req, () => original.call(this, req, res, patchedNext));
};
}
private getRoutingStateOnConsumingPath(
req: express.Request,
layer: ExpressLayer
): { origState: ExpressConsumedRouteState; newState: ExpressConsumedRouteState } {
const origContext = context.active();
const currentState = this.getCurrentRouteState(req);
// we must have an express context at this point (which was create at app router)
// if we don't, than this is an error
if (!currentState) {
const errorState = {
errors: ['internal error in express instrumentation: missing route context'],
};
return { origState: errorState, newState: errorState };
}
const currentLayerPath: LayerPath = layer[PATH_STORE];
const newExpressRouteState = consumeLayerPathAndUpdateState(currentState, req, currentLayerPath);
return { origState: currentState, newState: newExpressRouteState };
}
// we would like to rely on otel context which propagate correctly via async calls.
// the issue is that sometimes this mechanism fails due to timers / event emitters / thenables etc.
// then we get just an empty context which we cannot extract route from.
// so we install a fallback which should cover most cases - save the route state on the request as well,
// and restore it when the middleware is done
private runMiddlewareWithContext(
consumeRouteState: ExpressConsumedRouteState,
req: PatchedRequest,
fn: (...args: unknown[]) => unknown
) {
Object.defineProperty(req, CONSUMED_ROUTE_STATE, {
value: consumeRouteState,
enumerable: false,
configurable: true,
});
const middlewareResult = context.with(context.active().setValue(CONSUMED_ROUTE_STATE, consumeRouteState), fn);
return middlewareResult;
}
private getCurrentRouteState(req: PatchedRequest): ExpressConsumedRouteState {
return (
(context.active().getValue(CONSUMED_ROUTE_STATE) as ExpressConsumedRouteState) ?? req[CONSUMED_ROUTE_STATE]
);
}
}