Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscription mechanism using async iterators #40

Merged
merged 21 commits into from Sep 17, 2019
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
113 changes: 112 additions & 1 deletion index.js
Expand Up @@ -2,6 +2,8 @@

const anyMap = new WeakMap();
const eventsMap = new WeakMap();
const producersMap = new WeakMap();
const anyProducer = Symbol('anyProducer');
const resolvedPromise = Promise.resolve();

function assertEventName(eventName) {
Expand All @@ -25,6 +27,82 @@ function getListeners(instance, eventName) {
return events.get(eventName);
}

function getEventProducers(instance, eventName) {
const key = typeof eventName === 'string' ? eventName : anyProducer;
const producers = producersMap.get(instance);
if (!producers.has(key)) {
producers.set(key, new Set());
}

return producers.get(key);
}

function enqueueProducers(instance, eventName, eventData) {
const producers = producersMap.get(instance);
if (producers.has(eventName)) {
for (const producer of producers.get(eventName)) {
producer.enqueue(eventData);
}
}

if (producers.has(anyProducer)) {
const item = Promise.all([eventName, eventData]);
for (const producer of producers.get(anyProducer)) {
producer.enqueue(item);
}
}
}

function iterator(instance, eventName) {
let finished = false;
let flush = () => {};
let queue = [];
const producer = {
enqueue(item) {
queue.push(item);
flush();
},
finish() {
finished = true;
flush();
}
};

getEventProducers(instance, eventName).add(producer);
return {
async next() {
if (!queue) {
return {done: true};
}

if (queue.length === 0) {
if (finished) {
queue = null;
return this.next();
}

await new Promise(resolve => {
flush = resolve;
});
return this.next();
}

return {done: false, value: await queue.shift()};
},
async return(value) {
queue = null;
getEventProducers(instance, eventName).delete(producer);
flush();
return arguments.length > 0 ?
{done: true, value: await value} :
{done: true};
},
[Symbol.asyncIterator]() {
return this;
}
};
}

function defaultMethodNamesOrAssert(methodNames) {
if (methodNames === undefined) {
return allEmitteryMethods;
Expand Down Expand Up @@ -92,6 +170,7 @@ class Emittery {
constructor() {
anyMap.set(this, new Set());
eventsMap.set(this, new Map());
producersMap.set(this, new Map());
}

on(eventName, listener) {
Expand All @@ -117,9 +196,16 @@ class Emittery {
});
}

events(eventName) {
assertEventName(eventName);
return iterator(this, eventName);
}

async emit(eventName, eventData) {
assertEventName(eventName);

enqueueProducers(this, eventName, eventData);

const listeners = getListeners(this, eventName);
const anyListeners = anyMap.get(this);
const staticListeners = [...listeners];
Expand Down Expand Up @@ -170,6 +256,10 @@ class Emittery {
return this.offAny.bind(this, listener);
}

anyEvent() {
return iterator(this);
}

offAny(listener) {
assertListener(listener);
anyMap.get(this).delete(listener);
Expand All @@ -178,17 +268,34 @@ class Emittery {
clearListeners(eventName) {
if (typeof eventName === 'string') {
getListeners(this, eventName).clear();
const producers = getEventProducers(this, eventName);

for (const producer of producers) {
producer.finish();
}

producers.clear();
} else {
anyMap.get(this).clear();

for (const listeners of eventsMap.get(this).values()) {
listeners.clear();
}

for (const producers of producersMap.get(this).values()) {
for (const producer of producers) {
producer.finish();
}

producers.clear();
}
}
}

listenerCount(eventName) {
if (typeof eventName === 'string') {
return anyMap.get(this).size + getListeners(this, eventName).size;
return anyMap.get(this).size + getListeners(this, eventName).size +
getEventProducers(this, eventName).size + getEventProducers(this).size;
}

if (typeof eventName !== 'undefined') {
Expand All @@ -201,6 +308,10 @@ class Emittery {
count += value.size;
}

for (const value of producersMap.get(this).values()) {
count += value.size;
}

return count;
}

Expand Down
13 changes: 12 additions & 1 deletion readme.md
Expand Up @@ -68,8 +68,13 @@ emitter.once('🦄').then(data => {

emitter.emit('🦄', '🌈');
```
#### events(eventName)

#### emit(eventName, data?)
Get an asynchronous iterator which buffers data each time an event is emitted.
sindresorhus marked this conversation as resolved.
Show resolved Hide resolved

Call `return()` on the iterator to remove the subscription.

#### emit(eventName, [data])

Trigger an event asynchronously, optionally with some data. Listeners are called in the order they were added, but execute concurrently.

Expand All @@ -93,6 +98,12 @@ Returns a method to unsubscribe.

Remove an `onAny` subscription.

#### anyEvent()

Get an asynchronous iterator which buffers a tuple of an event name and data each time an event is emitted.

Call `return()` on the iterator to remove the subscription.
sindresorhus marked this conversation as resolved.
Show resolved Hide resolved

#### clearListeners()

Clear all event listeners on the instance.
Expand Down