-
Notifications
You must be signed in to change notification settings - Fork 79
/
queue.ts
90 lines (78 loc) 路 2.21 KB
/
queue.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import {EventEmitter} from 'node:events';
export type QueueOptions = {
concurrency: number;
};
export type QueueItemOptions = {
delay?: number;
};
type QueueItem = {
fn: AsyncFunction;
timeToRun: number;
};
export type AsyncFunction = () => Promise<void>;
export class Queue extends EventEmitter {
private readonly q: QueueItem[] = [];
private activeFunctions = 0;
private readonly concurrency: number;
constructor(options: QueueOptions) {
super();
this.concurrency = options.concurrency;
// It was noticed in test that setTimeout() could sometimes trigger an event
// moments before it was scheduled. This leads to a delta between timeToRun
// and Date.now(), and a link may never crawl. This setInterval() ensures
// these items are eventually processed.
setInterval(() => {
if (this.activeFunctions === 0) this.tick();
}, 2500).unref();
}
on(event: 'done', listener: () => void): this;
on(event: string | symbol, listener: (...arguments_: any[]) => void): this {
return super.on(event, listener);
}
add(function_: AsyncFunction, options?: QueueItemOptions) {
const delay = options?.delay || 0;
const timeToRun = Date.now() + delay;
this.q.push({
fn: function_,
timeToRun,
});
setTimeout(() => {
this.tick();
}, delay);
}
async onIdle() {
return new Promise<void>((resolve) => {
this.on('done', () => {
resolve();
});
});
}
private tick() {
// Check if we're complete
if (this.activeFunctions === 0 && this.q.length === 0) {
this.emit('done');
return;
}
// eslint-disable-next-line @typescript-eslint/prefer-for-of
for (let i = 0; i < this.q.length; i++) {
// Check if we have too many concurrent functions executing
if (this.activeFunctions >= this.concurrency) {
return;
}
// Grab the element at the front of the array
const item = this.q.shift()!;
// Make sure this element is ready to execute - if not, to the back of the stack
if (item.timeToRun <= Date.now()) {
// This function is ready to go!
this.activeFunctions++;
// eslint-disable-next-line @typescript-eslint/no-floating-promises
item.fn().finally(() => {
this.activeFunctions--;
this.tick();
});
} else {
this.q.push(item);
}
}
}
}