Skip to content

Commit

Permalink
feat: add Azure SDK Patching Subscriber (#562)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark Wolff committed Jun 18, 2020
1 parent 18691d3 commit 162d731
Show file tree
Hide file tree
Showing 28 changed files with 461 additions and 54 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Expand Up @@ -12,6 +12,8 @@ Tests/BackCompatibility/OldTSC/node_modules/**
Tests/BackCompatibility/Node10Types/node_modules/**
Tests/FunctionalTests/Runner/node_modules/**
Tests/FunctionalTests/TestApp/node_modules/**
Tests/FunctionalTests/Runner/package-lock.json
Tests/FunctionalTests/TestApp/package-lock.json

# Ignore Visual Studio files
*.suo
Expand All @@ -29,4 +31,3 @@ Tests/FunctionalTests/TestApp/node_modules/**

# Ignore log files
npm-debug.log

4 changes: 1 addition & 3 deletions .travis.yml
Expand Up @@ -7,13 +7,11 @@ env:
- TRAVIS_EXTENDED_METRICS=false
node_js:
- "node"
- "13"
- "12"
- "11"
- "10"
- "9"
- "8"
- "8.0.0"
- "7"
- "6"
- "4"
- "0.12"
Expand Down
108 changes: 108 additions & 0 deletions AutoCollection/AsyncHooksScopeManager.ts
@@ -0,0 +1,108 @@
import { CorrelationContextManager, CorrelationContext } from "./CorrelationContextManager"
import { ISpanContext } from "diagnostic-channel";
import { EventEmitter } from "events";

/**
* Type of span. Can be used to specify additional relationships between spans
* in addition to a parent/child relationship.
*/
export enum SpanKind {
/** Default value. Indicates that the span is used internally. */
INTERNAL = 0,

/**
* Indicates that the span covers server-side handling of an RPC or other
* remote request.
*/
SERVER = 1,

/**
* Indicates that the span covers the client-side wrapper around an RPC or
* other remote request.
*/
CLIENT = 2,

/**
* Indicates that the span describes producer sending a message to a
* broker. Unlike client and server, there is no direct critical path latency
* relationship between producer and consumer spans.
*/
PRODUCER = 3,

/**
* Indicates that the span describes consumer receiving a message from a
* broker. Unlike client and server, there is no direct critical path latency
* relationship between producer and consumer spans.
*/
CONSUMER = 4,
}

export interface Link {
/** The {@link SpanContext} of a linked span. */
spanContext: SpanContext;
/** A set of {@link Attributes} on the link. */
attributes?: Record<string, string>;
}

export interface SpanContext {
traceId: string;
spanId: string;
traceFlags?: { toString: () => string };
tracestate?: string;
}

export interface Span {
_duration: [number, number]; // hrTime
name: string;
parentSpanId?: string;
status: { code: number, message?: string },
attributes: Record<string, string>,
kind: SpanKind;
links: Link[];
context: () => SpanContext;
}

export class OpenTelemetryScopeManagerWrapper {
public active() {
const context = CorrelationContextManager.getCurrentContext() as any;
return { ...context, getValue: () => context, setValue: () => {} };
}

public with(span: Span, fn: () => any) {
const parentSpanId = span.parentSpanId;
const name = span.name;
const correlationContext = OpenTelemetryScopeManagerWrapper._spanToContext(span, parentSpanId, name);
return CorrelationContextManager.runWithContext(correlationContext, fn)();
}

public bind<T>(target: T): T {
if (typeof target === "function") {
return CorrelationContextManager.wrapCallback(target);
} else if (target instanceof EventEmitter) {
CorrelationContextManager.wrapEmitter(target);
}
return target;
}

public enable(): this {
CorrelationContextManager.enable();
return this;
}

public disable(): this {
CorrelationContextManager.disable();
return this;
}

private static _spanToContext(span: Span, parentSpanId?: string, name?: string): CorrelationContext {
const _parentId = parentSpanId ? `|${span.context().traceId}.${parentSpanId}.` : span.context().traceId;
const context: ISpanContext = {
...span.context(),
traceFlags: span.context().traceFlags.toString()
};
const correlationContext = CorrelationContextManager.spanToContextObject(context, _parentId, name)
return correlationContext;
}
}

export const AsyncScopeManager = new OpenTelemetryScopeManagerWrapper();
12 changes: 10 additions & 2 deletions AutoCollection/CorrelationContextManager.ts
@@ -1,6 +1,4 @@
import http = require("http");
import events = require("events");
import Util = require("../Library/Util");
import Logging = require("../Library/Logging");

import * as DiagChannel from "./diagnostic-channel/initialization";
Expand All @@ -9,6 +7,7 @@ import * as DiagChannel from "./diagnostic-channel/initialization";
import * as cls from "cls-hooked";
import Traceparent = require("../Library/Traceparent");
import Tracestate = require("../Library/Tracestate");
import { ISpanContext } from "diagnostic-channel";

export interface CustomProperties {
/**
Expand Down Expand Up @@ -91,6 +90,15 @@ export class CorrelationContextManager {
return null;
}

public static spanToContextObject(spanContext: ISpanContext, parentId?: string, name?: string) {
const traceContext = new Traceparent();
traceContext.traceId = spanContext.traceId;
traceContext.spanId = spanContext.spanId;
traceContext.traceFlag = spanContext.traceFlags || Traceparent.DEFAULT_TRACE_FLAG;
traceContext.parentId = parentId;
return CorrelationContextManager.generateContextObject(traceContext.traceId, traceContext.parentId, name, null, traceContext);
}

/**
* Runs a function inside a given Context.
* All logical children of the execution path that entered this Context
Expand Down
6 changes: 6 additions & 0 deletions AutoCollection/HttpDependencies.ts
Expand Up @@ -41,6 +41,7 @@ class AutoCollectHttpDependencies {
this._initialize();
}
if (DiagChannel.IsInitialized) {
require("./diagnostic-channel/azure-coretracing.sub").enable(true, this._client);
require("./diagnostic-channel/mongodb.sub").enable(isEnabled, this._client);
require("./diagnostic-channel/mysql.sub").enable(isEnabled, this._client);
require("./diagnostic-channel/redis.sub").enable(isEnabled, this._client);
Expand All @@ -63,6 +64,11 @@ class AutoCollectHttpDependencies {
var shouldCollect = !(<any>options)[AutoCollectHttpDependencies.disableCollectionRequestOption] &&
!(<any>request)[AutoCollectHttpDependencies.alreadyAutoCollectedFlag];

// If someone else patched traceparent headers onto this request
if (options.headers && options.headers['user-agent'] && options.headers['user-agent'].toString().indexOf('azsdk-js') !== -1) {
shouldCollect = false;
}

(<any>request)[AutoCollectHttpDependencies.alreadyAutoCollectedFlag] = true;

if (request && options && shouldCollect) {
Expand Down
79 changes: 79 additions & 0 deletions AutoCollection/diagnostic-channel/SpanParser.ts
@@ -0,0 +1,79 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.
import { Span, SpanKind } from "../AsyncHooksScopeManager";
import * as Contracts from "../../Declarations/Contracts";
import * as Constants from "../../Declarations/Constants";

function filterSpanAttributes(attributes: Record<string, string>) {
const newAttributes = { ...attributes };
Object.keys(Constants.SpanAttribute).forEach(key => {
delete newAttributes[key];
});
return newAttributes
}

export function spanToTelemetryContract(span: Span): (Contracts.DependencyTelemetry & Contracts.RequestTelemetry) & Contracts.Identified {
const id = `|${span.context().traceId}.${span.context().spanId}.`;
const duration = Math.round(span._duration[0] * 1e3 + span._duration[1] / 1e6);
const isHttp: boolean = ((span.attributes.component || "").toUpperCase() === Constants.DependencyTypeName.Http) || (!!span.attributes[Constants.SpanAttribute.HttpUrl]);
const isGrpc: boolean = (span.attributes.component || "").toLowerCase() === Constants.DependencyTypeName.Grpc;
if (isHttp) {
// Read http span attributes
const method = span.attributes[Constants.SpanAttribute.HttpMethod] || "GET";
const url = new URL(span.attributes[Constants.SpanAttribute.HttpUrl]);
const host = span.attributes[Constants.SpanAttribute.HttpHost] || url.host;
const port = span.attributes[Constants.SpanAttribute.HttpPort] || url.port || null;
const pathname = url.pathname || "/";

// Translate to AI Dependency format
const name = `${method} ${pathname}`;
const dependencyTypeName = Constants.DependencyTypeName.Http;
const target = port ? `${host}:${port}` : host;
const data = url.toString();
const resultCode = span.attributes[Constants.SpanAttribute.HttpStatusCode] || span.status.code || 0;
const success = resultCode < 400; // Status.OK
return {
id, name, dependencyTypeName,
target, data,
success, duration,
url: data,
resultCode: String(resultCode),
properties: filterSpanAttributes(span.attributes)
};
} else if (isGrpc) {
const method = span.attributes[Constants.SpanAttribute.GrpcMethod] || "rpc";
const service = span.attributes[Constants.SpanAttribute.GrpcService];
const name = service ? `${method} ${service}` : span.name;
return {
id, duration, name,
target: service,
data: service || name,
url: service || name,
dependencyTypeName: Constants.DependencyTypeName.Grpc,
resultCode: String(span.status.code || 0),
success: span.status.code === 0,
properties: filterSpanAttributes(span.attributes),
}
} else {
const name = span.name;
const links = span.links && span.links.map(link => {
return {
operation_Id: link.spanContext.traceId,
id: link.spanContext.spanId
};
});
return {
id, duration, name,
target: span.attributes["peer.address"],
data: span.attributes["peer.address"] || name,
url: span.attributes["peer.address"] || name,
dependencyTypeName: span.kind === SpanKind.INTERNAL ? Constants.DependencyTypeName.InProc : (span.attributes.component || span.name),
resultCode: String(span.status.code || 0),
success: span.status.code === 0,
properties: {
...filterSpanAttributes(span.attributes),
"_MS.links": links || undefined
},
};
}
}
48 changes: 48 additions & 0 deletions AutoCollection/diagnostic-channel/azure-coretracing.sub.ts
@@ -0,0 +1,48 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.
import TelemetryClient = require("../../Library/TelemetryClient");
import { channel, IStandardEvent } from "diagnostic-channel";

import Traceparent = require("../../Library/Traceparent");
import * as SpanParser from "./SpanParser";
import { Span, AsyncScopeManager, SpanKind } from "../AsyncHooksScopeManager";

let clients: TelemetryClient[] = [];

export const subscriber = (event: IStandardEvent<Span>) => {
const span = event.data;
const telemetry = SpanParser.spanToTelemetryContract(span);
const spanContext = span.context();
const traceparent = new Traceparent();
traceparent.traceId = spanContext.traceId;
traceparent.spanId = spanContext.spanId;
traceparent.traceFlag = spanContext.traceFlags.toString();
traceparent.parentId = span.parentSpanId ? `|${spanContext.traceId}.${span.parentSpanId}.` : null;

AsyncScopeManager.with(span, () => {
clients.forEach((client) => {
if (span.kind === SpanKind.SERVER) {
// Server or Consumer
client.trackRequest(telemetry);
} else if (span.kind === SpanKind.CLIENT || span.kind === SpanKind.INTERNAL) {
// Client or Producer or Internal
client.trackDependency(telemetry);
}
// else - ignore producer/consumer spans for now until it is clear how this sdk should interpret them
});
});
};

export function enable(enabled: boolean, client: TelemetryClient) {
if (enabled) {
if (clients.length === 0) {
channel.subscribe<any>("azure-coretracing", subscriber);
};
clients.push(client);
} else {
clients = clients.filter((c) => c != client);
if (clients.length === 0) {
channel.unsubscribe("azure-coretracing", subscriber);
}
}
}
9 changes: 6 additions & 3 deletions AutoCollection/diagnostic-channel/initialization.ts
Expand Up @@ -5,6 +5,7 @@
// This is to avoid requiring the actual module if the NO_DIAGNOSTIC_CHANNEL env is present
import * as DiagChannelPublishers from "diagnostic-channel-publishers";
import * as DiagChannel from "diagnostic-channel";
import { AsyncScopeManager } from "../AsyncHooksScopeManager";
import Logging = require("../../Library/Logging");

export const IsInitialized = !process.env["APPLICATION_INSIGHTS_NO_DIAGNOSTIC_CHANNEL"];
Expand All @@ -23,7 +24,8 @@ if (IsInitialized) {
redis: publishers.redis,
pg: publishers.pg,
pgPool: publishers.pgPool,
winston: publishers.winston
winston: publishers.winston,
azuresdk: publishers.azuresdk
};
for (const mod in modules) {
if (unpatchedModules.indexOf(mod) === -1) {
Expand All @@ -42,6 +44,7 @@ export function registerContextPreservation(cb: (cb: Function) => Function) {
if (!IsInitialized) {
return;
}

(require("diagnostic-channel") as typeof DiagChannel).channel.addContextPreservation(cb);
const diagChannel = (require("diagnostic-channel") as typeof DiagChannel);
diagChannel.channel.addContextPreservation(cb);
diagChannel.channel.spanContextPropagator = AsyncScopeManager;
}
21 changes: 21 additions & 0 deletions Declarations/Constants.ts
Expand Up @@ -112,3 +112,24 @@ export const TelemetryTypeStringToQuickPulseDocumentType: {[key in Contracts.Tel
AvailabilityData: QuickPulseDocumentType.Availability,
PageViewData: QuickPulseDocumentType.PageView
};

// OpenTelemetry Span Attributes
export const SpanAttribute = {
// HTTP
HttpHost: "http.host",
HttpMethod: "http.method",
HttpPort: "http.port",
HttpStatusCode: "http.status_code",
HttpUrl: "http.url",
HttpUserAgent: "http.user_agent",

// GRPC
GrpcMethod: "grpc.method",
GrpcService: "rpc.service", // rpc not grpc
};

export const DependencyTypeName = {
Grpc: "GRPC",
Http: "HTTP",
InProc: "InProc",
}
4 changes: 2 additions & 2 deletions Declarations/Contracts/TelemetryTypes/Telemetry.ts
Expand Up @@ -9,7 +9,7 @@ export interface Telemetry {
/**
* Additional data used to filter events and metrics in the portal. Defaults to empty.
*/
properties?: { [key: string]: string; };
properties?: { [key: string]: any; };
/**
* An event-specific context that will be passed to telemetry processors handling this event before it is sent. For a context spanning your entire operation, consider appInsights.getCorrelationContext
*/
Expand All @@ -18,4 +18,4 @@ export interface Telemetry {
* The context tags to use for this telemetry which overwrite default context values
*/
tagOverrides?: { [key: string]: string; };
}
}
2 changes: 1 addition & 1 deletion Library/CorrelationIdManager.ts
Expand Up @@ -10,7 +10,7 @@ class CorrelationIdManager {
private static TAG = "CorrelationIdManager";
public static correlationIdPrefix = "cid-v1:";

public static w3cEnabled = false;
public static w3cEnabled = true;

// To avoid extraneous HTTP requests, we maintain a queue of callbacks waiting on a particular appId lookup,
// as well as a cache of completed lookups so future requests can be resolved immediately.
Expand Down

0 comments on commit 162d731

Please sign in to comment.