Skip to content

Commit

Permalink
lib: add span API
Browse files Browse the repository at this point in the history
  • Loading branch information
Qard committed Nov 9, 2020
1 parent 831f4c7 commit f2d16bb
Show file tree
Hide file tree
Showing 5 changed files with 381 additions and 1 deletion.
149 changes: 149 additions & 0 deletions doc/api/diagnostics_channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,21 @@ if (channel.hasSubscribers) {
some: 'data'
});
}

// Spans correlate a sequence of events
const span = channel.span({
some: 'start data'
});

// Annotations are for data not strictly related to the start or end events
span.annotate({
some: 'annotation'
});

// This end event will share an `id` with the span started above
span.end({
some: 'end data'
});
```

#### `diagnostics_channel.hasSubscribers(name)`
Expand Down Expand Up @@ -175,6 +190,140 @@ channel.subscribe(onMessage);
channel.unsubscribe(onMessage);
```

#### `channel.span(data)`

* `data` {any} The message to send as the start event of the span
* Returns: {Span} The span object

Create a span object to tie a sequence of events together. This wraps the input
data in a new object which includes an action type of `start`, `annotation`, or
`end` and a numeric id to link span messages together for later reconstitution.

```js
const diagnostics_channel = require('diagnostics_channel');

const channel = diagnostics_channel.channel('my-channel');

const span = channel.span({
some: 'message'
});
```

### Class: `Span`

The class `Span` represents a series of messages to send to a named channel.
It is used to automatically apply a shared id to all messages sent through the
span enabling later reconstitution of the message sequence.

#### `span.id`

* Returns: {number} Span id

This is a process-unique identifier which allows all messages from this span
to be correlated.

#### `span.annotate(data)`

* `data` {any} The message to send as an annotation event of the span

Send an `annotation` event to the message sequence.

This is useful for augmenting the span with additional data before completion.
For example, a library may use a connection pool internally and want to report
the host name of the acquired connection to a span representing a higher-level
interaction.

```js
const diagnostics_channel = require('diagnostics_channel');

const channel = diagnostics_channel.channel('my-channel');

const span = channel.span({
some: 'message'
});

span.annotate({
some: 'other message'
});
```

#### `span.end(data)`

* `data` {any} The message to send as the end event of the span

Send an `end` event representing the completion of the message sequence.

This will mark the span as complete. Any further attempts to send messages
through the span will be ignored.

```js
const diagnostics_channel = require('diagnostics_channel');

const channel = diagnostics_channel.channel('my-channel');

const span = channel.span({
some: 'message'
});

span.end({
some: 'other message'
});
```

#### `Span.aggregate(channel, [map, ] onSpan)`

* `channel` {Channel} Channel to listen to
* `map` {Function} Run for each span message
* `onSpan` {Function} Handler to receive a reconstituted span
* Returns: {Function} Function to stop the aggregate and unsubscribe

Reconstitutes span events on a channel into an array of events by listening to
and aggregating events from each span until its end event is reached.

```js
const diagnostics_channel = require('diagnostics_channel');
const { Span } = diagnostics_channel;

const channel = diagnostics_channel.channel('my-channel');

function map(spanMessage) {
spanMessage.ts = Date.now();
}

function onSpan(span) {
// Receives an array of span events whenver a span is ended.
}

Span.aggregate(channel, map, onSpan);
```

### Class: `SpanMessage`

A `SpanMessage` is a container type used to bind a shared id to each message
within a span and an action name to describe what is happening to the span.
The action name is one of

#### `spanMessage.id`

* Returns: {number} Span id

This is a process-unique identifier which allows span message events to be
correlated to a single span.

#### `spanMessage.action`

* Returns: {string} Action name

This describes what action was taken on the span. This may be one of: `start`,
`end`, and `annotate` which is used to report intermediate data not linked to
the `start` or `end`.

#### `spanMessage.data`

* Returns: {any} The message data

The `data` contains the message data provided through the `Span` interface.

[`diagnostics_channel.channel(name)`]: #diagnostics_channel_diagnostics_channel_channel_name
[`channel.subscribe(onMessage)`]: #diagnostics_channel_channel_subscribe_onmessage
[`'uncaughtException'`]: process.md#process_event_uncaughtexception
112 changes: 111 additions & 1 deletion lib/diagnostics_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ const {
ArrayPrototypeIndexOf,
ArrayPrototypePush,
ArrayPrototypeSplice,
Map,
MapPrototypeDelete,
MapPrototypeGet,
MapPrototypeSet,
ObjectCreate,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
Expand All @@ -21,6 +25,103 @@ const { triggerUncaughtException } = internalBinding('errors');

const { WeakReference } = internalBinding('util');

function identity(v) {
return v;
}

class SpanMessage {
constructor(action, id, data) {
this.action = action;
this.id = id;
this.data = data;
}

// Augment with type: span to identify over the wire through JSON
toJSON() {
return {
...this,
type: 'span'
};
}
}

let spanId = 0;

class InactiveSpan {
annotate() {}
end() {}
}

class Span {
constructor(channel, data) {
this._channel = channel;
this.id = ++spanId;
this._ended = false;

this._message('start', data);
}

_message(action, data) {
if (this._ended) return;

if (action === 'end') {
this._ended = true;
}

this._channel.publish(new SpanMessage(action, this.id, data));
}

annotate(data) {
this._message('annotation', data);
}

end(data) {
this._message('end', data);
}

static aggregate(channel, map = identity, onComplete) {
const spans = new Map();

if (typeof onComplete !== 'function') {
onComplete = map;
map = identity;
}

// eslint-disable-next-line no-use-before-define
if (!(channel instanceof Channel)) {
throw new ERR_INVALID_ARG_TYPE('channel', ['Channel'], channel);
}
if (typeof onComplete !== 'function') {
throw new ERR_INVALID_ARG_TYPE('onComplete', ['function'], onComplete);
}

function onMessage(message) {
// A span message may be pre or post serialization so identify either
if (message instanceof SpanMessage || message.type === 'span') {
let messages = MapPrototypeGet(spans, message.id);
if (!messages) {
messages = [];
MapPrototypeSet(spans, message.id, messages);
}

ArrayPrototypePush(messages, map(message));

if (message.action === 'end') {
onComplete(messages);
MapPrototypeDelete(spans, message.id);
}
}
}

channel.subscribe(onMessage);

return () => {
spans.clear();
channel.unsubscribe(onMessage);
};
}
}

// TODO(qard): should there be a C++ channel interface?
class ActiveChannel {
subscribe(subscription) {
Expand Down Expand Up @@ -60,6 +161,10 @@ class ActiveChannel {
}
}
}

span(data) {
return new Span(this, data);
}
}

class Channel {
Expand All @@ -85,6 +190,10 @@ class Channel {
}

publish() {}

span() {
return new InactiveSpan();
}
}

const channels = ObjectCreate(null);
Expand Down Expand Up @@ -118,5 +227,6 @@ function hasSubscribers(name) {
module.exports = {
channel,
hasSubscribers,
Channel
Channel,
Span
};
77 changes: 77 additions & 0 deletions test/parallel/test-diagnostics-channel-span-aggregate.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
'use strict';

const common = require('../common');
const dc = require('diagnostics_channel');
const assert = require('assert');

const inputs = [
{ test: 'start' },
{ test: 'annotation' },
{ test: 'end' }
];

const channel = dc.channel('test');

const receivedSpans = [];

const map = common.mustCall((message) => {
message.ts = process.hrtime.bigint();
return message;
}, 7);

const onComplete = common.mustCall((span) => {
receivedSpans.push(span);
}, 3);

dc.Span.aggregate(channel, map, onComplete);

let expectedMessage = inputs.shift();
const spanA = channel.span(expectedMessage);
const spanB = channel.span(expectedMessage);
const spanC = channel.span(expectedMessage);

expectedMessage = inputs.shift();
spanA.annotate(expectedMessage);

expectedMessage = inputs.shift();
spanB.end(expectedMessage);
spanC.end(expectedMessage);
spanA.end(expectedMessage);

const [ receivedB, receivedC, receivedA ] = receivedSpans;

// Spans received in order of completion
for (const span of receivedB) {
assert.strictEqual(span.id, 2);
}
for (const span of receivedC) {
assert.strictEqual(span.id, 3);
}
for (const span of receivedA) {
assert.strictEqual(span.id, 1);
}

// A starts before B which starts before C
assert.ok(receivedA[0].ts < receivedB[0].ts);
assert.ok(receivedB[0].ts < receivedC[0].ts);

// B ends before C which ends before A
assert.ok(receivedA[2].ts > receivedC[1].ts);
assert.ok(receivedC[1].ts > receivedB[1].ts);

// Middle event of last span is an annotation
const [, annotation] = receivedA;
assert.strictEqual(annotation.action, 'annotation');

for (const spans of receivedSpans) {
const [ start, ...rest ] = spans;
const end = rest.pop();

// First and last messages are start and end
assert.strictEqual(start.action, 'start');
assert.strictEqual(end.action, 'end');

// Annotation time occurs between all starts and ends
assert.ok(annotation.ts > start.ts);
assert.ok(annotation.ts < end.ts);
}

0 comments on commit f2d16bb

Please sign in to comment.