From 114bcc07a980325a9c3a04eb35ce1284b23eaefb Mon Sep 17 00:00:00 2001 From: Voxelli <69213593+legendhimslef@users.noreply.github.com> Date: Sun, 5 Jun 2022 13:08:31 +0530 Subject: [PATCH] fix(websocketshard): deal with zombie connection caused by 4009 (#7581) Co-authored-by: Almeida Co-authored-by: Vitor Co-authored-by: SpaceEEC Co-authored-by: Vlad Frangu Co-authored-by: Jiralite <33201955+Jiralite@users.noreply.github.com> Co-authored-by: Parbez --- src/client/websocket/WebSocketManager.js | 11 +- src/client/websocket/WebSocketShard.js | 128 +++++++++++++++++++---- src/util/Constants.js | 1 + src/util/Options.js | 3 + typings/index.d.ts | 8 ++ 5 files changed, 125 insertions(+), 26 deletions(-) diff --git a/src/client/websocket/WebSocketManager.js b/src/client/websocket/WebSocketManager.js index 65a23f96e338..8838489cc110 100644 --- a/src/client/websocket/WebSocketManager.js +++ b/src/client/websocket/WebSocketManager.js @@ -20,7 +20,7 @@ const BeforeReadyWhitelist = [ WSEvents.GUILD_MEMBER_REMOVE, ]; -const UNRECOVERABLE_CLOSE_CODES = Object.keys(WSCodes).slice(1).map(Number); +const UNRECOVERABLE_CLOSE_CODES = Object.keys(WSCodes).slice(2).map(Number); const UNRESUMABLE_CLOSE_CODES = [ RPCErrorCodes.UnknownError, RPCErrorCodes.InvalidPermissions, @@ -216,13 +216,8 @@ class WebSocketManager extends EventEmitter { this.shardQueue.add(shard); - if (shard.sessionId) { - this.debug(`Session id is present, attempting an immediate reconnect...`, shard); - this.reconnect(); - } else { - shard.destroy({ reset: true, emit: false, log: false }); - this.reconnect(); - } + if (shard.sessionId) this.debug(`Session id is present, attempting an immediate reconnect...`, shard); + this.reconnect(); }); shard.on(ShardEvents.INVALID_SESSION, () => { diff --git a/src/client/websocket/WebSocketShard.js b/src/client/websocket/WebSocketShard.js index 35174f6d88ff..43faa8ee85c7 100644 --- a/src/client/websocket/WebSocketShard.js +++ b/src/client/websocket/WebSocketShard.js @@ -1,9 +1,9 @@ 'use strict'; const EventEmitter = require('node:events'); -const { setTimeout, setInterval } = require('node:timers'); +const { setTimeout, setInterval, clearTimeout } = require('node:timers'); const WebSocket = require('../../WebSocket'); -const { Status, Events, ShardEvents, Opcodes, WSEvents } = require('../../util/Constants'); +const { Status, Events, ShardEvents, Opcodes, WSEvents, WSCodes } = require('../../util/Constants'); const Intents = require('../../util/Intents'); const STATUS_KEYS = Object.keys(Status); @@ -81,6 +81,13 @@ class WebSocketShard extends EventEmitter { */ this.lastHeartbeatAcked = true; + /** + * Used to prevent calling {@link WebSocketShard#event:close} twice while closing or terminating the WebSocket. + * @type {boolean} + * @private + */ + this.closeEmitted = false; + /** * Contains the rate limit queue and metadata * @name WebSocketShard#ratelimit @@ -126,6 +133,14 @@ class WebSocketShard extends EventEmitter { */ Object.defineProperty(this, 'helloTimeout', { value: null, writable: true }); + /** + * The WebSocket timeout. + * @name WebSocketShard#wsCloseTimeout + * @type {?NodeJS.Timeout} + * @private + */ + Object.defineProperty(this, 'wsCloseTimeout', { value: null, writable: true }); + /** * If the manager attached its event handlers on the shard * @name WebSocketShard#eventsAttached @@ -250,10 +265,11 @@ class WebSocketShard extends EventEmitter { this.status = this.status === Status.DISCONNECTED ? Status.RECONNECTING : Status.CONNECTING; this.setHelloTimeout(); - + this.setWsCloseTimeout(-1); this.connectedAt = Date.now(); - const ws = (this.connection = WebSocket.create(gateway, wsQuery)); + // Adding a handshake timeout to just make sure no zombie connection appears. + const ws = (this.connection = WebSocket.create(gateway, wsQuery, { handshakeTimeout: 30_000 })); ws.onopen = this.onOpen.bind(this); ws.onmessage = this.onMessage.bind(this); ws.onerror = this.onError.bind(this); @@ -340,21 +356,39 @@ class WebSocketShard extends EventEmitter { * @private */ onClose(event) { + this.closeEmitted = true; if (this.sequence !== -1) this.closeSequence = this.sequence; this.sequence = -1; - - this.debug(`[CLOSE] - Event Code: ${event.code} - Clean : ${event.wasClean} - Reason : ${event.reason ?? 'No reason received'}`); - this.setHeartbeatTimer(-1); this.setHelloTimeout(-1); + // Clearing the WebSocket close timeout as close was emitted. + this.setWsCloseTimeout(-1); // If we still have a connection object, clean up its listeners - if (this.connection) this._cleanupConnection(); - + if (this.connection) { + this._cleanupConnection(); + // Having this after _cleanupConnection to just clean up the connection and not listen to ws.onclose + this.destroy({ reset: true, emit: false, log: false }); + } this.status = Status.DISCONNECTED; + this.emitClose(event); + } + /** + * This method is responsible to emit close event for this shard. + * This method helps the shard reconnect. + * @param {CloseEvent} [event] Close event that was received + */ + emitClose( + event = { + code: 1011, + reason: WSCodes[1011], + wasClean: false, + }, + ) { + this.debug(`[CLOSE] + Event Code: ${event.code} + Clean : ${event.wasClean} + Reason : ${event.reason ?? 'No reason received'}`); /** * Emitted when a shard's WebSocket closes. * @private @@ -523,6 +557,47 @@ class WebSocketShard extends EventEmitter { }, 20_000).unref(); } + /** + * Sets the WebSocket Close timeout. + * This method is responsible for detecting any zombie connections if the WebSocket fails to close properly. + * @param {number} [time] If set to -1, it will clear the timeout + * @private + */ + setWsCloseTimeout(time) { + if (this.wsCloseTimeout) { + this.debug('[WebSocket] Clearing the close timeout.'); + clearTimeout(this.wsCloseTimeout); + } + if (time === -1) { + this.wsCloseTimeout = null; + return; + } + this.wsCloseTimeout = setTimeout(() => { + this.setWsCloseTimeout(-1); + this.debug(`[WebSocket] Close Emitted: ${this.closeEmitted}`); + // Check if close event was emitted. + if (this.closeEmitted) { + this.debug( + `[WebSocket] was closed. | WS State: ${ + CONNECTION_STATE[this.connection?.readyState ?? WebSocket.CLOSED] + } | Close Emitted: ${this.closeEmitted}`, + ); + // Setting the variable false to check for zombie connections. + this.closeEmitted = false; + return; + } + + this.debug( + // eslint-disable-next-line max-len + `[WebSocket] did not close properly, assuming a zombie connection.\nEmitting close and reconnecting again.`, + ); + + this.emitClose(); + // Setting the variable false to check for zombie connections. + this.closeEmitted = false; + }, time).unref(); + } + /** * Sets the heartbeat timer for this shard. * @param {number} time If -1, clears the interval, any other number sets an interval @@ -563,8 +638,7 @@ class WebSocketShard extends EventEmitter { Sequence : ${this.sequence} Connection State: ${this.connection ? CONNECTION_STATE[this.connection.readyState] : 'No Connection??'}`, ); - - this.destroy({ closeCode: 4009, reset: true }); + this.destroy({ reset: true, closeCode: 4009 }); return; } @@ -713,21 +787,30 @@ class WebSocketShard extends EventEmitter { this.setHeartbeatTimer(-1); this.setHelloTimeout(-1); + this.debug( + `[WebSocket] Destroy: Attempting to close the WebSocket. | WS State: ${ + CONNECTION_STATE[this.connection?.readyState ?? WebSocket.CLOSED] + }`, + ); // Step 1: Close the WebSocket connection, if any, otherwise, emit DESTROYED if (this.connection) { // If the connection is currently opened, we will (hopefully) receive close if (this.connection.readyState === WebSocket.OPEN) { this.connection.close(closeCode); + this.debug(`[WebSocket] Close: Tried closing. | WS State: ${CONNECTION_STATE[this.connection.readyState]}`); } else { // Connection is not OPEN this.debug(`WS State: ${CONNECTION_STATE[this.connection.readyState]}`); - // Remove listeners from the connection - this._cleanupConnection(); // Attempt to close the connection just in case try { this.connection.close(closeCode); - } catch { - // No-op + } catch (err) { + this.debug( + `[WebSocket] Close: Something went wrong while closing the WebSocket: ${ + err.message || err + }. Forcefully terminating the connection | WS State: ${CONNECTION_STATE[this.connection.readyState]}`, + ); + this.connection.terminate(); } // Emit the destroyed event if needed if (emit) this._emitDestroyed(); @@ -737,6 +820,15 @@ class WebSocketShard extends EventEmitter { this._emitDestroyed(); } + if (this.connection?.readyState === WebSocket.CLOSING || this.connection?.readyState === WebSocket.CLOSED) { + this.closeEmitted = false; + this.debug( + `[WebSocket] Adding a WebSocket close timeout to ensure a correct WS reconnect. + Timeout: ${this.manager.client.options.closeTimeout}ms`, + ); + this.setWsCloseTimeout(this.manager.client.options.closeTimeout); + } + // Step 2: Null the connection object this.connection = null; diff --git a/src/util/Constants.js b/src/util/Constants.js index 417157e6d429..dc24eaa2cc19 100644 --- a/src/util/Constants.js +++ b/src/util/Constants.js @@ -8,6 +8,7 @@ exports.UserAgent = `DiscordBot (${Package.homepage}, ${Package.version}) Node.j exports.WSCodes = { 1000: 'WS_CLOSE_REQUESTED', + 1011: 'INTERNAL_ERROR', 4004: 'TOKEN_INVALID', 4010: 'SHARDING_INVALID', 4011: 'SHARDING_REQUIRED', diff --git a/src/util/Options.js b/src/util/Options.js index 9b8e6bff5c2b..b66c7cbc295d 100644 --- a/src/util/Options.js +++ b/src/util/Options.js @@ -33,6 +33,8 @@ const process = require('node:process'); * @property {number|number[]|string} [shards] The shard's id to run, or an array of shard ids. If not specified, * the client will spawn {@link ClientOptions#shardCount} shards. If set to `auto`, it will fetch the * recommended amount of shards from Discord and spawn that amount + * @property {number} [closeTimeout=1] The amount of time in milliseconds to wait for the close frame to be received + * from the WebSocket. Don't have this too high/low. Its best to have it between 2_000-6_000 ms. * @property {number} [shardCount=1] The total amount of shards used by all processes of this bot * (e.g. recommended shard count, shard count of the ShardingManager) * @property {CacheFactory} [makeCache] Function to create a cache. @@ -132,6 +134,7 @@ class Options extends null { */ static createDefault() { return { + closeTimeout: 5_000, waitGuildTimeout: 15_000, shardCount: 1, makeCache: this.cacheWithLimits(this.defaultMakeCacheSettings), diff --git a/typings/index.d.ts b/typings/index.d.ts index bee52aa9c472..0a67b97455a7 100644 --- a/typings/index.d.ts +++ b/typings/index.d.ts @@ -2791,6 +2791,8 @@ export class WebSocketShard extends EventEmitter { private eventsAttached: boolean; private expectedGuilds: Set | null; private readyTimeout: NodeJS.Timeout | null; + private closeEmitted: boolean; + private wsCloseTimeout: NodeJS.Timeout | null; public manager: WebSocketManager; public id: number; @@ -2806,6 +2808,7 @@ export class WebSocketShard extends EventEmitter { private onPacket(packet: unknown): void; private checkReady(): void; private setHelloTimeout(time?: number): void; + private setWsCloseTimeout(time?: number): void; private setHeartbeatTimer(time: number): void; private sendHeartbeat(): void; private ackHeartbeat(): void; @@ -2815,6 +2818,7 @@ export class WebSocketShard extends EventEmitter { private _send(data: unknown): void; private processQueue(): void; private destroy(destroyOptions?: { closeCode?: number; reset?: boolean; emit?: boolean; log?: boolean }): void; + private emitClose(event?: CloseEvent): void; private _cleanupConnection(): void; private _emitDestroyed(): void; @@ -2966,9 +2970,12 @@ export const Constants: { }; WSCodes: { 1000: 'WS_CLOSE_REQUESTED'; + 1011: 'INTERNAL_ERROR'; 4004: 'TOKEN_INVALID'; 4010: 'SHARDING_INVALID'; 4011: 'SHARDING_REQUIRED'; + 4013: 'INVALID_INTENTS'; + 4014: 'DISALLOWED_INTENTS'; }; Events: ConstantsEvents; ShardEvents: ConstantsShardEvents; @@ -4221,6 +4228,7 @@ export interface ClientFetchInviteOptions { export interface ClientOptions { shards?: number | number[] | 'auto'; shardCount?: number; + closeTimeout?: number; makeCache?: CacheFactory; /** @deprecated Pass the value of this property as `lifetime` to `sweepers.messages` instead. */ messageCacheLifetime?: number;