Skip to content

Commit

Permalink
lib: add tracing channel to diagnostics_channel
Browse files Browse the repository at this point in the history
  • Loading branch information
Qard committed Dec 15, 2022
1 parent 22c645d commit 1215d64
Show file tree
Hide file tree
Showing 8 changed files with 1,074 additions and 21 deletions.
560 changes: 560 additions & 0 deletions doc/api/diagnostics_channel.md

Large diffs are not rendered by default.

271 changes: 250 additions & 21 deletions lib/diagnostics_channel.js
@@ -1,12 +1,18 @@
'use strict';

const {
ArrayPrototypeAt,
ArrayPrototypeIndexOf,
ArrayPrototypePush,
ArrayPrototypeSplice,
FunctionPrototypeBind,
ObjectCreate,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
PromisePrototypeThen,
PromiseReject,
ReflectApply,
SafeMap,
SymbolHasInstance,
} = primordials;

Expand All @@ -23,11 +29,44 @@ const { triggerUncaughtException } = internalBinding('errors');

const { WeakReference } = internalBinding('util');

function decRef(channel) {
channel._weak.decRef();
if (channel._weak.getRef() === 0) {
delete channels[channel.name];
}
}

function markActive(channel) {
// eslint-disable-next-line no-use-before-define
ObjectSetPrototypeOf(channel, ActiveChannel.prototype);
channel._subscribers = [];
channel._stores = new SafeMap();
}

function maybeMarkInactive(channel) {
// When there are no more active subscribers, restore to fast prototype.
if (!channel._subscribers.length && !channel._stores.size) {
// eslint-disable-next-line no-use-before-define
ObjectSetPrototypeOf(channel, Channel.prototype);
channel._subscribers = undefined;
channel._stores = undefined;
}
}

function defaultTransform(data) {
return data
}

function wrapStoreRun(store, data, next, transform = defaultTransform) {
return () => store.run(transform(data), next);
}

// TODO(qard): should there be a C++ channel interface?
class ActiveChannel {
subscribe(subscription) {
validateFunction(subscription, 'subscription');
ArrayPrototypePush(this._subscribers, subscription);
this._weak.incRef();
}

unsubscribe(subscription) {
Expand All @@ -36,12 +75,28 @@ class ActiveChannel {

ArrayPrototypeSplice(this._subscribers, index, 1);

// When there are no more active subscribers, restore to fast prototype.
if (!this._subscribers.length) {
// eslint-disable-next-line no-use-before-define
ObjectSetPrototypeOf(this, Channel.prototype);
decRef(this);
maybeMarkInactive(this);

return true;
}

bindStore(store, transform) {
const replacing = this._stores.has(store);
if (!replacing) this._weak.incRef();
this._stores.set(store, transform);
}

unbindStore(store) {
if (!this._stores.has(store)) {
return false;
}

this._stores.delete(store);

decRef(this);
maybeMarkInactive(this);

return true;
}

Expand All @@ -61,11 +116,28 @@ class ActiveChannel {
}
}
}

runStores(data, fn, thisArg, ...args) {
this.publish(data);

// Bind base fn first due to AsyncLocalStorage.run not having thisArg
fn = FunctionPrototypeBind(fn, thisArg, ...args);

for (const entry of this._stores.entries()) {
const store = entry[0];
const transform = entry[1];
fn = wrapStoreRun(store, data, fn, transform);
}

return fn();
}
}

class Channel {
constructor(name) {
this._subscribers = undefined;
this._stores = undefined;
this._weak = undefined;
this.name = name;
}

Expand All @@ -76,20 +148,32 @@ class Channel {
}

subscribe(subscription) {
ObjectSetPrototypeOf(this, ActiveChannel.prototype);
this._subscribers = [];
markActive(this);
this.subscribe(subscription);
}

unsubscribe() {
return false;
}

bindStore(store, transform) {
markActive(this);
this.bindStore(store, transform);
}

unbindStore() {
return false;
}

get hasSubscribers() {
return false;
}

publish() {}

runStores(data, fn, thisArg, ...args) {
return ReflectApply(fn, thisArg, args);
}
}

const channels = ObjectCreate(null);
Expand All @@ -105,27 +189,17 @@ function channel(name) {
}

channel = new Channel(name);
channels[name] = new WeakReference(channel);
channel._weak = new WeakReference(channel);
channels[name] = channel._weak;
return channel;
}

function subscribe(name, subscription) {
const chan = channel(name);
channels[name].incRef();
chan.subscribe(subscription);
return channel(name).subscribe(subscription);
}

function unsubscribe(name, subscription) {
const chan = channel(name);
if (!chan.unsubscribe(subscription)) {
return false;
}

channels[name].decRef();
if (channels[name].getRef() === 0) {
delete channels[name];
}
return true;
return channel(name).unsubscribe(subscription);
}

function hasSubscribers(name) {
Expand All @@ -139,10 +213,165 @@ function hasSubscribers(name) {
return channel.hasSubscribers;
}

const traceEvents = [
'start',
'end',
'asyncStart',
'asyncEnd',
'error',
];

function assertChannel(value, name) {
if (!(value instanceof Channel)) {
throw new ERR_INVALID_ARG_TYPE(name, ['Channel'], value);
}
}

class TracingChannel {
constructor(nameOrChannels) {
if (typeof nameOrChannels === 'string') {
this.start = channel(`tracing:${nameOrChannels}:start`);
this.end = channel(`tracing:${nameOrChannels}:end`);
this.asyncStart = channel(`tracing:${nameOrChannels}:asyncStart`);
this.asyncEnd = channel(`tracing:${nameOrChannels}:asyncEnd`);
this.error = channel(`tracing:${nameOrChannels}:error`);
} else if (typeof nameOrChannels === 'object') {
const { start, end, asyncStart, asyncEnd, error } = nameOrChannels;

assertChannel(start, 'nameOrChannels.start');
assertChannel(end, 'nameOrChannels.end');
assertChannel(asyncStart, 'nameOrChannels.asyncStart');
assertChannel(asyncEnd, 'nameOrChannels.asyncEnd');
assertChannel(error, 'nameOrChannels.error');

this.start = start;
this.end = end;
this.asyncStart = asyncStart;
this.asyncEnd = asyncEnd;
this.error = error;
} else {
throw new ERR_INVALID_ARG_TYPE('nameOrChannels',
['string', 'object', 'Channel'],
nameOrChannels);
}
}

subscribe(handlers) {
for (const name of traceEvents) {
if (!handlers[name]) continue;

this[name]?.subscribe(handlers[name]);
}
}

unsubscribe(handlers) {
let done = true;

for (const name of traceEvents) {
if (!handlers[name]) continue;

if (!this[name]?.unsubscribe(handlers[name])) {
done = false;
}
}

return done;
}

traceSync(fn, ctx = {}, thisArg, ...args) {
const { start, end, error } = this;

try {
const result = start.runStores(ctx, fn, thisArg, ...args);
ctx.result = result;
return result;
} catch (err) {
ctx.error = err;
error.publish(ctx);
throw err;
} finally {
end.publish(ctx);
}
}

tracePromise(fn, ctx = {}, thisArg, ...args) {
const { start, end, asyncStart, asyncEnd, error } = this;

function reject(err) {
ctx.error = err;
error.publish(ctx);
asyncStart.publish(ctx);
// TODO: Is there a way to have asyncEnd _after_ the continuation?
asyncEnd.publish(ctx);
return PromiseReject(err);
}

function resolve(result) {
ctx.result = result;
asyncStart.publish(ctx);
// TODO: Is there a way to have asyncEnd _after_ the continuation?
asyncEnd.publish(ctx);
return result;
}

try {
const promise = start.runStores(ctx, fn, thisArg, ...args);
return PromisePrototypeThen(promise, resolve, reject);
} catch (err) {
ctx.error = err;
error.publish(ctx);
throw err;
} finally {
end.publish(ctx);
}
}

traceCallback(fn, position = 0, ctx = {}, thisArg, ...args) {
const { start, end, asyncStart, asyncEnd, error } = this;

function wrappedCallback(err, res) {
if (err) {
ctx.error = err;
error.publish(ctx);
} else {
ctx.result = res;
}

asyncStart.publish(ctx);
try {
if (callback) {
return ReflectApply(callback, this, arguments);
}
} finally {
asyncEnd.publish(ctx);
}
}

const callback = ArrayPrototypeAt(args, position);
ArrayPrototypeSplice(args, position, 1, wrappedCallback);

try {
return start.runStores(ctx, fn, thisArg, ...args);
} catch (err) {
ctx.error = err;
error.publish(ctx);
throw err;
} finally {
end.publish(ctx);
}
}
}

function tracingChannel(nameOrChannels) {
return new TracingChannel(nameOrChannels);
}

module.exports = {
channel,
hasSubscribers,
subscribe,
tracingChannel,
unsubscribe,
Channel
Channel,
TracingChannel
};

0 comments on commit 1215d64

Please sign in to comment.