Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

events: allow use of AbortController with on #34912

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 31 additions & 1 deletion doc/api/events.md
Expand Up @@ -1000,7 +1000,7 @@ Value: `Symbol.for('nodejs.rejection')`

See how to write a custom [rejection handler][rejection].

## `events.on(emitter, eventName)`
## `events.on(emitter, eventName[, options])`
<!-- YAML
added:
- v13.6.0
Expand All @@ -1009,6 +1009,9 @@ added:

* `emitter` {EventEmitter}
* `eventName` {string|symbol} The name of the event being listened for
* `options` {Object}
* `signal` {AbortSignal} An {AbortSignal} that can be used to cancel awaiting
events.
* Returns: {AsyncIterator} that iterates `eventName` events emitted by the `emitter`

```js
Expand Down Expand Up @@ -1038,6 +1041,33 @@ if the `EventEmitter` emits `'error'`. It removes all listeners when
exiting the loop. The `value` returned by each iteration is an array
composed of the emitted event arguments.

An {AbortSignal} may be used to cancel waiting on events:

```js
const { on, EventEmitter } = require('events');
const ac = new AbortController();

(async () => {
const ee = new EventEmitter();

// Emit later on
process.nextTick(() => {
ee.emit('foo', 'bar');
ee.emit('foo', 42);
});

for await (const event of on(ee, 'foo', { signal: ac.signal })) {
// The execution of this inner block is synchronous and it
// processes one event at a time (even with await). Do not use
// if concurrent execution is required.
console.log(event); // prints ['bar'] [42]
}
// Unreachable here
})();

process.nextTick(() => ac.abort());
```

## `EventTarget` and `Event` API
<!-- YAML
added: v14.5.0
Expand Down
28 changes: 27 additions & 1 deletion lib/events.js
Expand Up @@ -730,7 +730,13 @@ function eventTargetAgnosticAddListener(emitter, name, listener, flags) {
}
}

function on(emitter, event) {
function on(emitter, event, options) {
const { signal } = { ...options };
validateAbortSignal(signal, 'options.signal');
if (signal && signal.aborted) {
throw lazyDOMException('The operation was aborted', 'AbortError');
}

const unconsumedEvents = [];
const unconsumedPromises = [];
let error = null;
Expand Down Expand Up @@ -768,6 +774,15 @@ function on(emitter, event) {
return() {
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);

if (signal) {
eventTargetAgnosticRemoveListener(
signal,
'abort',
abortListener,
{ once: true });
}

finished = true;

for (const promise of unconsumedPromises) {
Expand Down Expand Up @@ -797,9 +812,20 @@ function on(emitter, event) {
addErrorHandlerIfEventEmitter(emitter, errorHandler);
}

if (signal) {
eventTargetAgnosticAddListener(
lundibundi marked this conversation as resolved.
Show resolved Hide resolved
signal,
'abort',
abortListener,
{ once: true });
}

return iterator;

function abortListener() {
errorHandler(lazyDOMException('The operation was aborted', 'AbortError'));
}

function eventHandler(...args) {
const promise = unconsumedPromises.shift();
if (promise) {
Expand Down
121 changes: 119 additions & 2 deletions test/parallel/test-event-on-async-iterator.js
@@ -1,4 +1,4 @@
// Flags: --expose-internals
// Flags: --expose-internals --no-warnings
'use strict';

const common = require('../common');
Expand Down Expand Up @@ -248,6 +248,117 @@ async function nodeEventTarget() {
clearInterval(interval);
}

async function abortableOnBefore() {
const ee = new EventEmitter();
const ac = new AbortController();
ac.abort();
[1, {}, null, false, 'hi'].forEach((signal) => {
assert.throws(() => on(ee, 'foo', { signal }), {
code: 'ERR_INVALID_ARG_TYPE'
});
});
assert.throws(() => on(ee, 'foo', { signal: ac.signal }), {
name: 'AbortError'
});
}

async function eventTargetAbortableOnBefore() {
const et = new EventTarget();
const ac = new AbortController();
ac.abort();
[1, {}, null, false, 'hi'].forEach((signal) => {
assert.throws(() => on(et, 'foo', { signal }), {
code: 'ERR_INVALID_ARG_TYPE'
});
});
assert.throws(() => on(et, 'foo', { signal: ac.signal }), {
name: 'AbortError'
});
}

async function abortableOnAfter() {
const ee = new EventEmitter();
const ac = new AbortController();

const i = setInterval(() => ee.emit('foo', 'foo'), 10);

async function foo() {
for await (const f of on(ee, 'foo', { signal: ac.signal })) {
assert.strictEqual(f, 'foo');
jasnell marked this conversation as resolved.
Show resolved Hide resolved
}
}

foo().catch(common.mustCall((error) => {
assert.strictEqual(error.name, 'AbortError');
})).finally(() => {
clearInterval(i);
});

process.nextTick(() => ac.abort());
}

async function eventTargetAbortableOnAfter() {
const et = new EventTarget();
const ac = new AbortController();

const i = setInterval(() => et.dispatchEvent(new Event('foo')), 10);

async function foo() {
for await (const f of on(et, 'foo', { signal: ac.signal })) {
assert(f);
}
}

foo().catch(common.mustCall((error) => {
assert.strictEqual(error.name, 'AbortError');
})).finally(() => {
clearInterval(i);
});

process.nextTick(() => ac.abort());
}

async function eventTargetAbortableOnAfter2() {
const et = new EventTarget();
const ac = new AbortController();

const i = setInterval(() => et.dispatchEvent(new Event('foo')), 10);

async function foo() {
for await (const f of on(et, 'foo', { signal: ac.signal })) {
assert(f);
// Cancel after a single event has been triggered.
ac.abort();
}
}

foo().catch(common.mustCall((error) => {
assert.strictEqual(error.name, 'AbortError');
})).finally(() => {
clearInterval(i);
});
}

async function abortableOnAfterDone() {
const ee = new EventEmitter();
const ac = new AbortController();

const i = setInterval(() => ee.emit('foo', 'foo'), 1);
let count = 0;

async function foo() {
for await (const f of on(ee, 'foo', { signal: ac.signal })) {
assert.strictEqual(f[0], 'foo');
if (++count === 5)
break;
}
ac.abort(); // No error will occur
}

foo().finally(() => {
clearInterval(i);
});
}

async function run() {
const funcs = [
Expand All @@ -260,7 +371,13 @@ async function run() {
iterableThrow,
eventTarget,
errorListenerCount,
nodeEventTarget
nodeEventTarget,
abortableOnBefore,
abortableOnAfter,
eventTargetAbortableOnBefore,
eventTargetAbortableOnAfter,
eventTargetAbortableOnAfter2,
abortableOnAfterDone
];

for (const fn of funcs) {
Expand Down