Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib: add span API to diagnostics_channel #35534

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
149 changes: 149 additions & 0 deletions doc/api/diagnostics_channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,21 @@ if (channel.hasSubscribers) {
some: 'data'
});
}

// Spans correlate a sequence of events
const span = channel.span({
some: 'start data'
});

// Annotations are for data not strictly related to the start or end events
span.annotate({
some: 'annotation'
});

// This end event will share an `id` with the span started above
span.end({
some: 'end data'
});
```

#### `diagnostics_channel.hasSubscribers(name)`
Expand Down Expand Up @@ -175,6 +190,140 @@ channel.subscribe(onMessage);
channel.unsubscribe(onMessage);
```

#### `channel.span(data)`

* `data` {any} The message to send as the start event of the span
* Returns: {Span} The span object

Create a span object to tie a sequence of events together. This wraps the input
data in a new object which includes an action type of `start`, `annotation`, or
`end` and a numeric id to link span messages together for later reconstitution.

```js
const diagnostics_channel = require('diagnostics_channel');

const channel = diagnostics_channel.channel('my-channel');

const span = channel.span({
some: 'message'
});
```

### Class: `Span`

The class `Span` represents a series of messages to send to a named channel.
It is used to automatically apply a shared id to all messages sent through the
span enabling later reconstitution of the message sequence.

#### `span.id`

* Returns: {number} Span id

This is a process-unique identifier which allows all messages from this span
to be correlated.

#### `span.annotate(data)`

* `data` {any} The message to send as an annotation event of the span

Send an `annotation` event to the message sequence.

This is useful for augmenting the span with additional data before completion.
For example, a library may use a connection pool internally and want to report
the host name of the acquired connection to a span representing a higher-level
interaction.

```js
const diagnostics_channel = require('diagnostics_channel');

const channel = diagnostics_channel.channel('my-channel');

const span = channel.span({
some: 'message'
});

span.annotate({
some: 'other message'
});
```

#### `span.end(data)`

* `data` {any} The message to send as the end event of the span

Send an `end` event representing the completion of the message sequence.

This will mark the span as complete. Any further attempts to send messages
through the span will be ignored.

```js
const diagnostics_channel = require('diagnostics_channel');

const channel = diagnostics_channel.channel('my-channel');

const span = channel.span({
some: 'message'
});

span.end({
some: 'other message'
});
```

#### `Span.aggregate(channel, [map, ] onSpan)`

* `channel` {Channel} Channel to listen to
* `map` {Function} Run for each span message
* `onSpan` {Function} Handler to receive a reconstituted span
* Returns: {Function} Function to stop the aggregate and unsubscribe

Reconstitutes span events on a channel into an array of events by listening to
and aggregating events from each span until its end event is reached.

```js
const diagnostics_channel = require('diagnostics_channel');
const { Span } = diagnostics_channel;

const channel = diagnostics_channel.channel('my-channel');

function map(spanMessage) {
spanMessage.ts = Date.now();
}

function onSpan(span) {
// Receives an array of span events whenver a span is ended.
}

Span.aggregate(channel, map, onSpan);
```

### Class: `SpanMessage`

A `SpanMessage` is a container type used to bind a shared id to each message
within a span and an action name to describe what is happening to the span.
The action name is one of

#### `spanMessage.id`

* Returns: {number} Span id

This is a process-unique identifier which allows span message events to be
correlated to a single span.

#### `spanMessage.action`

* Returns: {string} Action name

This describes what action was taken on the span. This may be one of: `start`,
`end`, and `annotate` which is used to report intermediate data not linked to
the `start` or `end`.

#### `spanMessage.data`

* Returns: {any} The message data

The `data` contains the message data provided through the `Span` interface.

[`diagnostics_channel.channel(name)`]: #diagnostics_channel_diagnostics_channel_channel_name
[`channel.subscribe(onMessage)`]: #diagnostics_channel_channel_subscribe_onmessage
[`'uncaughtException'`]: process.md#process_event_uncaughtexception
112 changes: 111 additions & 1 deletion lib/diagnostics_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ const {
ArrayPrototypeIndexOf,
ArrayPrototypePush,
ArrayPrototypeSplice,
Map,
MapPrototypeDelete,
MapPrototypeGet,
MapPrototypeSet,
ObjectCreate,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
Expand All @@ -21,6 +25,103 @@ const { triggerUncaughtException } = internalBinding('errors');

const { WeakReference } = internalBinding('util');

function identity(v) {
return v;
}

class SpanMessage {
constructor(action, id, data) {
this.action = action;
this.id = id;
this.data = data;
}

// Augment with type: span to identify over the wire through JSON
toJSON() {
return {
...this,
type: 'span'
};
}
}

let spanId = 0;

class InactiveSpan {
annotate() {}
end() {}
}

class Span {
constructor(channel, data) {
this._channel = channel;
this.id = ++spanId;
this._ended = false;

this._message('start', data);
}

_message(action, data) {
if (this._ended) return;

if (action === 'end') {
this._ended = true;
}

this._channel.publish(new SpanMessage(action, this.id, data));
}

annotate(data) {
this._message('annotation', data);
}

end(data) {
this._message('end', data);
}

static aggregate(channel, map = identity, onComplete) {
const spans = new Map();

if (typeof onComplete !== 'function') {
onComplete = map;
map = identity;
}

// eslint-disable-next-line no-use-before-define
if (!(channel instanceof Channel)) {
throw new ERR_INVALID_ARG_TYPE('channel', ['Channel'], channel);
}
if (typeof onComplete !== 'function') {
throw new ERR_INVALID_ARG_TYPE('onComplete', ['function'], onComplete);
}

function onMessage(message) {
// A span message may be pre or post serialization so identify either
if (message instanceof SpanMessage || message.type === 'span') {
let messages = MapPrototypeGet(spans, message.id);
if (!messages) {
messages = [];
MapPrototypeSet(spans, message.id, messages);
}

ArrayPrototypePush(messages, map(message));

if (message.action === 'end') {
onComplete(messages);
MapPrototypeDelete(spans, message.id);
}
}
}

channel.subscribe(onMessage);

return () => {
spans.clear();
channel.unsubscribe(onMessage);
};
}
}

// TODO(qard): should there be a C++ channel interface?
class ActiveChannel {
subscribe(subscription) {
Expand Down Expand Up @@ -60,6 +161,10 @@ class ActiveChannel {
}
}
}

span(data) {
return new Span(this, data);
}
}

class Channel {
Expand All @@ -85,6 +190,10 @@ class Channel {
}

publish() {}

span() {
return new InactiveSpan();
}
}

const channels = ObjectCreate(null);
Expand Down Expand Up @@ -118,5 +227,6 @@ function hasSubscribers(name) {
module.exports = {
channel,
hasSubscribers,
Channel
Channel,
Span
};
77 changes: 77 additions & 0 deletions test/parallel/test-diagnostics-channel-span-aggregate.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
'use strict';

const common = require('../common');
const dc = require('diagnostics_channel');
const assert = require('assert');

const inputs = [
{ test: 'start' },
{ test: 'annotation' },
{ test: 'end' }
];

const channel = dc.channel('test');

const receivedSpans = [];

const map = common.mustCall((message) => {
message.ts = process.hrtime.bigint();
return message;
}, 7);

const onComplete = common.mustCall((span) => {
receivedSpans.push(span);
}, 3);

dc.Span.aggregate(channel, map, onComplete);

let expectedMessage = inputs.shift();
const spanA = channel.span(expectedMessage);
const spanB = channel.span(expectedMessage);
const spanC = channel.span(expectedMessage);

expectedMessage = inputs.shift();
spanA.annotate(expectedMessage);

expectedMessage = inputs.shift();
spanB.end(expectedMessage);
spanC.end(expectedMessage);
spanA.end(expectedMessage);

const [ receivedB, receivedC, receivedA ] = receivedSpans;

// Spans received in order of completion
for (const span of receivedB) {
assert.strictEqual(span.id, 2);
}
for (const span of receivedC) {
assert.strictEqual(span.id, 3);
}
for (const span of receivedA) {
assert.strictEqual(span.id, 1);
}

// A starts before B which starts before C
assert.ok(receivedA[0].ts < receivedB[0].ts);
assert.ok(receivedB[0].ts < receivedC[0].ts);

// B ends before C which ends before A
assert.ok(receivedA[2].ts > receivedC[1].ts);
assert.ok(receivedC[1].ts > receivedB[1].ts);

// Middle event of last span is an annotation
const [, annotation] = receivedA;
assert.strictEqual(annotation.action, 'annotation');

for (const spans of receivedSpans) {
const [ start, ...rest ] = spans;
const end = rest.pop();

// First and last messages are start and end
assert.strictEqual(start.action, 'start');
assert.strictEqual(end.action, 'end');

// Annotation time occurs between all starts and ends
assert.ok(annotation.ts > start.ts);
assert.ok(annotation.ts < end.ts);
}