-
-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
SequentialHandler.ts
509 lines (454 loc) · 17.2 KB
/
SequentialHandler.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
import { setTimeout, clearTimeout } from 'node:timers';
import { setTimeout as sleep } from 'node:timers/promises';
import { AsyncQueue } from '@sapphire/async-queue';
import { request, type Dispatcher } from 'undici';
import type { RateLimitData, RequestOptions } from '../REST';
import type { HandlerRequestData, RequestManager, RouteData } from '../RequestManager';
import { DiscordAPIError, type DiscordErrorData, type OAuthErrorData } from '../errors/DiscordAPIError.js';
import { HTTPError } from '../errors/HTTPError.js';
import { RateLimitError } from '../errors/RateLimitError.js';
import { RESTEvents } from '../utils/constants.js';
import { hasSublimit, parseHeader, parseResponse } from '../utils/utils.js';
import type { IHandler } from './IHandler.js';
/**
* Invalid request limiting is done on a per-IP basis, not a per-token basis.
* The best we can do is track invalid counts process-wide (on the theory that
* users could have multiple bots run from one process) rather than per-bot.
* Therefore, store these at file scope here rather than in the client's
* RESTManager object.
*/
let invalidCount = 0;
let invalidCountResetTime: number | null = null;
const enum QueueType {
Standard,
Sublimit,
}
/**
* The structure used to handle requests for a given bucket
*/
export class SequentialHandler implements IHandler {
/**
* {@inheritDoc IHandler.id}
*/
public readonly id: string;
/**
* The time this rate limit bucket will reset
*/
private reset = -1;
/**
* The remaining requests that can be made before we are rate limited
*/
private remaining = 1;
/**
* The total number of requests that can be made before we are rate limited
*/
private limit = Number.POSITIVE_INFINITY;
/**
* The interface used to sequence async requests sequentially
*/
#asyncQueue = new AsyncQueue();
/**
* The interface used to sequence sublimited async requests sequentially
*/
#sublimitedQueue: AsyncQueue | null = null;
/**
* A promise wrapper for when the sublimited queue is finished being processed or null when not being processed
*/
#sublimitPromise: { promise: Promise<void>; resolve(): void } | null = null;
/**
* Whether the sublimit queue needs to be shifted in the finally block
*/
#shiftSublimit = false;
/**
* @param manager - The request manager
* @param hash - The hash that this RequestHandler handles
* @param majorParameter - The major parameter for this handler
*/
public constructor(
private readonly manager: RequestManager,
private readonly hash: string,
private readonly majorParameter: string,
) {
this.id = `${hash}:${majorParameter}`;
}
/**
* {@inheritDoc IHandler.inactive}
*/
public get inactive(): boolean {
return (
this.#asyncQueue.remaining === 0 &&
(this.#sublimitedQueue === null || this.#sublimitedQueue.remaining === 0) &&
!this.limited
);
}
/**
* If the rate limit bucket is currently limited by the global limit
*/
private get globalLimited(): boolean {
return this.manager.globalRemaining <= 0 && Date.now() < this.manager.globalReset;
}
/**
* If the rate limit bucket is currently limited by its limit
*/
private get localLimited(): boolean {
return this.remaining <= 0 && Date.now() < this.reset;
}
/**
* If the rate limit bucket is currently limited
*/
private get limited(): boolean {
return this.globalLimited || this.localLimited;
}
/**
* The time until queued requests can continue
*/
private get timeToReset(): number {
return this.reset + this.manager.options.offset - Date.now();
}
/**
* Emits a debug message
*
* @param message - The message to debug
*/
private debug(message: string) {
this.manager.emit(RESTEvents.Debug, `[REST ${this.id}] ${message}`);
}
/**
* Delay all requests for the specified amount of time, handling global rate limits
*
* @param time - The amount of time to delay all requests for
*/
private async globalDelayFor(time: number): Promise<void> {
await sleep(time, undefined, { ref: false });
this.manager.globalDelay = null;
}
/*
* Determines whether the request should be queued or whether a RateLimitError should be thrown
*/
private async onRateLimit(rateLimitData: RateLimitData) {
const { options } = this.manager;
if (!options.rejectOnRateLimit) return;
const shouldThrow =
typeof options.rejectOnRateLimit === 'function'
? await options.rejectOnRateLimit(rateLimitData)
: options.rejectOnRateLimit.some((route) => rateLimitData.route.startsWith(route.toLowerCase()));
if (shouldThrow) {
throw new RateLimitError(rateLimitData);
}
}
/**
* {@inheritDoc IHandler.queueRequest}
*/
public async queueRequest(
routeId: RouteData,
url: string,
options: RequestOptions,
requestData: HandlerRequestData,
): Promise<Dispatcher.ResponseData> {
let queue = this.#asyncQueue;
let queueType = QueueType.Standard;
// Separate sublimited requests when already sublimited
if (this.#sublimitedQueue && hasSublimit(routeId.bucketRoute, requestData.body, options.method)) {
queue = this.#sublimitedQueue!;
queueType = QueueType.Sublimit;
}
// Wait for any previous requests to be completed before this one is run
await queue.wait({ signal: requestData.signal });
// This set handles retroactively sublimiting requests
if (queueType === QueueType.Standard) {
if (this.#sublimitedQueue && hasSublimit(routeId.bucketRoute, requestData.body, options.method)) {
/**
* Remove the request from the standard queue, it should never be possible to get here while processing the
* sublimit queue so there is no need to worry about shifting the wrong request
*/
queue = this.#sublimitedQueue!;
const wait = queue.wait();
this.#asyncQueue.shift();
await wait;
} else if (this.#sublimitPromise) {
// Stall requests while the sublimit queue gets processed
await this.#sublimitPromise.promise;
}
}
try {
// Make the request, and return the results
return await this.runRequest(routeId, url, options, requestData);
} finally {
// Allow the next request to fire
queue.shift();
if (this.#shiftSublimit) {
this.#shiftSublimit = false;
this.#sublimitedQueue?.shift();
}
// If this request is the last request in a sublimit
if (this.#sublimitedQueue?.remaining === 0) {
this.#sublimitPromise?.resolve();
this.#sublimitedQueue = null;
}
}
}
/**
* The method that actually makes the request to the api, and updates info about the bucket accordingly
*
* @param routeId - The generalized api route with literal ids for major parameters
* @param url - The fully resolved url to make the request to
* @param options - The fetch options needed to make the request
* @param requestData - Extra data from the user's request needed for errors and additional processing
* @param retries - The number of retries this request has already attempted (recursion)
*/
private async runRequest(
routeId: RouteData,
url: string,
options: RequestOptions,
requestData: HandlerRequestData,
retries = 0,
): Promise<Dispatcher.ResponseData> {
/*
* After calculations have been done, pre-emptively stop further requests
* Potentially loop until this task can run if e.g. the global rate limit is hit twice
*/
while (this.limited) {
const isGlobal = this.globalLimited;
let limit: number;
let timeout: number;
let delay: Promise<void>;
if (isGlobal) {
// Set RateLimitData based on the global limit
limit = this.manager.options.globalRequestsPerSecond;
timeout = this.manager.globalReset + this.manager.options.offset - Date.now();
// If this is the first task to reach the global timeout, set the global delay
if (!this.manager.globalDelay) {
// The global delay function clears the global delay state when it is resolved
this.manager.globalDelay = this.globalDelayFor(timeout);
}
delay = this.manager.globalDelay;
} else {
// Set RateLimitData based on the route-specific limit
limit = this.limit;
timeout = this.timeToReset;
delay = sleep(timeout);
}
const rateLimitData: RateLimitData = {
timeToReset: timeout,
limit,
method: options.method ?? 'get',
hash: this.hash,
url,
route: routeId.bucketRoute,
majorParameter: this.majorParameter,
global: isGlobal,
};
// Let library users know they have hit a rate limit
this.manager.emit(RESTEvents.RateLimited, rateLimitData);
// Determine whether a RateLimitError should be thrown
await this.onRateLimit(rateLimitData);
// When not erroring, emit debug for what is happening
if (isGlobal) {
this.debug(`Global rate limit hit, blocking all requests for ${timeout}ms`);
} else {
this.debug(`Waiting ${timeout}ms for rate limit to pass`);
}
// Wait the remaining time left before the rate limit resets
await delay;
}
// As the request goes out, update the global usage information
if (!this.manager.globalReset || this.manager.globalReset < Date.now()) {
this.manager.globalReset = Date.now() + 1_000;
this.manager.globalRemaining = this.manager.options.globalRequestsPerSecond;
}
this.manager.globalRemaining--;
const method = options.method ?? 'get';
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), this.manager.options.timeout).unref();
if (requestData.signal) {
// The type polyfill is required because Node.js's types are incomplete.
const signal = requestData.signal as PolyFillAbortSignal;
// If the user signal was aborted, abort the controller, else abort the local signal.
// The reason why we don't re-use the user's signal, is because users may use the same signal for multiple
// requests, and we do not want to cause unexpected side-effects.
if (signal.aborted) controller.abort();
else signal.addEventListener('abort', () => controller.abort());
}
let res: Dispatcher.ResponseData;
try {
res = await request(url, { ...options, signal: controller.signal });
} catch (error: unknown) {
// Retry the specified number of times for possible timed out requests
if (error instanceof Error && error.name === 'AbortError' && retries !== this.manager.options.retries) {
// eslint-disable-next-line no-param-reassign
return await this.runRequest(routeId, url, options, requestData, ++retries);
}
throw error;
} finally {
clearTimeout(timeout);
}
if (this.manager.listenerCount(RESTEvents.Response)) {
this.manager.emit(
RESTEvents.Response,
{
method,
path: routeId.original,
route: routeId.bucketRoute,
options,
data: requestData,
retries,
},
{ ...res },
);
}
const status = res.statusCode;
let retryAfter = 0;
const limit = parseHeader(res.headers['x-ratelimit-limit']);
const remaining = parseHeader(res.headers['x-ratelimit-remaining']);
const reset = parseHeader(res.headers['x-ratelimit-reset-after']);
const hash = parseHeader(res.headers['x-ratelimit-bucket']);
const retry = parseHeader(res.headers['retry-after']);
// Update the total number of requests that can be made before the rate limit resets
this.limit = limit ? Number(limit) : Number.POSITIVE_INFINITY;
// Update the number of remaining requests that can be made before the rate limit resets
this.remaining = remaining ? Number(remaining) : 1;
// Update the time when this rate limit resets (reset-after is in seconds)
this.reset = reset ? Number(reset) * 1_000 + Date.now() + this.manager.options.offset : Date.now();
// Amount of time in milliseconds until we should retry if rate limited (globally or otherwise)
if (retry) retryAfter = Number(retry) * 1_000 + this.manager.options.offset;
// Handle buckets via the hash header retroactively
if (hash && hash !== this.hash) {
// Let library users know when rate limit buckets have been updated
this.debug(['Received bucket hash update', ` Old Hash : ${this.hash}`, ` New Hash : ${hash}`].join('\n'));
// This queue will eventually be eliminated via attrition
this.manager.hashes.set(`${method}:${routeId.bucketRoute}`, { value: hash, lastAccess: Date.now() });
} else if (hash) {
// Handle the case where hash value doesn't change
// Fetch the hash data from the manager
const hashData = this.manager.hashes.get(`${method}:${routeId.bucketRoute}`);
// When fetched, update the last access of the hash
if (hashData) {
hashData.lastAccess = Date.now();
}
}
// Handle retryAfter, which means we have actually hit a rate limit
let sublimitTimeout: number | null = null;
if (retryAfter > 0) {
if (res.headers['x-ratelimit-global'] !== undefined) {
this.manager.globalRemaining = 0;
this.manager.globalReset = Date.now() + retryAfter;
} else if (!this.localLimited) {
/*
* This is a sublimit (e.g. 2 channel name changes/10 minutes) since the headers don't indicate a
* route-wide rate limit. Don't update remaining or reset to avoid rate limiting the whole
* endpoint, just set a reset time on the request itself to avoid retrying too soon.
*/
sublimitTimeout = retryAfter;
}
}
// Count the invalid requests
if (status === 401 || status === 403 || status === 429) {
if (!invalidCountResetTime || invalidCountResetTime < Date.now()) {
invalidCountResetTime = Date.now() + 1_000 * 60 * 10;
invalidCount = 0;
}
invalidCount++;
const emitInvalid =
this.manager.options.invalidRequestWarningInterval > 0 &&
invalidCount % this.manager.options.invalidRequestWarningInterval === 0;
if (emitInvalid) {
// Let library users know periodically about invalid requests
this.manager.emit(RESTEvents.InvalidRequestWarning, {
count: invalidCount,
remainingTime: invalidCountResetTime - Date.now(),
});
}
}
if (status >= 200 && status < 300) {
return res;
} else if (status === 429) {
// A rate limit was hit - this may happen if the route isn't associated with an official bucket hash yet, or when first globally rate limited
const isGlobal = this.globalLimited;
let limit: number;
let timeout: number;
if (isGlobal) {
// Set RateLimitData based on the global limit
limit = this.manager.options.globalRequestsPerSecond;
timeout = this.manager.globalReset + this.manager.options.offset - Date.now();
} else {
// Set RateLimitData based on the route-specific limit
limit = this.limit;
timeout = this.timeToReset;
}
await this.onRateLimit({
timeToReset: timeout,
limit,
method,
hash: this.hash,
url,
route: routeId.bucketRoute,
majorParameter: this.majorParameter,
global: isGlobal,
});
this.debug(
[
'Encountered unexpected 429 rate limit',
` Global : ${isGlobal.toString()}`,
` Method : ${method}`,
` URL : ${url}`,
` Bucket : ${routeId.bucketRoute}`,
` Major parameter: ${routeId.majorParameter}`,
` Hash : ${this.hash}`,
` Limit : ${limit}`,
` Retry After : ${retryAfter}ms`,
` Sublimit : ${sublimitTimeout ? `${sublimitTimeout}ms` : 'None'}`,
].join('\n'),
);
// If caused by a sublimit, wait it out here so other requests on the route can be handled
if (sublimitTimeout) {
// Normally the sublimit queue will not exist, however, if a sublimit is hit while in the sublimit queue, it will
const firstSublimit = !this.#sublimitedQueue;
if (firstSublimit) {
this.#sublimitedQueue = new AsyncQueue();
void this.#sublimitedQueue.wait();
this.#asyncQueue.shift();
}
this.#sublimitPromise?.resolve();
this.#sublimitPromise = null;
await sleep(sublimitTimeout, undefined, { ref: false });
let resolve: () => void;
// eslint-disable-next-line promise/param-names, no-promise-executor-return
const promise = new Promise<void>((res) => (resolve = res));
this.#sublimitPromise = { promise, resolve: resolve! };
if (firstSublimit) {
// Re-queue this request so it can be shifted by the finally
await this.#asyncQueue.wait();
this.#shiftSublimit = true;
}
}
// Since this is not a server side issue, the next request should pass, so we don't bump the retries counter
return this.runRequest(routeId, url, options, requestData, retries);
} else if (status >= 500 && status < 600) {
// Retry the specified number of times for possible server side issues
if (retries !== this.manager.options.retries) {
// eslint-disable-next-line no-param-reassign
return this.runRequest(routeId, url, options, requestData, ++retries);
}
// We are out of retries, throw an error
throw new HTTPError(status, method, url, requestData);
} else {
// Handle possible malformed requests
if (status >= 400 && status < 500) {
// If we receive this status code, it means the token we had is no longer valid.
if (status === 401 && requestData.auth) {
this.manager.setToken(null!);
}
// The request will not succeed for some reason, parse the error returned from the api
const data = (await parseResponse(res)) as DiscordErrorData | OAuthErrorData;
// throw the API error
throw new DiscordAPIError(data, 'code' in data ? data.code : data.error, status, method, url, requestData);
}
return res;
}
}
}
interface PolyFillAbortSignal {
readonly aborted: boolean;
addEventListener(type: 'abort', listener: () => void): void;
removeEventListener(type: 'abort', listener: () => void): void;
}