/
concurrency.ts
36 lines (34 loc) · 1019 Bytes
/
concurrency.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import {go} from './go';
import type {Code} from './types';
class Task<T = unknown> {
public readonly resolve!: (data: T) => void;
public readonly reject!: (error: any) => void;
public readonly promise = new Promise<T>((resolve, reject) => {
(this as any).resolve = resolve;
(this as any).reject = reject;
});
constructor(public readonly code: Code<T>) {}
}
/** Limits concurrency of async code. */
export const concurrency = (limit: number) => {
let workers = 0;
const queue = new Set<Task>();
const work = async () => {
const task = queue.values().next().value;
if (task) queue.delete(task);
else return;
workers++;
try {
task.resolve(await task.code());
} catch (error) {
task.reject(error);
} finally {
workers--, queue.size && go(work);
}
};
return async <T = unknown>(code: Code<T>): Promise<T> => {
const task = new Task(code);
queue.add(task as Task<unknown>);
return workers < limit && go(work), task.promise;
};
};