Skip to content

Commit

Permalink
add EventEmitter.on to async iterate over events
Browse files Browse the repository at this point in the history
  • Loading branch information
goto-bus-stop committed Feb 28, 2021
1 parent 48e3d18 commit a205441
Show file tree
Hide file tree
Showing 3 changed files with 334 additions and 0 deletions.
104 changes: 104 additions & 0 deletions events.js
Expand Up @@ -55,6 +55,7 @@ function EventEmitter() {
}
module.exports = EventEmitter;
module.exports.once = once;
module.exports.on = on;

// Backwards-compat with node 0.10.x
EventEmitter.EventEmitter = EventEmitter;
Expand Down Expand Up @@ -467,6 +468,109 @@ function once(emitter, name) {
});
}

function createIterResult(value, done) {
return { value: value, done: done };
}

var AsyncIteratorPrototype = undefined;

function on(emitter, event) {
// Initialize it on first run
if (AsyncIteratorPrototype === undefined) {
var asyncGenerator;
try {
asyncGenerator = Function('return async function*() {};')();
} catch (err) {}
if (asyncGenerator) {
AsyncIteratorPrototype = Object.getPrototypeOf(
Object.getPrototypeOf(asyncGenerator).prototype);
} else {
AsyncIteratorPrototype = null;
}
}

var unconsumedEvents = [];
var unconsumedPromises = [];
var error = null;
var finished = false;
var iterator = {
next: function next() {
// First, we consume all unread events
var value = unconsumedEvents.shift();
if (value) {
return Promise.resolve(createIterResult(value, false));
}

// Then we error, if an error happened
// This happens one time if at all, because after 'error'
// we stop listening
if (error) {
var p = Promise.reject(error);
error = null;
return p;
}

// If the iterator is finished, resolve to done
if (finished) {
return Promise.resolve(createIterResult(undefined, true));
}
return new Promise(function (resolve, reject) {
unconsumedPromises.push({ resolve: resolve, reject: reject });
});
},
'return': function _return() {
emitter.removeListener(event, eventHandler);
emitter.removeListener('error', errorHandler);
finished = true;

for (var i = 0, l = unconsumedPromises.length; i < l; i++) {
unconsumedPromises[i].resolve(createIterResult(undefined, true));
}
return Promise.resolve(createIterResult(undefined, true));
},
'throw': function _throw(err) {
if (!err || !(err instanceof Error)) {
throw new TypeError('The "EventEmitter.AsyncIterator" property must be an instance of Error. Received ' + typeof err);
}
error = err;
emitter.removeListener(event, eventHandler);
emitter.removeListener('error', errorHandler);
}
};

iterator[Symbol.asyncIterator] = function () { return this; };

Object.setPrototypeOf(iterator, AsyncIteratorPrototype);

emitter.on(event, eventHandler);
emitter.on('error', errorHandler);

return iterator;

function eventHandler() {
var args = [].slice.call(arguments);
var promise = unconsumedPromises.shift();
if (promise) {
promise.resolve(createIterResult(args, false));
} else {
unconsumedEvents.push(args);
}
}

function errorHandler(err) {
finished = true;

var toError = unconsumedPromises.shift();
if (toError) {
toError.reject(err);
} else {
// The next time we call next()
error = err;
}
iterator.return();
}
}

function addErrorHandlerIfEventEmitter(emitter, handler, flags) {
if (typeof emitter.on === 'function') {
eventTargetAgnosticAddListener(emitter, 'error', handler, flags);
Expand Down
6 changes: 6 additions & 0 deletions tests/index.js
Expand Up @@ -50,6 +50,12 @@ if (functionsHaveNames()) {
require('./modify-in-emit.js');
require('./num-args.js');
require('./once.js');
if (typeof Promise === 'function' && hasSymbols() && Symbol.asyncIterator) {
require('./on-async-iterator.js');
} else {
// Async iterator support is not available.
test('./on-async-iterator.js', { skip: true }, function () {});
}
require('./prepend.js');
require('./set-max-listeners-side-effects.js');
require('./special-event-names.js');
Expand Down
224 changes: 224 additions & 0 deletions tests/on-async-iterator.js
@@ -0,0 +1,224 @@
'use strict';

var common = require('./common');
var assert = require('assert');
var EventEmitter = require('../').EventEmitter;
var on = require('../').on;

async function basic() {
var ee = new EventEmitter();
process.nextTick(function () {
ee.emit('foo', 'bar');
// 'bar' is a spurious event, we are testing
// that it does not show up in the iterable
ee.emit('bar', 24);
ee.emit('foo', 42);
});

var iterable = on(ee, 'foo');

var expected = [['bar'], [42]];

for await (var event of iterable) {
var current = expected.shift();

assert.deepStrictEqual(current, event);

if (expected.length === 0) {
break;
}
}
assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}

async function error() {
var ee = new EventEmitter();
var _err = new Error('kaboom');
process.nextTick(function () {
ee.emit('error', _err);
});

var iterable = on(ee, 'foo');
let looped = false;
let thrown = false;

try {
// eslint-disable-next-line no-unused-vars
for await (var event of iterable) {
looped = true;
}
} catch (err) {
thrown = true;
assert.strictEqual(err, _err);
}
assert.strictEqual(thrown, true);
assert.strictEqual(looped, false);
}

async function errorDelayed() {
var ee = new EventEmitter();
var _err = new Error('kaboom');
process.nextTick(function () {
ee.emit('foo', 42);
ee.emit('error', _err);
});

var iterable = on(ee, 'foo');
var expected = [[42]];
let thrown = false;

try {
for await (var event of iterable) {
var current = expected.shift();
assert.deepStrictEqual(current, event);
}
} catch (err) {
thrown = true;
assert.strictEqual(err, _err);
}
assert.strictEqual(thrown, true);
assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}

async function throwInLoop() {
var ee = new EventEmitter();
var _err = new Error('kaboom');

process.nextTick(function () {
ee.emit('foo', 42);
});

try {
for await (var event of on(ee, 'foo')) {
assert.deepStrictEqual(event, [42]);
throw _err;
}
} catch (err) {
assert.strictEqual(err, _err);
}

assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}

async function next() {
var ee = new EventEmitter();
var iterable = on(ee, 'foo');

process.nextTick(function() {
ee.emit('foo', 'bar');
ee.emit('foo', 42);
iterable.return();
});

var results = await Promise.all([
iterable.next(),
iterable.next(),
iterable.next()
]);

assert.deepStrictEqual(results, [{
value: ['bar'],
done: false
}, {
value: [42],
done: false
}, {
value: undefined,
done: true
}]);

assert.deepStrictEqual(await iterable.next(), {
value: undefined,
done: true
});
}

async function nextError() {
var ee = new EventEmitter();
var iterable = on(ee, 'foo');
var _err = new Error('kaboom');
process.nextTick(function() {
ee.emit('error', _err);
});
var results = await Promise.allSettled([
iterable.next(),
iterable.next(),
iterable.next()
]);
assert.deepStrictEqual(results, [{
status: 'rejected',
reason: _err
}, {
status: 'fulfilled',
value: {
value: undefined,
done: true
}
}, {
status: 'fulfilled',
value: {
value: undefined,
done: true
}
}]);
assert.strictEqual(ee.listeners('error').length, 0);
}

async function iterableThrow() {
var ee = new EventEmitter();
var iterable = on(ee, 'foo');

process.nextTick(function () {
ee.emit('foo', 'bar');
ee.emit('foo', 42); // lost in the queue
iterable.throw(_err);
});

var _err = new Error('kaboom');
let thrown = false;

assert.throws(function () {
// No argument
iterable.throw();
}, {
message: 'The "EventEmitter.AsyncIterator" property must be' +
' an instance of Error. Received undefined',
name: 'TypeError'
});

var expected = [['bar'], [42]];

try {
for await (var event of iterable) {
assert.deepStrictEqual(event, expected.shift());
}
} catch (err) {
thrown = true;
assert.strictEqual(err, _err);
}
assert.strictEqual(thrown, true);
assert.strictEqual(expected.length, 0);
assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}

async function run() {
var funcs = [
basic,
error,
errorDelayed,
throwInLoop,
next,
nextError,
iterableThrow,
];

for (var fn of funcs) {
await fn();
}
}

module.exports = run();

0 comments on commit a205441

Please sign in to comment.