diff --git a/index.d.ts b/index.d.ts index 95eaee2..473a918 100644 --- a/index.d.ts +++ b/index.d.ts @@ -26,6 +26,55 @@ declare class Emittery { */ on(eventName: string, listener: (eventData?: any) => any): Emittery.UnsubscribeFn; + /** + * Get an async iterator which buffers data each time an event is emitted. + * + * Call `return()` on the iterator to remove the subscription. + * + * @example + * ``` + * const iterator = emitter.events('🦄'); + * + * emitter.emit('🦄', '🌈1'); // buffered + * emitter.emit('🦄', '🌈2'); // buffered + * + * iterator + * .next() + * .then( ({value, done}) => { + * // done is false + * // value === '🌈1' + * return iterator.next(); + * }) + * .then( ({value, done}) => { + * // done is false + * // value === '🌈2' + * // revoke subscription + * return iterator.return(); + * }) + * .then(({done}) => { + * // done is true + * }); + * ``` + * + * In practice you would usually consume the events using the [for await](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of) statement. + * In that case, to revoke the subscription simply break the loop + * + * @example + * ``` + * // in an async context + * const iterator = emitter.events('🦄'); + * + * emitter.emit('🦄', '🌈1'); // buffered + * emitter.emit('🦄', '🌈2'); // buffered + * + * for await (const data of iterator){ + * if(data === '🌈2') + * break; // revoke the subscription when we see the value '🌈2' + * } + * ``` + */ + events(eventName:string): AsyncIterableIterator + /** * Remove an event subscription. */ @@ -74,6 +123,39 @@ declare class Emittery { */ onAny(listener: (eventName: string, eventData?: any) => any): Emittery.UnsubscribeFn; + /* + * Get an async iterator which buffers a tuple of an event name and data each time an event is emitted. + * + * Call `return()` on the iterator to remove the subscription. + * + * @example + * ``` + * const iterator = emitter.anyEvent(); + * + * emitter.emit('🦄', '🌈1'); // buffered + * emitter.emit('🌟', '🌈2'); // buffered + * + * iterator.next() + * .then( ({value, done}) => { + * // done is false + * // value is ['🦄', '🌈1'] + * return iterator.next(); + * }) + * .then( ({value, done}) => { + * // done is false + * // value is ['🌟', '🌈2'] + * // revoke subscription + * return iterator.return(); + * }) + * .then(({done}) => { + * // done is true + * }); + * ``` + * + * In the same way as for ``events`` you can subscribe by using the ``for await`` statement + */ + anyEvent(): AsyncIterableIterator + /** * Remove an `onAny` subscription. */ @@ -144,6 +226,8 @@ declare namespace Emittery { on>(eventName: Name, listener: (eventData: EventDataMap[Name]) => any): Emittery.UnsubscribeFn; on(eventName: Name, listener: () => any): Emittery.UnsubscribeFn; + events>(eventName: Name): AsyncIterableIterator; + once>(eventName: Name): Promise; once(eventName: Name): Promise; @@ -151,6 +235,8 @@ declare namespace Emittery { off(eventName: Name, listener: () => any): void; onAny(listener: (eventName: Extract | EmptyEvents, eventData?: EventDataMap[Extract]) => any): Emittery.UnsubscribeFn; + anyEvent(): AsyncIterableIterator<[Extract, EventDataMap[Extract]]>; + offAny(listener: (eventName: Extract | EmptyEvents, eventData?: EventDataMap[Extract]) => any): void; emit>(eventName: Name, eventData: EventDataMap[Name]): Promise; diff --git a/index.js b/index.js index e666671..5de838c 100644 --- a/index.js +++ b/index.js @@ -2,6 +2,8 @@ const anyMap = new WeakMap(); const eventsMap = new WeakMap(); +const producersMap = new WeakMap(); +const anyProducer = Symbol('anyProducer'); const resolvedPromise = Promise.resolve(); function assertEventName(eventName) { @@ -25,6 +27,82 @@ function getListeners(instance, eventName) { return events.get(eventName); } +function getEventProducers(instance, eventName) { + const key = typeof eventName === 'string' ? eventName : anyProducer; + const producers = producersMap.get(instance); + if (!producers.has(key)) { + producers.set(key, new Set()); + } + + return producers.get(key); +} + +function enqueueProducers(instance, eventName, eventData) { + const producers = producersMap.get(instance); + if (producers.has(eventName)) { + for (const producer of producers.get(eventName)) { + producer.enqueue(eventData); + } + } + + if (producers.has(anyProducer)) { + const item = Promise.all([eventName, eventData]); + for (const producer of producers.get(anyProducer)) { + producer.enqueue(item); + } + } +} + +function iterator(instance, eventName) { + let finished = false; + let flush = () => {}; + let queue = []; + const producer = { + enqueue(item) { + queue.push(item); + flush(); + }, + finish() { + finished = true; + flush(); + } + }; + + getEventProducers(instance, eventName).add(producer); + return { + async next() { + if (!queue) { + return {done: true}; + } + + if (queue.length === 0) { + if (finished) { + queue = null; + return this.next(); + } + + await new Promise(resolve => { + flush = resolve; + }); + return this.next(); + } + + return {done: false, value: await queue.shift()}; + }, + async return(value) { + queue = null; + getEventProducers(instance, eventName).delete(producer); + flush(); + return arguments.length > 0 ? + {done: true, value: await value} : + {done: true}; + }, + [Symbol.asyncIterator]() { + return this; + } + }; +} + function defaultMethodNamesOrAssert(methodNames) { if (methodNames === undefined) { return allEmitteryMethods; @@ -92,6 +170,7 @@ class Emittery { constructor() { anyMap.set(this, new Set()); eventsMap.set(this, new Map()); + producersMap.set(this, new Map()); } on(eventName, listener) { @@ -117,9 +196,16 @@ class Emittery { }); } + events(eventName) { + assertEventName(eventName); + return iterator(this, eventName); + } + async emit(eventName, eventData) { assertEventName(eventName); + enqueueProducers(this, eventName, eventData); + const listeners = getListeners(this, eventName); const anyListeners = anyMap.get(this); const staticListeners = [...listeners]; @@ -170,6 +256,10 @@ class Emittery { return this.offAny.bind(this, listener); } + anyEvent() { + return iterator(this); + } + offAny(listener) { assertListener(listener); anyMap.get(this).delete(listener); @@ -178,17 +268,34 @@ class Emittery { clearListeners(eventName) { if (typeof eventName === 'string') { getListeners(this, eventName).clear(); + const producers = getEventProducers(this, eventName); + + for (const producer of producers) { + producer.finish(); + } + + producers.clear(); } else { anyMap.get(this).clear(); + for (const listeners of eventsMap.get(this).values()) { listeners.clear(); } + + for (const producers of producersMap.get(this).values()) { + for (const producer of producers) { + producer.finish(); + } + + producers.clear(); + } } } listenerCount(eventName) { if (typeof eventName === 'string') { - return anyMap.get(this).size + getListeners(this, eventName).size; + return anyMap.get(this).size + getListeners(this, eventName).size + + getEventProducers(this, eventName).size + getEventProducers(this).size; } if (typeof eventName !== 'undefined') { @@ -201,6 +308,10 @@ class Emittery { count += value.size; } + for (const value of producersMap.get(this).values()) { + count += value.size; + } + return count; } diff --git a/readme.md b/readme.md index 88a3792..b646425 100644 --- a/readme.md +++ b/readme.md @@ -69,7 +69,54 @@ emitter.once('🦄').then(data => { emitter.emit('🦄', '🌈'); ``` -#### emit(eventName, data?) +#### events(eventName) + +Get an async iterator which buffers data each time an event is emitted. + +Call `return()` on the iterator to remove the subscription. + +```js +const iterator = emitter.events('🦄'); + +emitter.emit('🦄', '🌈1'); // buffered +emitter.emit('🦄', '🌈2'); // buffered + +iterator + .next() + .then( ({value, done}) => { + // done is false + // value === '🌈1' + return iterator.next(); + }) + .then( ({value, done}) => { + // done is false + // value === '🌈2' + // revoke subscription + return iterator.return(); + }) + .then(({done}) => { + // done is true + }); +``` + +In practice you would usually consume the events using the [for await](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of) statement. +In that case, to revoke the subscription simply break the loop + +```js +// in an async context +const iterator = emitter.events('🦄'); + +emitter.emit('🦄', '🌈1'); // buffered +emitter.emit('🦄', '🌈2'); // buffered + +for await (const data of iterator){ + if(data === '🌈2') + break; // revoke the subscription when we see the value '🌈2' +} + +``` + +#### emit(eventName, [data]) Trigger an event asynchronously, optionally with some data. Listeners are called in the order they were added, but execute concurrently. @@ -93,6 +140,37 @@ Returns a method to unsubscribe. Remove an `onAny` subscription. +#### anyEvent() + +Get an async iterator which buffers a tuple of an event name and data each time an event is emitted. + +Call `return()` on the iterator to remove the subscription. + +```js +const iterator = emitter.anyEvent(); + +emitter.emit('🦄', '🌈1'); // buffered +emitter.emit('🌟', '🌈2'); // buffered + +iterator.next() + .then( ({value, done}) => { + // done is false + // value is ['🦄', '🌈1'] + return iterator.next(); + }) + .then( ({value, done}) => { + // done is false + // value is ['🌟', '🌈2'] + // revoke subscription + return iterator.return(); + }) + .then(({done}) => { + // done is true + }); +``` + +In the same way as for ``events`` you can subscribe by using the ``for await`` statement + #### clearListeners() Clear all event listeners on the instance. diff --git a/test/index.js b/test/index.js index 1a62cb7..5889c5e 100644 --- a/test/index.js +++ b/test/index.js @@ -2,6 +2,8 @@ import test from 'ava'; import delay from 'delay'; import Emittery from '..'; +const shouldSkip = process.version.startsWith('v8.'); + test('on()', async t => { const emitter = new Emittery(); const calls = []; @@ -55,6 +57,72 @@ test('on() - dedupes identical listeners', async t => { t.deepEqual(calls, [1]); }); +if (!shouldSkip) { + test.serial('events()', async t => { + const emitter = new Emittery(); + const iterator = emitter.events('🦄'); + + await emitter.emit('🦄', '🌈'); + setTimeout(() => { + emitter.emit('🦄', Promise.resolve('🌟')); + }, 10); + + t.plan(3); + const expected = ['🌈', '🌟']; + for await (const data of iterator) { + t.deepEqual(data, expected.shift()); + if (expected.length === 0) { + break; + } + } + + t.deepEqual(await iterator.next(), {done: true}); + }); +} + +test('events() - return() called during emit', async t => { + const emitter = new Emittery(); + let iterator = null; + emitter.on('🦄', () => { + iterator.return(); + }); + iterator = emitter.events('🦄'); + emitter.emit('🦄', '🌈'); + t.deepEqual(await iterator.next(), {done: false, value: '🌈'}); + t.deepEqual(await iterator.next(), {done: true}); +}); + +test('events() - return() awaits its argument', async t => { + const emitter = new Emittery(); + const iterator = emitter.events('🦄'); + t.deepEqual(await iterator.return(Promise.resolve(1)), {done: true, value: 1}); +}); + +test('events() - return() without argument', async t => { + const emitter = new Emittery(); + const iterator = emitter.events('🦄'); + t.deepEqual(await iterator.return(), {done: true}); +}); + +test('events() - discarded iterators should stop receiving events', async t => { + const emitter = new Emittery(); + const iterator = emitter.events('🦄'); + + await emitter.emit('🦄', '🌈'); + t.deepEqual(await iterator.next(), {value: '🌈', done: false}); + await iterator.return(); + await emitter.emit('🦄', '🌈'); + t.deepEqual(await iterator.next(), {done: true}); + + setTimeout(() => { + emitter.emit('🦄', '🌟'); + }, 10); + + await new Promise(resolve => setTimeout(resolve, 20)); + + t.deepEqual(await iterator.next(), {done: true}); +}); + test('off()', async t => { const emitter = new Emittery(); const calls = []; @@ -332,6 +400,60 @@ test('onAny() - must have a listener', t => { }, TypeError); }); +if (!shouldSkip) { + test.serial('anyEvent()', async t => { + const emitter = new Emittery(); + const iterator = emitter.anyEvent(); + + await emitter.emit('🦄', '🌈'); + setTimeout(() => { + emitter.emit('🦄', Promise.resolve('🌟')); + }, 10); + + t.plan(3); + const expected = [['🦄', '🌈'], ['🦄', '🌟']]; + for await (const data of iterator) { + t.deepEqual(data, expected.shift()); + if (expected.length === 0) { + break; + } + } + + t.deepEqual(await iterator.next(), {done: true}); + }); +} + +test('anyEvent() - return() called during emit', async t => { + const emitter = new Emittery(); + let iterator = null; + emitter.onAny(() => { + iterator.return(); + }); + iterator = emitter.anyEvent(); + emitter.emit('🦄', '🌈'); + t.deepEqual(await iterator.next(), {done: false, value: ['🦄', '🌈']}); + t.deepEqual(await iterator.next(), {done: true}); +}); + +test('anyEvents() - discarded iterators should stop receiving events', async t => { + const emitter = new Emittery(); + const iterator = emitter.anyEvent(); + + await emitter.emit('🦄', '🌈'); + t.deepEqual(await iterator.next(), {value: ['🦄', '🌈'], done: false}); + await iterator.return(); + await emitter.emit('🦄', '🌈'); + t.deepEqual(await iterator.next(), {done: true}); + + setTimeout(() => { + emitter.emit('🦄', '🌟'); + }, 10); + + await new Promise(resolve => setTimeout(resolve, 20)); + + t.deepEqual(await iterator.next(), {done: true}); +}); + test('offAny()', async t => { const emitter = new Emittery(); const calls = []; @@ -369,6 +491,24 @@ test('clearListeners()', async t => { t.deepEqual(calls, ['🦄1', '🦄2', 'any1', 'any2', '🌈', 'any1', 'any2']); }); +test('clearListeners() - also clears iterators', async t => { + const emitter = new Emittery(); + const iterator = emitter.events('🦄'); + const anyIterator = emitter.anyEvent(); + await emitter.emit('🦄', '🌟'); + await emitter.emit('🌈', '🌟'); + t.deepEqual(await iterator.next(), {done: false, value: '🌟'}); + t.deepEqual(await anyIterator.next(), {done: false, value: ['🦄', '🌟']}); + t.deepEqual(await anyIterator.next(), {done: false, value: ['🌈', '🌟']}); + await emitter.emit('🦄', '💫'); + emitter.clearListeners(); + await emitter.emit('🌈', '💫'); + t.deepEqual(await iterator.next(), {done: false, value: '💫'}); + t.deepEqual(await iterator.next(), {done: true}); + t.deepEqual(await anyIterator.next(), {done: false, value: ['🦄', '💫']}); + t.deepEqual(await anyIterator.next(), {done: true}); +}); + test('clearListeners() - with event name', async t => { const emitter = new Emittery(); const calls = []; @@ -386,6 +526,24 @@ test('clearListeners() - with event name', async t => { t.deepEqual(calls, ['🦄1', '🦄2', 'any1', 'any2', '🌈', 'any1', 'any2', 'any1', 'any2', '🌈', 'any1', 'any2']); }); +test('clearListeners() - with event name - clears iterators for that event', async t => { + const emitter = new Emittery(); + const iterator = emitter.events('🦄'); + const anyIterator = emitter.anyEvent(); + await emitter.emit('🦄', '🌟'); + await emitter.emit('🌈', '🌟'); + t.deepEqual(await iterator.next(), {done: false, value: '🌟'}); + t.deepEqual(await anyIterator.next(), {done: false, value: ['🦄', '🌟']}); + t.deepEqual(await anyIterator.next(), {done: false, value: ['🌈', '🌟']}); + await emitter.emit('🦄', '💫'); + emitter.clearListeners('🦄'); + await emitter.emit('🌈', '💫'); + t.deepEqual(await iterator.next(), {done: false, value: '💫'}); + t.deepEqual(await iterator.next(), {done: true}); + t.deepEqual(await anyIterator.next(), {done: false, value: ['🦄', '💫']}); + t.deepEqual(await anyIterator.next(), {done: false, value: ['🌈', '💫']}); +}); + test('listenerCount()', t => { const emitter = new Emittery(); emitter.on('🦄', () => {}); @@ -457,7 +615,7 @@ test('bindMethods() - methodNames must be array of strings or undefined', t => { }); test('bindMethods() - must bind all methods if no array supplied', t => { - const methodsExpected = ['on', 'off', 'once', 'emit', 'emitSerial', 'onAny', 'offAny', 'clearListeners', 'listenerCount', 'bindMethods']; + const methodsExpected = ['on', 'off', 'once', 'events', 'emit', 'emitSerial', 'onAny', 'anyEvent', 'offAny', 'clearListeners', 'listenerCount', 'bindMethods']; const emitter = new Emittery(); const target = {}; @@ -501,6 +659,7 @@ test('mixin()', t => { this.v = v; } } + const TestClassWithMixin = Emittery.mixin('emitter', ['on', 'off', 'once', 'emit', 'emitSerial', 'onAny', 'offAny', 'clearListeners', 'listenerCount', 'bindMethods'])(TestClass); const symbol = Symbol('test symbol'); const instance = new TestClassWithMixin(symbol); @@ -512,8 +671,8 @@ test('mixin()', t => { }); test('mixin() - methodNames must be array of strings or undefined', t => { - class TestClass { - } + class TestClass {} + t.throws(() => Emittery.mixin('emitter', null)(TestClass)); t.throws(() => Emittery.mixin('emitter', 'string')(TestClass)); t.throws(() => Emittery.mixin('emitter', {})(TestClass)); @@ -523,9 +682,10 @@ test('mixin() - methodNames must be array of strings or undefined', t => { }); test('mixin() - must mixin all methods if no array supplied', t => { - const methodsExpected = ['on', 'off', 'once', 'emit', 'emitSerial', 'onAny', 'offAny', 'clearListeners', 'listenerCount', 'bindMethods']; + const methodsExpected = ['on', 'off', 'once', 'events', 'emit', 'emitSerial', 'onAny', 'anyEvent', 'offAny', 'clearListeners', 'listenerCount', 'bindMethods']; class TestClass {} + const TestClassWithMixin = Emittery.mixin('emitter')(TestClass); t.deepEqual(Object.getOwnPropertyNames(TestClassWithMixin.prototype).sort(), methodsExpected.concat(['constructor', 'emitter']).sort()); @@ -533,6 +693,7 @@ test('mixin() - must mixin all methods if no array supplied', t => { test('mixin() - methodNames must only include Emittery methods', t => { class TestClass {} + t.throws(() => Emittery.mixin('emitter', ['nonexistent'])(TestClass)); });