Skip to content

Commit

Permalink
lib: make AbortSignal cloneable/transferable
Browse files Browse the repository at this point in the history
Allows for using `AbortSignal` across worker threads and contexts.

```js
const ac = new AbortController();
const mc = new MessageChannel();
mc.port1.onmessage = ({ data }) => {
  data.addEventListener('abort', () => {
    console.log('aborted!');
  });
};
mc.port2.postMessage(ac.signal, [ac.signal]);
```

Signed-off-by: James M Snell <jasnell@gmail.com>

PR-URL: #41050
Refs: whatwg/dom#948
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
  • Loading branch information
jasnell authored and danielleadams committed Dec 13, 2021
1 parent 28761de commit 348707f
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 5 deletions.
96 changes: 91 additions & 5 deletions lib/internal/abort_controller.js
Expand Up @@ -47,13 +47,37 @@ const {
setTimeout,
} = require('timers');

const kAborted = Symbol('kAborted');
const kReason = Symbol('kReason');
const kTimeout = Symbol('kTimeout');
const {
messaging_deserialize_symbol: kDeserialize,
messaging_transfer_symbol: kTransfer,
messaging_transfer_list_symbol: kTransferList
} = internalBinding('symbols');

const timeOutSignals = new SafeSet();
let _MessageChannel;
let makeTransferable;

// Loading the MessageChannel and makeTransferable have to be done lazily
// because otherwise we'll end up with a require cycle that ends up with
// an incomplete initialization of abort_controller.

function lazyMessageChannel() {
_MessageChannel ??= require('internal/worker/io').MessageChannel;
return new _MessageChannel();
}

function lazyMakeTransferable(obj) {
makeTransferable ??=
require('internal/worker/js_transferable').makeTransferable;
return makeTransferable(obj);
}

const clearTimeoutRegistry = new SafeFinalizationRegistry(clearTimeout);
const timeOutSignals = new SafeSet();

const kAborted = Symbol('kAborted');
const kReason = Symbol('kReason');
const kCloneData = Symbol('kCloneData');
const kTimeout = Symbol('kTimeout');

function customInspect(self, obj, depth, options) {
if (depth < 0)
Expand Down Expand Up @@ -172,7 +196,68 @@ class AbortSignal extends EventTarget {
timeOutSignals.delete(this);
}
}

[kTransfer]() {
validateAbortSignal(this);
const aborted = this.aborted;
if (aborted) {
const reason = this.reason;
return {
data: { aborted, reason },
deserializeInfo: 'internal/abort_controller:ClonedAbortSignal',
};
}

const { port1, port2 } = this[kCloneData];
this[kCloneData] = undefined;

this.addEventListener('abort', () => {
port1.postMessage(this.reason);
port1.close();
}, { once: true });

return {
data: { port: port2 },
deserializeInfo: 'internal/abort_controller:ClonedAbortSignal',
};
}

[kTransferList]() {
if (!this.aborted) {
const { port1, port2 } = lazyMessageChannel();
port1.unref();
port2.unref();
this[kCloneData] = {
port1,
port2,
};
return [port2];
}
return [];
}

[kDeserialize]({ aborted, reason, port }) {
if (aborted) {
this[kAborted] = aborted;
this[kReason] = reason;
return;
}

port.onmessage = ({ data }) => {
abortSignal(this, data);
port.close();
port.onmessage = undefined;
};
// The receiving port, by itself, should never keep the event loop open.
// The unref() has to be called *after* setting the onmessage handler.
port.unref();
}
}

function ClonedAbortSignal() {
return createAbortSignal();
}
ClonedAbortSignal.prototype[kDeserialize] = () => {};

ObjectDefineProperties(AbortSignal.prototype, {
aborted: { enumerable: true }
Expand All @@ -192,7 +277,7 @@ function createAbortSignal(aborted = false, reason = undefined) {
ObjectSetPrototypeOf(signal, AbortSignal.prototype);
signal[kAborted] = aborted;
signal[kReason] = reason;
return signal;
return lazyMakeTransferable(signal);
}

function abortSignal(signal, reason) {
Expand Down Expand Up @@ -259,4 +344,5 @@ module.exports = {
kAborted,
AbortController,
AbortSignal,
ClonedAbortSignal,
};
78 changes: 78 additions & 0 deletions test/parallel/test-abortsignal-cloneable.js
@@ -0,0 +1,78 @@
'use strict';

const common = require('../common');
const { ok, strictEqual } = require('assert');
const { setImmediate: pause } = require('timers/promises');

function deferred() {
let res;
const promise = new Promise((resolve) => res = resolve);
return { res, promise };
}

(async () => {
const ac = new AbortController();
const mc = new MessageChannel();

const deferred1 = deferred();
const deferred2 = deferred();
const resolvers = [deferred1, deferred2];

mc.port1.onmessage = common.mustCall(({ data }) => {
data.addEventListener('abort', common.mustCall(() => {
strictEqual(data.reason, 'boom');
}));
resolvers.shift().res();
}, 2);

mc.port2.postMessage(ac.signal, [ac.signal]);

// Can be cloned/transferd multiple times and they all still work
mc.port2.postMessage(ac.signal, [ac.signal]);

mc.port2.close();

// Although we're using transfer semantics, the local AbortSignal
// is still usable locally.
ac.signal.addEventListener('abort', common.mustCall(() => {
strictEqual(ac.signal.reason, 'boom');
}));

await Promise.all([ deferred1.promise, deferred2.promise ]);

ac.abort('boom');

// Because the postMessage used by the underlying AbortSignal
// takes at least one turn of the event loop to be processed,
// and because it is unref'd, it won't, by itself, keep the
// event loop open long enough for the test to complete, so
// we schedule two back to back turns of the event to ensure
// the loop runs long enough for the test to complete.
await pause();
await pause();

})().then(common.mustCall());

{
const signal = AbortSignal.abort('boom');
ok(signal.aborted);
strictEqual(signal.reason, 'boom');
const mc = new MessageChannel();
mc.port1.onmessage = common.mustCall(({ data }) => {
ok(data instanceof AbortSignal);
ok(data.aborted);
strictEqual(data.reason, 'boom');
mc.port1.close();
});
mc.port2.postMessage(signal, [signal]);
}

{
// The cloned AbortSignal does not keep the event loop open
// waiting for the abort to be triggered.
const ac = new AbortController();
const mc = new MessageChannel();
mc.port1.onmessage = common.mustCall();
mc.port2.postMessage(ac.signal, [ac.signal]);
mc.port2.close();
}

0 comments on commit 348707f

Please sign in to comment.