Skip to content

Commit

Permalink
fix(websocketshard): deal with zombie connection caused by 4009
Browse files Browse the repository at this point in the history
  • Loading branch information
legendhimself committed Mar 1, 2022
1 parent 2fcf8af commit a2326a7
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 16 deletions.
198 changes: 182 additions & 16 deletions src/client/websocket/WebSocketShard.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
/* eslint-disable no-console */
'use strict';

const EventEmitter = require('node:events');
const { setTimeout, setInterval } = require('node:timers');
const { setTimeout, setInterval, clearTimeout } = require('node:timers');
const { setTimeout: sleep } = require('node:timers/promises');
const WebSocket = require('../../WebSocket');
const { Status, Events, ShardEvents, Opcodes, WSEvents } = require('../../util/Constants');
const Intents = require('../../util/Intents');
Expand Down Expand Up @@ -81,6 +83,13 @@ class WebSocketShard extends EventEmitter {
*/
this.lastHeartbeatAcked = true;

/**
* Used to prevent from calling the onClose event twice while closing or terminating the WebSocket.
* @type {boolean}
* @private
*/
this.closeEmitted = false;

/**
* Contains the rate limit queue and metadata
* @name WebSocketShard#ratelimit
Expand Down Expand Up @@ -126,6 +135,14 @@ class WebSocketShard extends EventEmitter {
*/
Object.defineProperty(this, 'helloTimeout', { value: null, writable: true });

/**
* The WebSocket timeout.
* @name WebSocketShard#WsCloseTimeout
* @type {?NodeJS.Timeout}
* @private
*/
Object.defineProperty(this, 'WsCloseTimeout', { value: null, writable: true });

/**
* If the manager attached its event handlers on the shard
* @name WebSocketShard#eventsAttached
Expand Down Expand Up @@ -188,6 +205,7 @@ class WebSocketShard extends EventEmitter {
this.removeListener(ShardEvents.RESUMED, onResumed);
this.removeListener(ShardEvents.INVALID_SESSION, onInvalidOrDestroyed);
this.removeListener(ShardEvents.DESTROYED, onInvalidOrDestroyed);
this.removeListener(ShardEvents.ZOMBIE_CONNECTION, this.handleZombieConnection);
};

const onReady = () => {
Expand Down Expand Up @@ -253,7 +271,8 @@ class WebSocketShard extends EventEmitter {

this.connectedAt = Date.now();

const ws = (this.connection = WebSocket.create(gateway, wsQuery));
// Adding a handshake timeout to just make sure no zombie connection appears.
const ws = (this.connection = WebSocket.create(gateway, wsQuery, { handshakeTimeout: 30000 }));
ws.onopen = this.onOpen.bind(this);
ws.onmessage = this.onMessage.bind(this);
ws.onerror = this.onError.bind(this);
Expand Down Expand Up @@ -351,10 +370,25 @@ class WebSocketShard extends EventEmitter {
this.setHeartbeatTimer(-1);
this.setHelloTimeout(-1);
// If we still have a connection object, clean up its listeners
if (this.connection) this._cleanupConnection();
if (this.connection) {
this.debug('[WebSocket] onClose: Cleaning up the Connection.');
this._cleanupConnection();
}
this.closeEmitted = true;
this.status = Status.DISCONNECTED;

// Step 1: Null the connection object
this.debug('Step 1: Null the connection object.');
this.connection = null;

this.debug('Step 2: Set the shard status to DISCONNECTED');
// Step 2: Set the shard status to DISCONNECTED
this.status = Status.DISCONNECTED;

this.debug('Step 3: Cache the old sequence (use to attempt a resume)');
// Step 3: Cache the old sequence (use to attempt a resume)
if (this.sequence !== -1) this.closeSequence = this.sequence;

/**
* Emitted when a shard's WebSocket closes.
* @private
Expand Down Expand Up @@ -519,10 +553,51 @@ class WebSocketShard extends EventEmitter {
this.debug('Setting a HELLO timeout for 20s.');
this.helloTimeout = setTimeout(() => {
this.debug('Did not receive HELLO in time. Destroying and connecting again.');
this.destroy({ reset: true, closeCode: 4009 });
this.destroy({ closeCode: 4009 });
}, 20_000).unref();
}

/**
* Sets the WebSocket Close timeout.
* This method is responsilble to detect any zombie connections if the ws fails to close properly,
* Wait for 6s for the ws#close event after ws.close() or ws.terminate() is called.
* @param {number} [time] If set to -1, it will clear the timeout
* @private
*/
setWsCloseTimeout(time) {
if (time === -1) {
if (this.WsCloseTimeout) {
this.debug('Clearing the WebSocket Close timeout.');
clearTimeout(this.WsCloseTimeout);
this.WsCloseTimeout = null;
}
return;
}
this.WsCloseTimeout = setTimeout(() => {
this.debug(`[WebSocket] Close Emitted: ${this.closeEmitted}`);
// Check connection is null or if close event was emitted.
if (!this.connection || this.closeEmitted) {
this.debug(
`[WebSocket] WebSocket close not detected. | WS State: ${
CONNECTION_STATE[this.connection?.readyState ?? 3]
} | Close Emitted: ${this.closeEmitted}`,
);
this.closeEmitted = false;
this.setWsCloseTimeout(-1);
return;
}
console.log('Should emit zombieConnection');
// Waiting for approx 5s.

/**
* Emitted when a shard's WebSocket is not closed in time.
* @private
* @event WebSocketShard#zombieConnection
*/
this.emit(ShardEvents.ZOMBIE_CONNECTION);
}, 6000).unref();
}

/**
* Sets the heartbeat timer for this shard.
* @param {number} time If -1, clears the interval, any other number sets an interval
Expand Down Expand Up @@ -563,8 +638,7 @@ class WebSocketShard extends EventEmitter {
Sequence : ${this.sequence}
Connection State: ${this.connection ? CONNECTION_STATE[this.connection.readyState] : 'No Connection??'}`,
);

this.destroy({ closeCode: 4009, reset: true });
this.destroy({ closeCode: 4009 });
return;
}

Expand Down Expand Up @@ -702,31 +776,49 @@ class WebSocketShard extends EventEmitter {
* @private
*/
destroy({ closeCode = 1_000, reset = false, emit = true, log = true } = {}) {
// Making the variable false to check for zombie connections.
this.closeEmitted = false;

if (log) {
this.debug(`[DESTROY]
Close Code : ${closeCode}
Reset : ${reset}
Emit DESTROYED: ${emit}`);
}

this.debug(`[WS Destroy] Step 0: Remove all timers.`);
// Step 0: Remove all timers
this.setHeartbeatTimer(-1);
this.setHelloTimeout(-1);

this.debug(
`[WS Destroy] Step 1: Attempting to close the WebSocket. | WS State: ${
CONNECTION_STATE[this.connection?.readyState ?? 3]
}`,
);
// Step 1: Close the WebSocket connection, if any, otherwise, emit DESTROYED
if (this.connection) {
// If the connection is currently opened, we will (hopefully) receive close
if (this.connection.readyState === WebSocket.OPEN) {
this.connection.close(closeCode);
this.debug(
`[WebSocket] Close: Tried closing. | WS State: ${CONNECTION_STATE[this.connection?.readyState ?? 3]}`,
);
} else {
// Connection is not OPEN
this.debug(`WS State: ${CONNECTION_STATE[this.connection.readyState]}`);
this.debug(`WS State: ${CONNECTION_STATE[this.connection?.readyState ?? 3]}`);
// Remove listeners from the connection
this._cleanupConnection();
// Attempt to close the connection just in case
try {
this.connection.close(closeCode);
} catch {
this.debug(
`[WebSocket] Close: Something went wrong while closing the WebSocket, Now calling terminate. | WS State: ${
CONNECTION_STATE[this.connection?.readyState ?? 3]
}`,
);
this.connection.terminate();
// No-op
}
// Emit the destroyed event if needed
Expand All @@ -737,30 +829,104 @@ class WebSocketShard extends EventEmitter {
this._emitDestroyed();
}

// Step 2: Null the connection object
this.connection = null;

// Step 3: Set the shard status to DISCONNECTED
this.status = Status.DISCONNECTED;
/**
* Just to make sure the readyState is not stuck at CLOSING incase of a closeCode 4009.
* we can use this for other closeCodes aswell but rightnow so far only 4009 is failing to close.
**/
if (closeCode === 4009) {
this.debug(
`[WebSocket] closeCode[4009]: Adding a timeout to check the connection state. | WS State: ${
CONNECTION_STATE[this.connection?.readyState ?? 3]
}`,
);
this.setWsCloseTimeout();
}

// Step 4: Cache the old sequence (use to attempt a resume)
if (this.sequence !== -1) this.closeSequence = this.sequence;
this.debug(
`[WS Destroy] Step 2: Resetting the sequence and session id if requested. | WS State: ${
CONNECTION_STATE[this.connection?.readyState ?? 3]
}`,
);

// Step 5: Reset the sequence and session id if requested
// Step 2: Reset the sequence and session id if requested
if (reset) {
this.sequence = -1;
this.sessionId = null;
}

// Step 6: reset the rate limit data
// Step 3: Watch for zombie connection event which could be emitted when step 1 fails to close the WebSocket
this.on(ShardEvents.ZOMBIE_CONNECTION, this.handleZombieConnection);

this.debug('Step 4: reset the rate limit data');
// Step 4: reset the rate limit data
this.ratelimit.remaining = this.ratelimit.total;
this.ratelimit.queue.length = 0;
if (this.ratelimit.timer) {
this.debug(`[WS Destroy] Step 4: Clearing ratelimit timer`);
clearTimeout(this.ratelimit.timer);
this.ratelimit.timer = null;
}
}

/**
* Forcefully closes the WebSocket when the destory() method could not close it normally
* using the ws.close(); and ws.terminate();
* @private
* @returns {Promise<void>}
*/
async handleZombieConnection() {
// Continuing from step 2 from the destroy method.

// Step 3 Check if the connection was closed and readyState is at CLOSED. If not close it.
if (this.connection?.readyState === WebSocket.CLOSING || this.connection?.readyState === WebSocket.OPEN) {
this.debug(
`[WS Destroy] Step 3: WebSocket Still Connected: trying to terminate. | WS State: ${
CONNECTION_STATE[this.connection?.readyState ?? 3]
}`,
);

// Trying to close WebSocket with terminate..
try {
this.connection.terminate();
} catch {
// No op.
}

// Wait for 400ms for ws to close.
await sleep(400);
this.debug(`[WebSocket] Final Close Check. | WS State: ${CONNECTION_STATE[this.connection?.readyState]}`);
if (this.connection?.readyState === WebSocket.CLOSING || this.connection?.readyState === WebSocket.OPEN) {
this.debug(
// eslint-disable-next-line max-len
`[WebSocket] Connection still stuck at Closing: Calling a Manual Destroy of the socket to Close and reconnect. | WS State: ${
CONNECTION_STATE[this.connection?.readyState ?? 3]
}`,
);

/**
* This is an important step to deal with zombie connections where in shard never reconnects
* after a 4009 closeCode due to WebSocket being stuck at CLOSING ready state.
* Check the issue https://github.com/discordjs/discord.js/issues/7450
*
* The _socket.destroy() method Ensures that no more I/O activity happens on this socket.
* Destroys the stream and closes the connection. Refer: https://nodejs.org/api/net.html#socketdestroy
* _socket.destroy() is also being invoked in ws.terminate() method.
* ._socket.destroy emits the close event and makes the readyState to CLOSED.
*/

// manual destory
this.connection._socket.destroy();
this.connection.emitClose();

this.debug(
`[WebSocket] Applied a Manual Destroy, Clearing the WS State Logger in 5s. | WS State: ${
CONNECTION_STATE[this.connection?.readyState ?? 3]
}`,
);
}
}
}

/**
* Cleans up the WebSocket connection listeners.
* @private
Expand Down
1 change: 1 addition & 0 deletions src/util/Constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ exports.ShardEvents = {
READY: 'ready',
RESUMED: 'resumed',
ALL_READY: 'allReady',
ZOMBIE_CONNECTION: 'zombieConnection',
};

/**
Expand Down
5 changes: 5 additions & 0 deletions typings/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2608,6 +2608,7 @@ export interface WebSocketShardEvents {
invalidSession: [];
close: [event: CloseEvent];
allReady: [unavailableGuilds?: Set<Snowflake>];
zombieConnection: [];
}

export class WebSocketShard extends EventEmitter {
Expand All @@ -2623,6 +2624,8 @@ export class WebSocketShard extends EventEmitter {
private eventsAttached: boolean;
private expectedGuilds: Set<Snowflake> | null;
private readyTimeout: NodeJS.Timeout | null;
private closeEmitted: boolean;
private WsCloseTimeout: NodeJS.Timeout | null;

public manager: WebSocketManager;
public id: number;
Expand All @@ -2638,6 +2641,7 @@ export class WebSocketShard extends EventEmitter {
private onPacket(packet: unknown): void;
private checkReady(): void;
private setHelloTimeout(time?: number): void;
private setWsCloseTimeout(time?: number): void;
private setHeartbeatTimer(time: number): void;
private sendHeartbeat(): void;
private ackHeartbeat(): void;
Expand All @@ -2647,6 +2651,7 @@ export class WebSocketShard extends EventEmitter {
private _send(data: unknown): void;
private processQueue(): void;
private destroy(destroyOptions?: { closeCode?: number; reset?: boolean; emit?: boolean; log?: boolean }): void;
private handleZombieConnection(): Promise<void>;
private _cleanupConnection(): void;
private _emitDestroyed(): void;

Expand Down

0 comments on commit a2326a7

Please sign in to comment.