Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscription mechanism using async iterators #40

Merged
merged 21 commits into from Sep 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
86 changes: 86 additions & 0 deletions index.d.ts
Expand Up @@ -26,6 +26,55 @@ declare class Emittery {
*/
on(eventName: string, listener: (eventData?: any) => any): Emittery.UnsubscribeFn;

/**
* Get an async iterator which buffers data each time an event is emitted.
*
* Call `return()` on the iterator to remove the subscription.
*
* @example
* ```
* const iterator = emitter.events('🦄');
*
* emitter.emit('🦄', '🌈1'); // buffered
* emitter.emit('🦄', '🌈2'); // buffered
*
* iterator
* .next()
* .then( ({value, done}) => {
* // done is false
* // value === '🌈1'
* return iterator.next();
* })
* .then( ({value, done}) => {
* // done is false
* // value === '🌈2'
* // revoke subscription
* return iterator.return();
* })
* .then(({done}) => {
* // done is true
* });
* ```
*
* In practice you would usually consume the events using the [for await](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of) statement.
* In that case, to revoke the subscription simply break the loop
*
* @example
* ```
* // in an async context
* const iterator = emitter.events('🦄');
*
* emitter.emit('🦄', '🌈1'); // buffered
* emitter.emit('🦄', '🌈2'); // buffered
*
* for await (const data of iterator){
* if(data === '🌈2')
* break; // revoke the subscription when we see the value '🌈2'
* }
* ```
*/
events(eventName:string): AsyncIterableIterator<any>

/**
* Remove an event subscription.
*/
Expand Down Expand Up @@ -74,6 +123,39 @@ declare class Emittery {
*/
onAny(listener: (eventName: string, eventData?: any) => any): Emittery.UnsubscribeFn;

/*
* Get an async iterator which buffers a tuple of an event name and data each time an event is emitted.
*
* Call `return()` on the iterator to remove the subscription.
*
* @example
* ```
* const iterator = emitter.anyEvent();
*
* emitter.emit('🦄', '🌈1'); // buffered
* emitter.emit('🌟', '🌈2'); // buffered
*
* iterator.next()
* .then( ({value, done}) => {
* // done is false
* // value is ['🦄', '🌈1']
* return iterator.next();
* })
* .then( ({value, done}) => {
* // done is false
* // value is ['🌟', '🌈2']
* // revoke subscription
* return iterator.return();
* })
* .then(({done}) => {
* // done is true
* });
* ```
*
* In the same way as for ``events`` you can subscribe by using the ``for await`` statement
*/
anyEvent(): AsyncIterableIterator<any>

/**
* Remove an `onAny` subscription.
*/
Expand Down Expand Up @@ -144,13 +226,17 @@ declare namespace Emittery {
on<Name extends Extract<keyof EventDataMap, string>>(eventName: Name, listener: (eventData: EventDataMap[Name]) => any): Emittery.UnsubscribeFn;
on<Name extends EmptyEvents>(eventName: Name, listener: () => any): Emittery.UnsubscribeFn;

events<Name extends Extract<keyof EventDataMap, string>>(eventName: Name): AsyncIterableIterator<EventDataMap[Name]>;

once<Name extends Extract<keyof EventDataMap, string>>(eventName: Name): Promise<EventDataMap[Name]>;
once<Name extends EmptyEvents>(eventName: Name): Promise<void>;

off<Name extends Extract<keyof EventDataMap, string>>(eventName: Name, listener: (eventData: EventDataMap[Name]) => any): void;
off<Name extends EmptyEvents>(eventName: Name, listener: () => any): void;

onAny(listener: (eventName: Extract<keyof EventDataMap, string> | EmptyEvents, eventData?: EventDataMap[Extract<keyof EventDataMap, string>]) => any): Emittery.UnsubscribeFn;
anyEvent(): AsyncIterableIterator<[Extract<keyof EventDataMap, string>, EventDataMap[Extract<keyof EventDataMap, string>]]>;

offAny(listener: (eventName: Extract<keyof EventDataMap, string> | EmptyEvents, eventData?: EventDataMap[Extract<keyof EventDataMap, string>]) => any): void;

emit<Name extends Extract<keyof EventDataMap, string>>(eventName: Name, eventData: EventDataMap[Name]): Promise<void>;
Expand Down
113 changes: 112 additions & 1 deletion index.js
Expand Up @@ -2,6 +2,8 @@

const anyMap = new WeakMap();
const eventsMap = new WeakMap();
const producersMap = new WeakMap();
const anyProducer = Symbol('anyProducer');
const resolvedPromise = Promise.resolve();

function assertEventName(eventName) {
Expand All @@ -25,6 +27,82 @@ function getListeners(instance, eventName) {
return events.get(eventName);
}

function getEventProducers(instance, eventName) {
const key = typeof eventName === 'string' ? eventName : anyProducer;
const producers = producersMap.get(instance);
if (!producers.has(key)) {
producers.set(key, new Set());
}

return producers.get(key);
}

function enqueueProducers(instance, eventName, eventData) {
const producers = producersMap.get(instance);
if (producers.has(eventName)) {
for (const producer of producers.get(eventName)) {
producer.enqueue(eventData);
}
}

if (producers.has(anyProducer)) {
const item = Promise.all([eventName, eventData]);
for (const producer of producers.get(anyProducer)) {
producer.enqueue(item);
}
}
}

function iterator(instance, eventName) {
let finished = false;
let flush = () => {};
let queue = [];
const producer = {
enqueue(item) {
queue.push(item);
flush();
},
finish() {
finished = true;
flush();
}
};

getEventProducers(instance, eventName).add(producer);
return {
async next() {
if (!queue) {
return {done: true};
}

if (queue.length === 0) {
if (finished) {
queue = null;
return this.next();
}

await new Promise(resolve => {
flush = resolve;
});
return this.next();
}

return {done: false, value: await queue.shift()};
},
async return(value) {
queue = null;
getEventProducers(instance, eventName).delete(producer);
flush();
return arguments.length > 0 ?
{done: true, value: await value} :
{done: true};
},
[Symbol.asyncIterator]() {
return this;
}
};
}

function defaultMethodNamesOrAssert(methodNames) {
if (methodNames === undefined) {
return allEmitteryMethods;
Expand Down Expand Up @@ -92,6 +170,7 @@ class Emittery {
constructor() {
anyMap.set(this, new Set());
eventsMap.set(this, new Map());
producersMap.set(this, new Map());
}

on(eventName, listener) {
Expand All @@ -117,9 +196,16 @@ class Emittery {
});
}

events(eventName) {
assertEventName(eventName);
return iterator(this, eventName);
}

async emit(eventName, eventData) {
assertEventName(eventName);

enqueueProducers(this, eventName, eventData);

const listeners = getListeners(this, eventName);
const anyListeners = anyMap.get(this);
const staticListeners = [...listeners];
Expand Down Expand Up @@ -170,6 +256,10 @@ class Emittery {
return this.offAny.bind(this, listener);
}

anyEvent() {
return iterator(this);
}

offAny(listener) {
assertListener(listener);
anyMap.get(this).delete(listener);
Expand All @@ -178,17 +268,34 @@ class Emittery {
clearListeners(eventName) {
if (typeof eventName === 'string') {
getListeners(this, eventName).clear();
const producers = getEventProducers(this, eventName);

for (const producer of producers) {
producer.finish();
}

producers.clear();
} else {
anyMap.get(this).clear();

for (const listeners of eventsMap.get(this).values()) {
listeners.clear();
}

for (const producers of producersMap.get(this).values()) {
for (const producer of producers) {
producer.finish();
}

producers.clear();
}
}
}

listenerCount(eventName) {
if (typeof eventName === 'string') {
return anyMap.get(this).size + getListeners(this, eventName).size;
return anyMap.get(this).size + getListeners(this, eventName).size +
getEventProducers(this, eventName).size + getEventProducers(this).size;
}

if (typeof eventName !== 'undefined') {
Expand All @@ -201,6 +308,10 @@ class Emittery {
count += value.size;
}

for (const value of producersMap.get(this).values()) {
count += value.size;
}

return count;
}

Expand Down
80 changes: 79 additions & 1 deletion readme.md
Expand Up @@ -69,7 +69,54 @@ emitter.once('🦄').then(data => {
emitter.emit('🦄', '🌈');
```

#### emit(eventName, data?)
#### events(eventName)

Get an async iterator which buffers data each time an event is emitted.

Call `return()` on the iterator to remove the subscription.

```js
const iterator = emitter.events('🦄');

emitter.emit('🦄', '🌈1'); // buffered
emitter.emit('🦄', '🌈2'); // buffered

iterator
.next()
.then( ({value, done}) => {
// done is false
// value === '🌈1'
return iterator.next();
})
.then( ({value, done}) => {
// done is false
// value === '🌈2'
// revoke subscription
return iterator.return();
})
.then(({done}) => {
// done is true
});
```

In practice you would usually consume the events using the [for await](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of) statement.
In that case, to revoke the subscription simply break the loop

```js
// in an async context
const iterator = emitter.events('🦄');

emitter.emit('🦄', '🌈1'); // buffered
emitter.emit('🦄', '🌈2'); // buffered

for await (const data of iterator){
if(data === '🌈2')
break; // revoke the subscription when we see the value '🌈2'
}

```

#### emit(eventName, [data])

Trigger an event asynchronously, optionally with some data. Listeners are called in the order they were added, but execute concurrently.

Expand All @@ -93,6 +140,37 @@ Returns a method to unsubscribe.

Remove an `onAny` subscription.

#### anyEvent()

Get an async iterator which buffers a tuple of an event name and data each time an event is emitted.

Call `return()` on the iterator to remove the subscription.
sindresorhus marked this conversation as resolved.
Show resolved Hide resolved

```js
const iterator = emitter.anyEvent();

emitter.emit('🦄', '🌈1'); // buffered
emitter.emit('🌟', '🌈2'); // buffered

iterator.next()
.then( ({value, done}) => {
// done is false
// value is ['🦄', '🌈1']
return iterator.next();
})
.then( ({value, done}) => {
// done is false
// value is ['🌟', '🌈2']
// revoke subscription
return iterator.return();
})
.then(({done}) => {
// done is true
});
```

In the same way as for ``events`` you can subscribe by using the ``for await`` statement

#### clearListeners()

Clear all event listeners on the instance.
Expand Down