Skip to content

Commit

Permalink
Add subscription mechanism using async iterators (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
lorenzofox3 authored and sindresorhus committed Sep 17, 2019
1 parent 96a2cb8 commit 2baf580
Show file tree
Hide file tree
Showing 4 changed files with 442 additions and 6 deletions.
86 changes: 86 additions & 0 deletions index.d.ts
Expand Up @@ -26,6 +26,55 @@ declare class Emittery {
*/
on(eventName: string, listener: (eventData?: any) => any): Emittery.UnsubscribeFn;

/**
* Get an async iterator which buffers data each time an event is emitted.
*
* Call `return()` on the iterator to remove the subscription.
*
* @example
* ```
* const iterator = emitter.events('πŸ¦„');
*
* emitter.emit('πŸ¦„', '🌈1'); // buffered
* emitter.emit('πŸ¦„', '🌈2'); // buffered
*
* iterator
* .next()
* .then( ({value, done}) => {
* // done is false
* // value === '🌈1'
* return iterator.next();
* })
* .then( ({value, done}) => {
* // done is false
* // value === '🌈2'
* // revoke subscription
* return iterator.return();
* })
* .then(({done}) => {
* // done is true
* });
* ```
*
* In practice you would usually consume the events using the [for await](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of) statement.
* In that case, to revoke the subscription simply break the loop
*
* @example
* ```
* // in an async context
* const iterator = emitter.events('πŸ¦„');
*
* emitter.emit('πŸ¦„', '🌈1'); // buffered
* emitter.emit('πŸ¦„', '🌈2'); // buffered
*
* for await (const data of iterator){
* if(data === '🌈2')
* break; // revoke the subscription when we see the value '🌈2'
* }
* ```
*/
events(eventName:string): AsyncIterableIterator<any>

/**
* Remove an event subscription.
*/
Expand Down Expand Up @@ -74,6 +123,39 @@ declare class Emittery {
*/
onAny(listener: (eventName: string, eventData?: any) => any): Emittery.UnsubscribeFn;

/*
* Get an async iterator which buffers a tuple of an event name and data each time an event is emitted.
*
* Call `return()` on the iterator to remove the subscription.
*
* @example
* ```
* const iterator = emitter.anyEvent();
*
* emitter.emit('πŸ¦„', '🌈1'); // buffered
* emitter.emit('🌟', '🌈2'); // buffered
*
* iterator.next()
* .then( ({value, done}) => {
* // done is false
* // value is ['πŸ¦„', '🌈1']
* return iterator.next();
* })
* .then( ({value, done}) => {
* // done is false
* // value is ['🌟', '🌈2']
* // revoke subscription
* return iterator.return();
* })
* .then(({done}) => {
* // done is true
* });
* ```
*
* In the same way as for ``events`` you can subscribe by using the ``for await`` statement
*/
anyEvent(): AsyncIterableIterator<any>

/**
* Remove an `onAny` subscription.
*/
Expand Down Expand Up @@ -144,13 +226,17 @@ declare namespace Emittery {
on<Name extends Extract<keyof EventDataMap, string>>(eventName: Name, listener: (eventData: EventDataMap[Name]) => any): Emittery.UnsubscribeFn;
on<Name extends EmptyEvents>(eventName: Name, listener: () => any): Emittery.UnsubscribeFn;

events<Name extends Extract<keyof EventDataMap, string>>(eventName: Name): AsyncIterableIterator<EventDataMap[Name]>;

once<Name extends Extract<keyof EventDataMap, string>>(eventName: Name): Promise<EventDataMap[Name]>;
once<Name extends EmptyEvents>(eventName: Name): Promise<void>;

off<Name extends Extract<keyof EventDataMap, string>>(eventName: Name, listener: (eventData: EventDataMap[Name]) => any): void;
off<Name extends EmptyEvents>(eventName: Name, listener: () => any): void;

onAny(listener: (eventName: Extract<keyof EventDataMap, string> | EmptyEvents, eventData?: EventDataMap[Extract<keyof EventDataMap, string>]) => any): Emittery.UnsubscribeFn;
anyEvent(): AsyncIterableIterator<[Extract<keyof EventDataMap, string>, EventDataMap[Extract<keyof EventDataMap, string>]]>;

offAny(listener: (eventName: Extract<keyof EventDataMap, string> | EmptyEvents, eventData?: EventDataMap[Extract<keyof EventDataMap, string>]) => any): void;

emit<Name extends Extract<keyof EventDataMap, string>>(eventName: Name, eventData: EventDataMap[Name]): Promise<void>;
Expand Down
113 changes: 112 additions & 1 deletion index.js
Expand Up @@ -2,6 +2,8 @@

const anyMap = new WeakMap();
const eventsMap = new WeakMap();
const producersMap = new WeakMap();
const anyProducer = Symbol('anyProducer');
const resolvedPromise = Promise.resolve();

function assertEventName(eventName) {
Expand All @@ -25,6 +27,82 @@ function getListeners(instance, eventName) {
return events.get(eventName);
}

function getEventProducers(instance, eventName) {
const key = typeof eventName === 'string' ? eventName : anyProducer;
const producers = producersMap.get(instance);
if (!producers.has(key)) {
producers.set(key, new Set());
}

return producers.get(key);
}

function enqueueProducers(instance, eventName, eventData) {
const producers = producersMap.get(instance);
if (producers.has(eventName)) {
for (const producer of producers.get(eventName)) {
producer.enqueue(eventData);
}
}

if (producers.has(anyProducer)) {
const item = Promise.all([eventName, eventData]);
for (const producer of producers.get(anyProducer)) {
producer.enqueue(item);
}
}
}

function iterator(instance, eventName) {
let finished = false;
let flush = () => {};
let queue = [];
const producer = {
enqueue(item) {
queue.push(item);
flush();
},
finish() {
finished = true;
flush();
}
};

getEventProducers(instance, eventName).add(producer);
return {
async next() {
if (!queue) {
return {done: true};
}

if (queue.length === 0) {
if (finished) {
queue = null;
return this.next();
}

await new Promise(resolve => {
flush = resolve;
});
return this.next();
}

return {done: false, value: await queue.shift()};
},
async return(value) {
queue = null;
getEventProducers(instance, eventName).delete(producer);
flush();
return arguments.length > 0 ?
{done: true, value: await value} :
{done: true};
},
[Symbol.asyncIterator]() {
return this;
}
};
}

function defaultMethodNamesOrAssert(methodNames) {
if (methodNames === undefined) {
return allEmitteryMethods;
Expand Down Expand Up @@ -92,6 +170,7 @@ class Emittery {
constructor() {
anyMap.set(this, new Set());
eventsMap.set(this, new Map());
producersMap.set(this, new Map());
}

on(eventName, listener) {
Expand All @@ -117,9 +196,16 @@ class Emittery {
});
}

events(eventName) {
assertEventName(eventName);
return iterator(this, eventName);
}

async emit(eventName, eventData) {
assertEventName(eventName);

enqueueProducers(this, eventName, eventData);

const listeners = getListeners(this, eventName);
const anyListeners = anyMap.get(this);
const staticListeners = [...listeners];
Expand Down Expand Up @@ -170,6 +256,10 @@ class Emittery {
return this.offAny.bind(this, listener);
}

anyEvent() {
return iterator(this);
}

offAny(listener) {
assertListener(listener);
anyMap.get(this).delete(listener);
Expand All @@ -178,17 +268,34 @@ class Emittery {
clearListeners(eventName) {
if (typeof eventName === 'string') {
getListeners(this, eventName).clear();
const producers = getEventProducers(this, eventName);

for (const producer of producers) {
producer.finish();
}

producers.clear();
} else {
anyMap.get(this).clear();

for (const listeners of eventsMap.get(this).values()) {
listeners.clear();
}

for (const producers of producersMap.get(this).values()) {
for (const producer of producers) {
producer.finish();
}

producers.clear();
}
}
}

listenerCount(eventName) {
if (typeof eventName === 'string') {
return anyMap.get(this).size + getListeners(this, eventName).size;
return anyMap.get(this).size + getListeners(this, eventName).size +
getEventProducers(this, eventName).size + getEventProducers(this).size;
}

if (typeof eventName !== 'undefined') {
Expand All @@ -201,6 +308,10 @@ class Emittery {
count += value.size;
}

for (const value of producersMap.get(this).values()) {
count += value.size;
}

return count;
}

Expand Down
80 changes: 79 additions & 1 deletion readme.md
Expand Up @@ -69,7 +69,54 @@ emitter.once('πŸ¦„').then(data => {
emitter.emit('πŸ¦„', '🌈');
```

#### emit(eventName, data?)
#### events(eventName)

Get an async iterator which buffers data each time an event is emitted.

Call `return()` on the iterator to remove the subscription.

```js
const iterator = emitter.events('πŸ¦„');

emitter.emit('πŸ¦„', '🌈1'); // buffered
emitter.emit('πŸ¦„', '🌈2'); // buffered

iterator
.next()
.then( ({value, done}) => {
// done is false
// value === '🌈1'
return iterator.next();
})
.then( ({value, done}) => {
// done is false
// value === '🌈2'
// revoke subscription
return iterator.return();
})
.then(({done}) => {
// done is true
});
```

In practice you would usually consume the events using the [for await](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of) statement.
In that case, to revoke the subscription simply break the loop

```js
// in an async context
const iterator = emitter.events('πŸ¦„');

emitter.emit('πŸ¦„', '🌈1'); // buffered
emitter.emit('πŸ¦„', '🌈2'); // buffered

for await (const data of iterator){
if(data === '🌈2')
break; // revoke the subscription when we see the value '🌈2'
}

```

#### emit(eventName, [data])

Trigger an event asynchronously, optionally with some data. Listeners are called in the order they were added, but execute concurrently.

Expand All @@ -93,6 +140,37 @@ Returns a method to unsubscribe.

Remove an `onAny` subscription.

#### anyEvent()

Get an async iterator which buffers a tuple of an event name and data each time an event is emitted.

Call `return()` on the iterator to remove the subscription.

```js
const iterator = emitter.anyEvent();

emitter.emit('πŸ¦„', '🌈1'); // buffered
emitter.emit('🌟', '🌈2'); // buffered

iterator.next()
.then( ({value, done}) => {
// done is false
// value is ['πŸ¦„', '🌈1']
return iterator.next();
})
.then( ({value, done}) => {
// done is false
// value is ['🌟', '🌈2']
// revoke subscription
return iterator.return();
})
.then(({done}) => {
// done is true
});
```

In the same way as for ``events`` you can subscribe by using the ``for await`` statement

#### clearListeners()

Clear all event listeners on the instance.
Expand Down

0 comments on commit 2baf580

Please sign in to comment.