Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove zlib-sync #9279

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/ws/src/utils/constants.ts
Expand Up @@ -40,7 +40,8 @@ export const DefaultWebSocketManagerOptions = {
},
version: APIVersion,
encoding: Encoding.JSON,
compression: null,
transportCompression: null,
useIdentifyCompression: false,
retrieveSessionInfo(shardId) {
const store = getDefaultSessionStore();
return store.get(shardId) ?? null;
Expand Down
18 changes: 12 additions & 6 deletions packages/ws/src/ws/WebSocketManager.ts
Expand Up @@ -84,12 +84,6 @@ export interface OptionalWebSocketManagerOptions {
* ```
*/
buildStrategy(manager: WebSocketManager): IShardingStrategy;
/**
* The compression method to use
*
* @defaultValue `null` (no compression)
*/
compression: CompressionMethod | null;
/**
* The encoding to use
*
Expand Down Expand Up @@ -161,10 +155,22 @@ export interface OptionalWebSocketManagerOptions {
* ```
*/
shardIds: number[] | ShardRange | null;
/**
* The transport compression method to use - mutually exclusive with `useIdentifyCompression`
*
* @defaultValue `null` (no transport compression)
*/
transportCompression: CompressionMethod | null;
/**
* Function used to store session information for a given shard
*/
updateSessionInfo(shardId: number, sessionInfo: SessionInfo | null): Awaitable<void>;
/**
* Whether to use the `compress` option when identifying - mutually exclusive with `transportCompression`
*
* @defaultValue `false`
*/
useIdentifyCompression: boolean;
/**
* The gateway version to use
*
Expand Down
91 changes: 44 additions & 47 deletions packages/ws/src/ws/WebSocketShard.ts
@@ -1,13 +1,12 @@
/* eslint-disable id-length */
import { Buffer } from 'node:buffer';
import type { Buffer } from 'node:buffer';
import { once } from 'node:events';
import { setTimeout, clearInterval, clearTimeout, setInterval } from 'node:timers';
import { setTimeout as sleep } from 'node:timers/promises';
import { URLSearchParams } from 'node:url';
import { TextDecoder } from 'node:util';
import { inflate } from 'node:zlib';
import type { Inflate } from 'node:zlib';
import { inflate, createInflate, constants as zlibConstants } from 'node:zlib';
import { Collection } from '@discordjs/collection';
import { lazy } from '@discordjs/util';
import { AsyncQueue } from '@sapphire/async-queue';
import { AsyncEventEmitter } from '@vladfrangu/async_event_emitter';
import {
Expand All @@ -21,14 +20,10 @@ import {
type GatewayReadyDispatchData,
} from 'discord-api-types/v10';
import { WebSocket, type RawData } from 'ws';
import type { Inflate } from 'zlib-sync';
import type { IContextFetchingStrategy } from '../strategies/context/IContextFetchingStrategy';
import { getInitialSendRateLimitState, ImportantGatewayOpcodes } from '../utils/constants.js';
import type { SessionInfo } from './WebSocketManager.js';

// eslint-disable-next-line promise/prefer-await-to-then
const getZlibSync = lazy(async () => import('zlib-sync').then((mod) => mod.default).catch(() => null));

export enum WebSocketShardEvents {
Closed = 'closed',
Debug = 'debug',
Expand Down Expand Up @@ -83,7 +78,7 @@ export interface SendRateLimitState {
export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
private connection: WebSocket | null = null;

private useIdentifyCompress = false;
private useIdentifyCompression = false;

private inflate: Inflate | null = null;

Expand Down Expand Up @@ -131,22 +126,28 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
throw new Error("Tried to connect a shard that wasn't idle");
}

const { version, encoding, compression } = this.strategy.options;
const { version, encoding, transportCompression: compression, useIdentifyCompression } = this.strategy.options;
this.useIdentifyCompression = useIdentifyCompression;

// eslint-disable-next-line id-length
const params = new URLSearchParams({ v: version, encoding });
if (compression) {
const zlib = await getZlibSync();
if (zlib) {
params.append('compress', compression);
this.inflate = new zlib.Inflate({
chunkSize: 65_535,
to: 'string',
});
} else if (!this.useIdentifyCompress) {
this.useIdentifyCompress = true;
console.warn(
'WebSocketShard: Compression is enabled but zlib-sync is not installed, falling back to identify compress',
);
if (useIdentifyCompression) {
console.warn('WebSocketShard: transport compression is enabled, disabling identify compression');
this.useIdentifyCompression = false;
}

params.append('compress', compression);
const inflate = createInflate({
chunkSize: 65_535,
flush: zlibConstants.Z_SYNC_FLUSH,
});

inflate.on('error', (error) => {
this.emit(WebSocketShardEvents.Error, { error });
});

this.inflate = inflate;
}

const session = this.session ?? (await this.strategy.retrieveSessionInfo(this.id));
Expand Down Expand Up @@ -369,28 +370,29 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
`shard id: ${this.id.toString()}`,
`shard count: ${this.strategy.options.shardCount}`,
`intents: ${this.strategy.options.intents}`,
`compression: ${this.inflate ? 'zlib-stream' : this.useIdentifyCompress ? 'identify' : 'none'}`,
`compression: ${this.inflate ? 'zlib-stream' : this.useIdentifyCompression ? 'identify' : 'none'}`,
]);

const d: GatewayIdentifyData = {
const data: GatewayIdentifyData = {
token: this.strategy.options.token,
properties: this.strategy.options.identifyProperties,
intents: this.strategy.options.intents,
compress: this.useIdentifyCompress,
compress: this.useIdentifyCompression,
shard: [this.id, this.strategy.options.shardCount],
};

if (this.strategy.options.largeThreshold) {
d.large_threshold = this.strategy.options.largeThreshold;
data.large_threshold = this.strategy.options.largeThreshold;
}

if (this.strategy.options.initialPresence) {
d.presence = this.strategy.options.initialPresence;
data.presence = this.strategy.options.initialPresence;
}

await this.send({
op: GatewayOpcodes.Identify,
d,
// eslint-disable-next-line id-length
d: data,
});

await this.bubbleWaitForEventError(
Expand All @@ -404,6 +406,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
this.replayedEvents = 0;
return this.send({
op: GatewayOpcodes.Resume,
// eslint-disable-next-line id-length
d: {
token: this.strategy.options.token,
seq: session.sequence,
Expand All @@ -419,6 +422,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {

await this.send({
op: GatewayOpcodes.Heartbeat,
// eslint-disable-next-line id-length
d: this.session?.sequence ?? null,
});

Expand All @@ -435,7 +439,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}

// Deal with identify compress
if (this.useIdentifyCompress) {
if (this.useIdentifyCompression) {
return new Promise((resolve, reject) => {
inflate(decompressable, { chunkSize: 65_535 }, (err, result) => {
if (err) {
Expand All @@ -450,28 +454,21 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {

// Deal with gw wide zlib-stream compression
if (this.inflate) {
const l = decompressable.length;
const flush =
l >= 4 &&
decompressable[l - 4] === 0x00 &&
decompressable[l - 3] === 0x00 &&
decompressable[l - 2] === 0xff &&
decompressable[l - 1] === 0xff;

const zlib = (await getZlibSync())!;
this.inflate.push(Buffer.from(decompressable), flush ? zlib.Z_SYNC_FLUSH : zlib.Z_NO_FLUSH);

if (this.inflate.err) {
this.emit(WebSocketShardEvents.Error, {
error: new Error(`${this.inflate.err}${this.inflate.msg ? `: ${this.inflate.msg}` : ''}`),
});
}
decompressable.length &&
decompressable.at(-4) === 0x00 &&
didinele marked this conversation as resolved.
Show resolved Hide resolved
decompressable.at(-3) === 0x00 &&
decompressable.at(-2) === 0xff &&
decompressable.at(-1) === 0xff;

this.inflate!.write(decompressable, 'binary');

if (!flush) {
return null;
}

const { result } = this.inflate;
const [result] = await once(this.inflate, 'data');

if (!result) {
return null;
}
Expand All @@ -482,7 +479,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
this.debug([
'Received a message we were unable to decompress',
`isBinary: ${isBinary.toString()}`,
`useIdentifyCompress: ${this.useIdentifyCompress.toString()}`,
`useIdentifyCompress: ${this.useIdentifyCompression.toString()}`,
`inflate: ${Boolean(this.inflate).toString()}`,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
`useIdentifyCompress: ${this.useIdentifyCompression.toString()}`,
`useIdentifyCompression: ${this.useIdentifyCompression.toString()}`,

]);

Expand Down Expand Up @@ -707,7 +704,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
messages.length > 1
? `\n${messages
.slice(1)
.map((m) => ` ${m}`)
.map((message) => ` ${message}`)
.join('\n')}`
: ''
}`;
Expand Down