diff --git a/Emittery.d.ts b/Emittery.d.ts index e045850..6150545 100644 --- a/Emittery.d.ts +++ b/Emittery.d.ts @@ -24,6 +24,14 @@ declare class Emittery { */ once(eventName: string): Promise; + /** + * Get an asynchronous iterator which buffers data each time an event is + * emitted. + * + * Call `return()` on the iterator to remove the subscription. + */ + events(eventName: string): AsyncIterableIterator; + /** * Trigger an event asynchronously, optionally with some data. Listeners * are called in the order they were added, but execute concurrently. @@ -65,15 +73,24 @@ declare class Emittery { offAny(listener: (eventName: string, eventData?: any) => any): void; /** - * Clear all event listeners on the instance. + * Get an asynchronous iterator which buffers a tuple of an event name and + * data each time an event is emitted. + * + * Call `return()` on the iterator to remove the subscription. + */ + anyEvent(): AsyncIterableIterator<[string, any]>; + + /** + * Clear all iterators and event listeners on the instance. * - * If `eventName` is given, only the listeners for that event are cleared. + * If `eventName` is given, only the iterators and listeners for that event + * are cleared. */ clearListeners(eventName?: string): void; /** - * The number of listeners for the `eventName` or all events if not - * specified. + * The number of iterators and listeners for the `eventName` or all events + * if not specified. */ listenerCount(eventName?: string): number; } @@ -119,11 +136,16 @@ declare namespace Emittery { off(eventName: Name, listener?: (eventData: EventDataMap[Name]) => any): void; off(eventName: Name, listener?: () => any): void; + events(eventName: Name): AsyncIterableIterator; + events(eventName: Name): AsyncIterableIterator; + onAny(listener: (eventName: keyof EventDataMap | EmptyEvents, eventData?: EventDataMap[keyof EventDataMap]) => any): Emittery.UnsubscribeFn; offAny(listener?: (eventName: Name, eventData: EventDataMap[Name]) => any): void; offAny(listener?: (eventName: Name) => any): void; + anyEvent(): AsyncIterableIterator<[keyof EventDataMap | EmptyEvents, EventDataMap[keyof EventDataMap] | void]>; + emit(eventName: Name, eventData: EventDataMap[Name]): Promise; emit(eventName: Name): Promise; diff --git a/index.js b/index.js index 1f09455..867016d 100644 --- a/index.js +++ b/index.js @@ -2,6 +2,8 @@ const anyMap = new WeakMap(); const eventsMap = new WeakMap(); +const producersMap = new WeakMap(); +const anyProducer = Symbol('anyProducer'); const resolvedPromise = Promise.resolve(); function assertEventName(eventName) { @@ -21,14 +23,88 @@ function getListeners(instance, eventName) { if (!events.has(eventName)) { events.set(eventName, new Set()); } - return events.get(eventName); } +function getEventProducers(instance, eventName) { + const key = typeof eventName === 'string' ? eventName : anyProducer; + const producers = producersMap.get(instance); + if (!producers.has(key)) { + producers.set(key, new Set()); + } + return producers.get(key); +} + +function iterator(instance, eventName) { + let finished = false; + let flush = () => {}; + let queue = []; + const producer = { + enqueue(item) { + queue.push(item); + flush(); + }, + finish() { + finished = true; + flush(); + } + }; + + getEventProducers(instance, eventName).add(producer); + return { + async next() { + if (!queue) { + return {done: true}; + } + + if (queue.length === 0) { + if (finished) { + queue = null; + return this.next(); + } + + await new Promise(resolve => { + flush = resolve; + }); + return this.next(); + } + + return {done: false, value: await queue.shift()}; + }, + async return(value) { + queue = null; + getEventProducers(instance, eventName).delete(producer); + flush(); + return arguments.length > 0 ? + {done: true, value: await value} : + {done: true}; + }, + [Symbol.asyncIterator]() { + return this; + } + }; +} + +function enqueueProducers(instance, eventName, eventData) { + const producers = producersMap.get(instance); + if (producers.has(eventName)) { + for (const producer of producers.get(eventName)) { + producer.enqueue(eventData); + } + } + if (producers.has(anyProducer)) { + const item = Promise.all([eventName, eventData]); + for (const producer of producers.get(anyProducer)) { + producer.enqueue(item); + } + } +} + class Emittery { constructor() { anyMap.set(this, new Set()); eventsMap.set(this, new Map()); + producersMap.set(this, new Map()); } on(eventName, listener) { @@ -54,9 +130,16 @@ class Emittery { }); } + events(eventName) { + assertEventName(eventName); + return iterator(this, eventName); + } + async emit(eventName, eventData) { assertEventName(eventName); + enqueueProducers(this, eventName, eventData); + const listeners = getListeners(this, eventName); const anyListeners = anyMap.get(this); const staticListeners = [...listeners]; @@ -80,6 +163,8 @@ class Emittery { async emitSerial(eventName, eventData) { assertEventName(eventName); + enqueueProducers(this, eventName, eventData); + const listeners = getListeners(this, eventName); const anyListeners = anyMap.get(this); const staticListeners = [...listeners]; @@ -112,20 +197,36 @@ class Emittery { anyMap.get(this).delete(listener); } + anyEvent() { + return iterator(this); + } + clearListeners(eventName) { if (typeof eventName === 'string') { getListeners(this, eventName).clear(); + const producers = getEventProducers(this, eventName); + for (const producer of producers) { + producer.finish(); + } + producers.clear(); } else { anyMap.get(this).clear(); for (const listeners of eventsMap.get(this).values()) { listeners.clear(); } + for (const producers of producersMap.get(this).values()) { + for (const producer of producers) { + producer.finish(); + } + producers.clear(); + } } } listenerCount(eventName) { if (typeof eventName === 'string') { - return anyMap.get(this).size + getListeners(this, eventName).size; + return anyMap.get(this).size + getListeners(this, eventName).size + + getEventProducers(this, eventName).size + getEventProducers(this).size; } if (typeof eventName !== 'undefined') { @@ -137,6 +238,9 @@ class Emittery { for (const value of eventsMap.get(this).values()) { count += value.size; } + for (const value of producersMap.get(this).values()) { + count += value.size; + } return count; } diff --git a/package.json b/package.json index 5acf233..d9aac58 100644 --- a/package.json +++ b/package.json @@ -54,6 +54,8 @@ "ava": "*", "babel-cli": "^6.26.0", "babel-core": "^6.26.0", + "babel-eslint": "^8.1.2", + "babel-plugin-syntax-async-generators": "^6.13.0", "babel-plugin-transform-async-to-generator": "^6.24.1", "babel-plugin-transform-es2015-spread": "^6.22.0", "codecov": "^3.0.0", @@ -64,6 +66,15 @@ "typescript": "^2.6.2", "xo": "*" }, + "ava": { + "babel": { + "babelrc": false, + "presets": [ + "./test/_for-await.js", + "@ava/stage-4" + ] + } + }, "babel": { "plugins": [ "transform-async-to-generator", @@ -76,5 +87,8 @@ "lcov", "text" ] + }, + "xo": { + "parser": "babel-eslint" } } diff --git a/readme.md b/readme.md index b48ffc4..9d40987 100644 --- a/readme.md +++ b/readme.md @@ -70,6 +70,12 @@ emitter.once('🦄').then(data => { emitter.emit('🦄', '🌈'); ``` +#### events(eventName) + +Get an asynchronous iterator which buffers data each time an event is emitted. + +Call `return()` on the iterator to remove the subscription. + #### emit(eventName, [data]) Trigger an event asynchronously, optionally with some data. Listeners are called in the order they were added, but execute concurrently. @@ -94,15 +100,21 @@ Returns a method to unsubscribe. Remove an `onAny` subscription. +#### anyEvent() + +Get an asynchronous iterator which buffers a tuple of an event name and data each time an event is emitted. + +Call `return()` on the iterator to remove the subscription. + #### clearListeners() -Clear all event listeners on the instance. +Clear all iterators and event listeners on the instance. -If `eventName` is given, only the listeners for that event are cleared. +If `eventName` is given, only the iterators and listeners for that event are cleared. #### listenerCount([eventName]) -The number of listeners for the `eventName` or all events if not specified. +The number of iterators and listeners for the `eventName` or all events if not specified. ## TypeScript @@ -126,8 +138,9 @@ ee.emit('end'); // TS compilation error Listeners are not invoked for events emitted *before* the listener was added. Removing a listener will prevent that listener from being invoked, even if events are in the process of being (asynchronously!) emitted. This also applies to `.clearListeners()`, which removes all listeners. Listeners will be called in the order they were added. So-called *any* listeners are called *after* event-specific listeners. -Note that when using `.emitSerial()`, a slow listener will delay invocation of subsequent listeners. It's possible for newer events to overtake older ones. +Asynchronous iterators are fed events *before* listeners are invoked. They will not receive events emitted *before* the iterator was created. Iterators buffer events. Calling `.return()` on the iterator will clear the buffer. `.clearListeners()` causes iterators to complete after their event buffer is exhausted but does *not* clear the buffer. Iterators are fed events in the order they were created. So called *any* iterators are fed events *after* event-specific iterators. +Note that when using `.emitSerial()`, a slow listener will delay invocation of subsequent listeners. It's possible for newer events to overtake older ones. Slow listeners do impact asynchronous iterators. ## FAQ diff --git a/test/_for-await.js b/test/_for-await.js new file mode 100644 index 0000000..058d007 --- /dev/null +++ b/test/_for-await.js @@ -0,0 +1,12 @@ +'use strict'; +const vm = require('vm'); + +const syntax = require('babel-plugin-syntax-async-generators'); +const transform = require('babel-plugin-transform-async-to-generator'); + +try { + new vm.Script('async () => { for await (const _ of []) {} }'); // eslint-disable-line no-new + module.exports = {plugins: [syntax]}; +} catch (err) { + module.exports = {plugins: [syntax, transform]}; +} diff --git a/test/_run.js b/test/_run.js index 495812f..70e813a 100644 --- a/test/_run.js +++ b/test/_run.js @@ -1,6 +1,24 @@ import test from 'ava'; import delay from 'delay'; +// The babel-plugin-transform-async-generator-functions plugin assumes +// `Symbol.asyncIterator` exists, so stub it for iterator tests. +function stubAsyncIteratorSymbol(next) { + return async (...args) => { + if (!Symbol.asyncIterator) { + Symbol.asyncIterator = Symbol.for('Emittery.asyncIterator'); + } + + try { + return await next(...args); + } finally { + if (Symbol.asyncIterator === Symbol.for('Emittery.asyncIterator')) { + delete Symbol.asyncIterator; + } + } + }; +} + module.exports = Emittery => { test('on()', async t => { const emitter = new Emittery(); @@ -49,6 +67,51 @@ module.exports = Emittery => { t.deepEqual(calls, [1]); }); + test.serial('events()', stubAsyncIteratorSymbol(async t => { + const emitter = new Emittery(); + const iterator = emitter.events('🦄'); + + await emitter.emit('🦄', '🌈'); + setTimeout(() => { + emitter.emit('🦄', Promise.resolve('🌟')); + }, 10); + + t.plan(3); + const expected = ['🌈', '🌟']; + for await (const data of iterator) { + t.deepEqual(data, expected.shift()); + if (expected.length === 0) { + break; + } + } + + t.deepEqual(await iterator.next(), {done: true}); + })); + + test('events() - return() called during emit', async t => { + const emitter = new Emittery(); + let iterator = null; + emitter.on('🦄', () => { + iterator.return(); + }); + iterator = emitter.events('🦄'); + emitter.emit('🦄', '🌈'); + t.deepEqual(await iterator.next(), {done: false, value: '🌈'}); + t.deepEqual(await iterator.next(), {done: true}); + }); + + test('events() - return() awaits its argument', async t => { + const emitter = new Emittery(); + const iterator = emitter.events('🦄'); + t.deepEqual(await iterator.return(Promise.resolve(1)), {done: true, value: 1}); + }); + + test('events() - return() without argument', async t => { + const emitter = new Emittery(); + const iterator = emitter.events('🦄'); + t.deepEqual(await iterator.return(), {done: true}); + }); + test('off()', async t => { const emitter = new Emittery(); const calls = []; @@ -334,6 +397,39 @@ module.exports = Emittery => { t.throws(() => emitter.offAny(), TypeError); }); + test.serial('anyEvent()', stubAsyncIteratorSymbol(async t => { + const emitter = new Emittery(); + const iterator = emitter.anyEvent(); + + await emitter.emit('🦄', '🌈'); + setTimeout(() => { + emitter.emit('🦄', Promise.resolve('🌟')); + }, 10); + + t.plan(3); + const expected = [['🦄', '🌈'], ['🦄', '🌟']]; + for await (const data of iterator) { + t.deepEqual(data, expected.shift()); + if (expected.length === 0) { + break; + } + } + + t.deepEqual(await iterator.next(), {done: true}); + })); + + test('anyEvent() - return() called during emit', async t => { + const emitter = new Emittery(); + let iterator = null; + emitter.onAny(() => { + iterator.return(); + }); + iterator = emitter.anyEvent(); + emitter.emit('🦄', '🌈'); + t.deepEqual(await iterator.next(), {done: false, value: ['🦄', '🌈']}); + t.deepEqual(await iterator.next(), {done: true}); + }); + test('clearListeners()', async t => { const emitter = new Emittery(); const calls = []; @@ -351,6 +447,24 @@ module.exports = Emittery => { t.deepEqual(calls, ['🦄1', '🦄2', 'any1', 'any2', '🌈', 'any1', 'any2']); }); + test('clearListeners() - also clears iterators', async t => { + const emitter = new Emittery(); + const iterator = emitter.events('🦄'); + const anyIterator = emitter.anyEvent(); + await emitter.emit('🦄', '🌟'); + await emitter.emit('🌈', '🌟'); + t.deepEqual(await iterator.next(), {done: false, value: '🌟'}); + t.deepEqual(await anyIterator.next(), {done: false, value: ['🦄', '🌟']}); + t.deepEqual(await anyIterator.next(), {done: false, value: ['🌈', '🌟']}); + await emitter.emit('🦄', '💫'); + emitter.clearListeners(); + await emitter.emit('🌈', '💫'); + t.deepEqual(await iterator.next(), {done: false, value: '💫'}); + t.deepEqual(await iterator.next(), {done: true}); + t.deepEqual(await anyIterator.next(), {done: false, value: ['🦄', '💫']}); + t.deepEqual(await anyIterator.next(), {done: true}); + }); + test('clearListeners() - with event name', async t => { const emitter = new Emittery(); const calls = []; @@ -368,16 +482,36 @@ module.exports = Emittery => { t.deepEqual(calls, ['🦄1', '🦄2', 'any1', 'any2', '🌈', 'any1', 'any2', 'any1', 'any2', '🌈', 'any1', 'any2']); }); + test('clearListeners() - with event name - clears iterators for that event', async t => { + const emitter = new Emittery(); + const iterator = emitter.events('🦄'); + const anyIterator = emitter.anyEvent(); + await emitter.emit('🦄', '🌟'); + await emitter.emit('🌈', '🌟'); + t.deepEqual(await iterator.next(), {done: false, value: '🌟'}); + t.deepEqual(await anyIterator.next(), {done: false, value: ['🦄', '🌟']}); + t.deepEqual(await anyIterator.next(), {done: false, value: ['🌈', '🌟']}); + await emitter.emit('🦄', '💫'); + emitter.clearListeners('🦄'); + await emitter.emit('🌈', '💫'); + t.deepEqual(await iterator.next(), {done: false, value: '💫'}); + t.deepEqual(await iterator.next(), {done: true}); + t.deepEqual(await anyIterator.next(), {done: false, value: ['🦄', '💫']}); + t.deepEqual(await anyIterator.next(), {done: false, value: ['🌈', '💫']}); + }); + test('listenerCount()', t => { const emitter = new Emittery(); emitter.on('🦄', () => {}); emitter.on('🌈', () => {}); emitter.on('🦄', () => {}); + emitter.events('🌈'); emitter.onAny(() => {}); emitter.onAny(() => {}); - t.is(emitter.listenerCount('🦄'), 4); - t.is(emitter.listenerCount('🌈'), 3); - t.is(emitter.listenerCount(), 5); + emitter.anyEvent(); + t.is(emitter.listenerCount('🦄'), 5); + t.is(emitter.listenerCount('🌈'), 5); + t.is(emitter.listenerCount(), 7); }); test('listenerCount() - works with empty eventName strings', t => { diff --git a/test/fixtures/compiles/tsconfig.json b/test/fixtures/compiles/tsconfig.json deleted file mode 100644 index 27a6731..0000000 --- a/test/fixtures/compiles/tsconfig.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "compilerOptions": { - "target": "es2017", - "lib": ["es2017"], - "module": "commonjs", - "strict": true, - "moduleResolution": "node", - "allowSyntheticDefaultImports": true - }, - "include": ["*.ts"] -} diff --git a/test/fixtures/tsconfig.json b/test/fixtures/tsconfig.json deleted file mode 100644 index 27a6731..0000000 --- a/test/fixtures/tsconfig.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "compilerOptions": { - "target": "es2017", - "lib": ["es2017"], - "module": "commonjs", - "strict": true, - "moduleResolution": "node", - "allowSyntheticDefaultImports": true - }, - "include": ["*.ts"] -} diff --git a/test/types.js b/test/types.js index ff27ab4..d834ff7 100644 --- a/test/types.js +++ b/test/types.js @@ -2,10 +2,12 @@ import path from 'path'; import test from 'ava'; import glob from 'glob'; -import * as ts from 'typescript'; + +// Import syntax trips up Atom with ide-typescript loaded. +const ts = require('typescript'); const compilerOptions = { - target: ts.ScriptTarget.ES2017, + target: ts.ScriptTarget.ESNext, module: ts.ModuleKind.CommonJS, strict: true, noEmit: true diff --git a/tsconfig.json b/tsconfig.json index 21b3bd7..e8ebc14 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -2,7 +2,10 @@ "compilerOptions": { "allowSyntheticDefaultImports": true, "alwaysStrict": true, - "lib": ["es2017"], + "lib": [ + "es2017", + "esnext.asynciterable" + ], "module": "commonjs", "moduleResolution": "node", "noImplicitAny": true,