From 60ef53cc1412300165c32210284798d213852cc8 Mon Sep 17 00:00:00 2001 From: Stephen Belanger Date: Fri, 21 Aug 2020 16:25:39 -0700 Subject: [PATCH] lib: create diagnostics_channel module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-URL: https://github.com/nodejs/node/pull/34895 Reviewed-By: Bryan English Reviewed-By: Gerhard Stöbich Reviewed-By: Vladimir de Turckheim Reviewed-By: Rich Trott Reviewed-By: Gabriel Schulhof Reviewed-By: Michael Dawson --- benchmark/diagnostics_channel/publish.js | 29 +++ benchmark/diagnostics_channel/subscribe.js | 19 ++ doc/api/diagnostics_channel.md | 180 ++++++++++++++++++ doc/api/index.md | 1 + lib/diagnostics_channel.js | 122 ++++++++++++ node.gyp | 1 + ...gnostics-channel-object-channel-pub-sub.js | 39 ++++ ...gnostics-channel-safe-subscriber-errors.js | 29 +++ .../test-diagnostics-channel-symbol-named.js | 22 +++ tools/doc/type-parser.js | 2 + 10 files changed, 444 insertions(+) create mode 100644 benchmark/diagnostics_channel/publish.js create mode 100644 benchmark/diagnostics_channel/subscribe.js create mode 100644 doc/api/diagnostics_channel.md create mode 100644 lib/diagnostics_channel.js create mode 100644 test/parallel/test-diagnostics-channel-object-channel-pub-sub.js create mode 100644 test/parallel/test-diagnostics-channel-safe-subscriber-errors.js create mode 100644 test/parallel/test-diagnostics-channel-symbol-named.js diff --git a/benchmark/diagnostics_channel/publish.js b/benchmark/diagnostics_channel/publish.js new file mode 100644 index 00000000000000..31a770c8627919 --- /dev/null +++ b/benchmark/diagnostics_channel/publish.js @@ -0,0 +1,29 @@ +'use strict'; +const common = require('../common.js'); +const dc = require('diagnostics_channel'); + +const bench = common.createBenchmark(main, { + n: [1e8], + subscribers: [0, 1, 10], +}); + +function noop() {} + +function main({ n, subscribers }) { + const channel = dc.channel('test'); + for (let i = 0; i < subscribers; i++) { + channel.subscribe(noop); + } + + const data = { + foo: 'bar' + }; + + bench.start(); + for (let i = 0; i < n; i++) { + if (channel.hasSubscribers) { + channel.publish(data); + } + } + bench.end(n); +} diff --git a/benchmark/diagnostics_channel/subscribe.js b/benchmark/diagnostics_channel/subscribe.js new file mode 100644 index 00000000000000..1415054588c4b1 --- /dev/null +++ b/benchmark/diagnostics_channel/subscribe.js @@ -0,0 +1,19 @@ +'use strict'; +const common = require('../common.js'); +const dc = require('diagnostics_channel'); + +const bench = common.createBenchmark(main, { + n: [1e8], +}); + +function noop() {} + +function main({ n }) { + const channel = dc.channel('channel.0'); + + bench.start(); + for (let i = 0; i < n; i++) { + channel.subscribe(noop); + } + bench.end(n); +} diff --git a/doc/api/diagnostics_channel.md b/doc/api/diagnostics_channel.md new file mode 100644 index 00000000000000..2f78ea80f6775e --- /dev/null +++ b/doc/api/diagnostics_channel.md @@ -0,0 +1,180 @@ +# Diagnostics Channel + + + +> Stability: 1 - Experimental + + + +The `diagnostics_channel` module provides an API to create named channels +to report arbitrary message data for diagnostics purposes. + +It can be accessed using: + +```js +const diagnostics_channel = require('diagnostics_channel'); +``` + +It is intended that a module writer wanting to report diagnostics messages +will create one or many top-level channels to report messages through. +Channels may also be acquired at runtime but it is not encouraged +due to the additional overhead of doing so. Channels may be exported for +convenience, but as long as the name is known it can be acquired anywhere. + +If you intend for your module to produce diagnostics data for others to +consume it is recommended that you include documentation of what named +channels are used along with the shape of the message data. Channel names +should generally include the module name to avoid collisions with data from +other modules. + +## Public API + +### Overview + +Following is a simple overview of the public API. + +```js +const diagnostics_channel = require('diagnostics_channel'); + +// Get a reusable channel object +const channel = diagnostics_channel.channel('my-channel'); + +// Subscribe to the channel +channel.subscribe((message, name) => { + // Received data +}); + +// Check if the channel has an active subscriber +if (channel.hasSubscribers) { + // Publish data to the channel + channel.publish({ + some: 'data' + }); +} +``` + +#### `diagnostics_channel.hasSubscribers(name)` + +* `name` {string|symbol} The channel name +* Returns: {boolean} If there are active subscribers + +Check if there are active subscribers to the named channel. This is helpful if +the message you want to send might be expensive to prepare. + +This API is optional but helpful when trying to publish messages from very +performance-senstive code. + +```js +const diagnostics_channel = require('diagnostics_channel'); + +if (diagnostics_channel.hasSubscribers('my-channel')) { + // There are subscribers, prepare and publish message +} +``` + +#### `diagnostics_channel.channel(name)` + +* `name` {string|symbol} The channel name +* Returns: {Channel} The named channel object + +This is the primary entry-point for anyone wanting to interact with a named +channel. It produces a channel object which is optimized to reduce overhead at +publish time as much as possible. + +```js +const diagnostics_channel = require('diagnostics_channel'); + +const channel = diagnostics_channel.channel('my-channel'); +``` + +### Class: `Channel` + +The class `Channel` represents an individual named channel within the data +pipeline. It is use to track subscribers and to publish messages when there +are subscribers present. It exists as a separate object to avoid channel +lookups at publish time, enabling very fast publish speeds and allowing +for heavy use while incurring very minimal cost. Channels are created with +[`diagnostics_channel.channel(name)`][], constructing a channel directly +with `new Channel(name)` is not supported. + +#### `channel.hasSubscribers` + +* Returns: {boolean} If there are active subscribers + +Check if there are active subscribers to this channel. This is helpful if +the message you want to send might be expensive to prepare. + +This API is optional but helpful when trying to publish messages from very +performance-senstive code. + +```js +const diagnostics_channel = require('diagnostics_channel'); + +const channel = diagnostics_channel.channel('my-channel'); + +if (channel.hasSubscribers) { + // There are subscribers, prepare and publish message +} +``` + +#### `channel.publish(message)` + +* `message` {any} The message to send to the channel subscribers + +Publish a message to any subscribers to the channel. This will trigger +message handlers synchronously so they will execute within the same context. + +```js +const diagnostics_channel = require('diagnostics_channel'); + +const channel = diagnostics_channel.channel('my-channel'); + +channel.publish({ + some: 'message' +}); +``` + +#### `channel.subscribe(onMessage)` + +* `onMessage` {Function} The handler to receive channel messages + * `message` {any} The message data + * `name` {string|symbol} The name of the channel + +Register a message handler to subscribe to this channel. This message handler +will be run synchronously whenever a message is published to the channel. Any +errors thrown in the message handler will trigger an [`'uncaughtException'`][]. + +```js +const diagnostics_channel = require('diagnostics_channel'); + +const channel = diagnostics_channel.channel('my-channel'); + +channel.subscribe((message, name) => { + // Received data +}); +``` + +#### `channel.unsubscribe(onMessage)` + +* `onMessage` {Function} The previous subscribed handler to remove + +Remove a message handler previously registered to this channel with +[`channel.subscribe(onMessage)`][]. + +```js +const diagnostics_channel = require('diagnostics_channel'); + +const channel = diagnostics_channel.channel('my-channel'); + +function onMessage(message, name) { + // Received data +} + +channel.subscribe(onMessage); + +channel.unsubscribe(onMessage); +``` + +[`diagnostics_channel.channel(name)`]: #diagnostics_channel_diagnostics_channel_channel_name +[`channel.subscribe(onMessage)`]: #diagnostics_channel_channel_subscribe_onmessage +[`'uncaughtException'`]: process.md#process_event_uncaughtexception diff --git a/doc/api/index.md b/doc/api/index.md index e32a207c310256..393110e134cad7 100644 --- a/doc/api/index.md +++ b/doc/api/index.md @@ -23,6 +23,7 @@ * [Crypto](crypto.md) * [Debugger](debugger.md) * [Deprecated APIs](deprecations.md) +* [Diagnostics Channel](diagnostics_channel.md) * [DNS](dns.md) * [Domain](domain.md) * [Errors](errors.md) diff --git a/lib/diagnostics_channel.js b/lib/diagnostics_channel.js new file mode 100644 index 00000000000000..0a3552dc975040 --- /dev/null +++ b/lib/diagnostics_channel.js @@ -0,0 +1,122 @@ +'use strict'; + +const { + ArrayPrototypeIndexOf, + ArrayPrototypePush, + ArrayPrototypeSplice, + ObjectCreate, + ObjectGetPrototypeOf, + ObjectSetPrototypeOf, + SymbolHasInstance, + WeakRefPrototypeGet +} = primordials; + +const { + codes: { + ERR_INVALID_ARG_TYPE, + } +} = require('internal/errors'); + +const { triggerUncaughtException } = internalBinding('errors'); + +const { WeakReference } = internalBinding('util'); + +// TODO(qard): should there be a C++ channel interface? +class ActiveChannel { + subscribe(subscription) { + if (typeof subscription !== 'function') { + throw new ERR_INVALID_ARG_TYPE('subscription', ['function'], + subscription); + } + ArrayPrototypePush(this._subscribers, subscription); + } + + unsubscribe(subscription) { + const index = ArrayPrototypeIndexOf(this._subscribers, subscription); + if (index >= 0) { + ArrayPrototypeSplice(this._subscribers, index, 1); + + // When there are no more active subscribers, restore to fast prototype. + if (!this._subscribers.length) { + // eslint-disable-next-line no-use-before-define + ObjectSetPrototypeOf(this, Channel.prototype); + } + } + } + + get hasSubscribers() { + return true; + } + + publish(data) { + for (let i = 0; i < this._subscribers.length; i++) { + try { + const onMessage = this._subscribers[i]; + onMessage(data, this.name); + } catch (err) { + process.nextTick(() => { + triggerUncaughtException(err, false); + }); + } + } + } +} + +class Channel { + constructor(name) { + this._subscribers = undefined; + this.name = name; + } + + static [SymbolHasInstance](instance) { + const prototype = ObjectGetPrototypeOf(instance); + return prototype === Channel.prototype || + prototype === ActiveChannel.prototype; + } + + subscribe(subscription) { + ObjectSetPrototypeOf(this, ActiveChannel.prototype); + this._subscribers = []; + this.subscribe(subscription); + } + + get hasSubscribers() { + return false; + } + + publish() {} +} + +const channels = ObjectCreate(null); + +function channel(name) { + let channel; + const ref = channels[name]; + if (ref) channel = ref.get(); + if (channel) return channel; + + if (typeof name !== 'string' && typeof name !== 'symbol') { + throw new ERR_INVALID_ARG_TYPE('channel', ['string', 'symbol'], name); + } + + channel = new Channel(name); + channels[name] = new WeakReference(channel); + return channel; +} + +function hasSubscribers(name) { + let channel; + const ref = channels[name]; + if (ref) channel = WeakRefPrototypeGet(ref); + if (!channel) { + return false; + } + + return channel.hasSubscribers; +} + +module.exports = { + channel, + hasSubscribers, + Channel +}; diff --git a/node.gyp b/node.gyp index d483f1c74f87de..69b4adf96cb7e9 100644 --- a/node.gyp +++ b/node.gyp @@ -47,6 +47,7 @@ 'lib/constants.js', 'lib/crypto.js', 'lib/cluster.js', + 'lib/diagnostics_channel.js', 'lib/dgram.js', 'lib/dns.js', 'lib/domain.js', diff --git a/test/parallel/test-diagnostics-channel-object-channel-pub-sub.js b/test/parallel/test-diagnostics-channel-object-channel-pub-sub.js new file mode 100644 index 00000000000000..bae69b02415785 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-object-channel-pub-sub.js @@ -0,0 +1,39 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); +const { Channel } = dc; + +const input = { + foo: 'bar' +}; + +// Should not have named channel +assert.ok(!dc.hasSubscribers('test')); + +// Individual channel objects can be created to avoid future lookups +const channel = dc.channel('test'); +assert.ok(channel instanceof Channel); + +// No subscribers yet, should not publish +assert.ok(!channel.hasSubscribers); + +const subscriber = common.mustCall((message, name) => { + assert.strictEqual(name, channel.name); + assert.deepStrictEqual(message, input); +}); + +// Now there's a subscriber, should publish +channel.subscribe(subscriber); +assert.ok(channel.hasSubscribers); + +// The ActiveChannel prototype swap should not fail instanceof +assert.ok(channel instanceof Channel); + +// Should trigger the subscriber once +channel.publish(input); + +// Should not publish after subscriber is unsubscribed +channel.unsubscribe(subscriber); +assert.ok(!channel.hasSubscribers); diff --git a/test/parallel/test-diagnostics-channel-safe-subscriber-errors.js b/test/parallel/test-diagnostics-channel-safe-subscriber-errors.js new file mode 100644 index 00000000000000..b0c5ab2480e374 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-safe-subscriber-errors.js @@ -0,0 +1,29 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const input = { + foo: 'bar' +}; + +const channel = dc.channel('fail'); + +const error = new Error('nope'); + +process.on('uncaughtException', common.mustCall((err) => { + assert.strictEqual(err, error); +})); + +channel.subscribe(common.mustCall((message, name) => { + throw error; +})); + +// The failing subscriber should not stop subsequent subscribers from running +channel.subscribe(common.mustCall()); + +// Publish should continue without throwing +const fn = common.mustCall(); +channel.publish(input); +fn(); diff --git a/test/parallel/test-diagnostics-channel-symbol-named.js b/test/parallel/test-diagnostics-channel-symbol-named.js new file mode 100644 index 00000000000000..b98c2a1ef3ec6c --- /dev/null +++ b/test/parallel/test-diagnostics-channel-symbol-named.js @@ -0,0 +1,22 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const input = { + foo: 'bar' +}; + +const symbol = Symbol('test'); + +// Individual channel objects can be created to avoid future lookups +const channel = dc.channel(symbol); + +// Expect two successful publishes later +channel.subscribe(common.mustCall((message, name) => { + assert.strictEqual(name, symbol); + assert.deepStrictEqual(message, input); +})); + +channel.publish(input); diff --git a/tools/doc/type-parser.js b/tools/doc/type-parser.js index a1c80211bcca25..657b187dad9bec 100644 --- a/tools/doc/type-parser.js +++ b/tools/doc/type-parser.js @@ -73,6 +73,8 @@ const customTypesMap = { 'dgram.Socket': 'dgram.html#dgram_class_dgram_socket', + 'Channel': 'diagnostics_channel.html#diagnostics_channel_class_channel', + 'Domain': 'domain.html#domain_class_domain', 'errors.Error': 'errors.html#errors_class_error',