From 13925219fa7893afd78c22545dc4fb53fd7cf2bb Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 1 Dec 2021 08:18:37 -0800 Subject: [PATCH] lib: make AbortSignal cloneable/transferable Allows for using `AbortSignal` across worker threads and contexts. ```js const ac = new AbortController(); const mc = new MessageChannel(); mc.port1.onmessage = ({ data }) => { data.addEventListener('abort', () => { console.log('aborted!'); }); }; mc.port2.postMessage(ac.signal, [ac.signal]); ``` Signed-off-by: James M Snell --- lib/internal/abort_controller.js | 96 +++++++++++++++++++-- test/parallel/test-abortsignal-cloneable.js | 67 ++++++++++++++ 2 files changed, 158 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-abortsignal-cloneable.js diff --git a/lib/internal/abort_controller.js b/lib/internal/abort_controller.js index 4f6cbbf7325a58..7854644dca75bf 100644 --- a/lib/internal/abort_controller.js +++ b/lib/internal/abort_controller.js @@ -47,13 +47,37 @@ const { setTimeout, } = require('timers'); -const kAborted = Symbol('kAborted'); -const kReason = Symbol('kReason'); -const kTimeout = Symbol('kTimeout'); +const { + messaging_deserialize_symbol: kDeserialize, + messaging_transfer_symbol: kTransfer, + messaging_transfer_list_symbol: kTransferList +} = internalBinding('symbols'); -const timeOutSignals = new SafeSet(); +let _MessageChannel; +let makeTransferable; + +// Loading the MessageChannel and makeTransferable have to be done lazily +// because otherwise we'll end up with a require cycle that ends up with +// an incomplete initialization of abort_controller. + +function lazyMessageChannel() { + _MessageChannel ??= require('internal/worker/io').MessageChannel; + return new _MessageChannel(); +} + +function lazMakeTransferable(obj) { + makeTransferable ??= + require('internal/worker/js_transferable').makeTransferable; + return makeTransferable(obj); +} const clearTimeoutRegistry = new SafeFinalizationRegistry(clearTimeout); +const timeOutSignals = new SafeSet(); + +const kAborted = Symbol('kAborted'); +const kReason = Symbol('kReason'); +const kCloneData = Symbol('kCloneData'); +const kTimeout = Symbol('kTimeout'); function customInspect(self, obj, depth, options) { if (depth < 0) @@ -165,7 +189,68 @@ class AbortSignal extends EventTarget { timeOutSignals.delete(this); } } + + [kTransfer]() { + validateAbortSignal(this); + const aborted = this.aborted; + if (aborted) { + const reason = this.reason; + return { + data: { aborted, reason }, + deserializeInfo: 'internal/abort_controller:ClonedAbortSignal', + }; + } + + const { port1, port2 } = this[kCloneData]; + this[kCloneData] = port2; + + this.addEventListener('abort', () => { + port1.postMessage(this.reason); + port1.close(); + }, { once: true }); + + return { + data: { port: port2 }, + deserializeInfo: 'internal/abort_controller:ClonedAbortSignal', + }; + } + + [kTransferList]() { + if (!this.aborted) { + const { port1, port2 } = lazyMessageChannel(); + port1.unref(); + port2.unref(); + this[kCloneData] = { + port1, + port2, + }; + return [port2]; + } + return []; + } + + [kDeserialize]({ aborted, reason, port }) { + if (aborted) { + this[kAborted] = aborted; + this[kReason] = reason; + return; + } + + port.onmessage = ({ data }) => { + abortSignal(this, data); + port.close(); + port.onmessage = undefined; + }; + // The receiving port, by itself, should never keep the event loop open. + // The unref() has to be called *after* setting the onmessage handler. + port.unref(); + } +} + +function ClonedAbortSignal() { + return createAbortSignal(); } +ClonedAbortSignal.prototype[kDeserialize] = () => {}; ObjectDefineProperties(AbortSignal.prototype, { aborted: { enumerable: true } @@ -185,7 +270,7 @@ function createAbortSignal(aborted = false, reason = undefined) { ObjectSetPrototypeOf(signal, AbortSignal.prototype); signal[kAborted] = aborted; signal[kReason] = reason; - return signal; + return lazMakeTransferable(signal); } function abortSignal(signal, reason) { @@ -252,4 +337,5 @@ module.exports = { kAborted, AbortController, AbortSignal, + ClonedAbortSignal, }; diff --git a/test/parallel/test-abortsignal-cloneable.js b/test/parallel/test-abortsignal-cloneable.js new file mode 100644 index 00000000000000..a4349ba962ce9e --- /dev/null +++ b/test/parallel/test-abortsignal-cloneable.js @@ -0,0 +1,67 @@ +'use strict'; + +const common = require('../common'); +const { ok, strictEqual } = require('assert'); + +function deferred() { + let res; + const promise = new Promise((resolve) => res = resolve); + return { res, promise }; +} + +(async () => { + const ac = new AbortController(); + const mc = new MessageChannel(); + + const deferred1 = deferred(); + const deferred2 = deferred(); + const resolvers = [deferred1, deferred2]; + + mc.port1.onmessage = common.mustCall(({ data }) => { + data.addEventListener('abort', common.mustCall(() => { + strictEqual(data.reason, 'boom'); + resolvers.shift().res(); + })); + }, 2); + + mc.port2.postMessage(ac.signal, [ac.signal]); + + // Can be cloned/transferd multiple times and they all still work + mc.port2.postMessage(ac.signal, [ac.signal]); + + mc.port2.close(); + + // Although we're using transfer semantics, the local AbortSignal + // is still usable locally. + ac.signal.addEventListener('abort', common.mustCall(() => { + strictEqual(ac.signal.reason, 'boom'); + })); + + ac.abort('boom'); + + await Promise.all([ deferred1.promise, deferred2.promise ]); +})().then(common.mustCall()); + +{ + const signal = AbortSignal.abort('boom'); + ok(signal.aborted); + strictEqual(signal.reason, 'boom'); + const mc = new MessageChannel(); + mc.port1.onmessage = common.mustCall(({ data }) => { + ok(data instanceof AbortSignal); + ok(data.aborted); + strictEqual(data.reason, 'boom'); + mc.port1.close(); + }); + mc.port2.postMessage(signal, [signal]); +} + +{ + // The cloned AbortSignal does not keep the event loop open + // waiting for the abort to be triggered. + const ac = new AbortController(); + const mc = new MessageChannel(); + mc.port1.onmessage = common.mustCall(); + mc.port2.postMessage(ac.signal, [ac.signal]); + mc.port2.close(); +}