Skip to content

Commit

Permalink
lib: make AbortSignal cloneable/transferable
Browse files Browse the repository at this point in the history
Allows for using `AbortSignal` across worker threads and contexts.

```js
const ac = new AbortController();
const mc = new MessageChannel();
mc.port1.onmessage = ({ data }) => {
  data.addEventListener('abort', () => {
    console.log('aborted!');
  });
};
mc.port2.postMessage(ac.signal, [ac.signal]);
```

Signed-off-by: James M Snell <jasnell@gmail.com>
  • Loading branch information
jasnell committed Dec 1, 2021
1 parent 94de738 commit ddd42c6
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 1 deletion.
76 changes: 75 additions & 1 deletion lib/internal/abort_controller.js
Expand Up @@ -29,8 +29,20 @@ const {
}
} = require('internal/errors');

const {
makeTransferable,
kTransfer,
kTransferList,
kDeserialize,
} = require('internal/worker/js_transferable');

const {
MessageChannel,
} = require('internal/worker/io');

const kAborted = Symbol('kAborted');
const kReason = Symbol('kReason');
const kCloneData = Symbol('kCloneData');

function customInspect(self, obj, depth, options) {
if (depth < 0)
Expand Down Expand Up @@ -82,7 +94,68 @@ class AbortSignal extends EventTarget {
static abort(reason) {
return createAbortSignal(true, reason);
}

[kTransfer]() {
validateAbortSignal(this);
const aborted = this.aborted;
if (aborted) {
const reason = this.reason;
return {
data: { aborted, reason },
deserializeInfo: 'internal/abort_controller:ClonedAbortSignal',
};
}

const { port1, port2 } = this[kCloneData];
this[kCloneData] = port2;

this.addEventListener('abort', () => {
port1.postMessage(this.reason);
port1.close();
}, { once: true });

return {
data: { port: port2 },
deserializeInfo: 'internal/abort_controller:ClonedAbortSignal',
};
}

[kTransferList]() {
if (!this.aborted) {
const { port1, port2 } = new MessageChannel();
port1.unref();
port2.unref();
this[kCloneData] = {
port1,
port2,
};
return [port2];
}
return [];
}

[kDeserialize]({ aborted, reason, port }) {
if (aborted) {
this[kAborted] = aborted;
this[kReason] = reason;
return;
}

port.onmessage = ({ data }) => {
abortSignal(this, data);
port.close();
port.onmessage = undefined;
};
// The receiving port, by itself, should never keep the event loop open.
// The unref() has to be called *after* setting the onmessage handler.
port.unref();
}
}

function ClonedAbortSignal() {
return createAbortSignal();
}
ClonedAbortSignal.prototype[kDeserialize] = () => {};

ObjectDefineProperties(AbortSignal.prototype, {
aborted: { enumerable: true }
Expand All @@ -102,7 +175,7 @@ function createAbortSignal(aborted = false, reason = undefined) {
ObjectSetPrototypeOf(signal, AbortSignal.prototype);
signal[kAborted] = aborted;
signal[kReason] = reason;
return signal;
return makeTransferable(signal);
}

function abortSignal(signal, reason) {
Expand Down Expand Up @@ -169,4 +242,5 @@ module.exports = {
kAborted,
AbortController,
AbortSignal,
ClonedAbortSignal,
};
52 changes: 52 additions & 0 deletions test/parallel/test-abortsignal-cloneable.js
@@ -0,0 +1,52 @@
'use strict';

const common = require('../common');
const { ok, strictEqual } = require('assert');

{
const ac = new AbortController();
const mc = new MessageChannel();
mc.port1.onmessage = common.mustCall(({ data }) => {
data.addEventListener('abort', common.mustCall(() => {
strictEqual(data.reason, 'boom');
}));
}, 2);
mc.port2.postMessage(ac.signal, [ac.signal]);

// Can be cloned/transferd multiple times and they all still work
mc.port2.postMessage(ac.signal, [ac.signal]);

mc.port2.close();

// Although we're using transfer semantics, the local AbortSignal
// is still usable locally.
ac.signal.addEventListener('abort', common.mustCall(() => {
strictEqual(ac.signal.reason, 'boom');
}));

ac.abort('boom');
}

{
const signal = AbortSignal.abort('boom');
ok(signal.aborted);
strictEqual(signal.reason, 'boom');
const mc = new MessageChannel();
mc.port1.onmessage = common.mustCall(({ data }) => {
ok(data instanceof AbortSignal);
ok(data.aborted);
strictEqual(data.reason, 'boom');
mc.port1.close();
});
mc.port2.postMessage(signal, [signal]);
}

{
// The cloned AbortSignal does not keep the event loop open
// waiting for the abort to be triggered.
const ac = new AbortController();
const mc = new MessageChannel();
mc.port1.onmessage = common.mustCall();
mc.port2.postMessage(ac.signal, [ac.signal]);
mc.port2.close();
}

0 comments on commit ddd42c6

Please sign in to comment.