Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: socketio/socket.io
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 4.3.2
Choose a base ref
...
head repository: socketio/socket.io
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 4.4.0
Choose a head ref
  • 8 commits
  • 21 files changed
  • 2 contributors

Commits on Nov 8, 2021

  1. feat: add type information to socket.data (#4159)

    Usage:
    
    ```js
    interface SocketData {
      name: string;
      age: number;
    }
    
    const io = new Server<ClientToServerEvents, ServerToClientEvents, InterServerEvents, SocketData>();
    
    io.on("connection", (socket) => {
      socket.data.name = "john";
      socket.data.age = 42;
    });
    ```
    backmeupplz authored Nov 8, 2021

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    fe8730c View commit details

Commits on Nov 12, 2021

  1. feat: add an implementation based on uWebSockets.js

    Usage:
    
    ```js
    const { App } = require("uWebSockets.js");
    const { Server } = require("socket.io");
    
    const app = new App();
    const server = new Server();
    
    server.attachApp(app);
    
    app.listen(3000);
    ```
    
    The Adapter prototype is updated so we can benefit from the publish
    functionality of uWebSockets.js, so this will apply to all adapters
    extending the default adapter.
    
    Reference: https://github.com/uNetworking/uWebSockets.js
    
    Related:
    
    - #3601
    - socketio/engine.io#578
    darrachequesne committed Nov 12, 2021

    Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    darrachequesne Damien Arrachequesne
    Copy the full SHA
    c0d8c5a View commit details
  2. fix: only set 'connected' to true after middleware execution

    The Socket instance is only considered connected when the "connection"
    event is emitted, and not during the middleware(s) execution.
    
    ```js
    io.use((socket, next) => {
      console.log(socket.connected); // prints "false"
      next();
    });
    
    io.on("connection", (socket) => {
      console.log(socket.connected); // prints "true"
    });
    ```
    
    Related: #4129
    darrachequesne committed Nov 12, 2021

    Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    darrachequesne Damien Arrachequesne
    Copy the full SHA
    02b0f73 View commit details

Commits on Nov 16, 2021

  1. Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    darrachequesne Damien Arrachequesne
    Copy the full SHA
    2da8210 View commit details
  2. test: fix flaky test

    `srv.close()` only closes the underlying HTTP server, but this does not
    terminate the existing WebSocket connections.
    
    Reference: https://nodejs.org/api/http.html#serverclosecallback
    darrachequesne committed Nov 16, 2021

    Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    darrachequesne Damien Arrachequesne
    Copy the full SHA
    b7213e7 View commit details
  3. feat: add timeout feature

    Usage:
    
    ```js
    socket.timeout(5000).emit("my-event", (err) => {
      if (err) {
        // the client did not acknowledge the event in the given delay
      }
    });
    ```
    darrachequesne committed Nov 16, 2021

    Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    darrachequesne Damien Arrachequesne
    Copy the full SHA
    f0ed42f View commit details

Commits on Nov 18, 2021

  1. fix: prevent double ack when emitting with a timeout

    The ack was not properly removed upon timeout, and could be called
    twice.
    
    Related: f0ed42f
    darrachequesne committed Nov 18, 2021

    Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    darrachequesne Damien Arrachequesne
    Copy the full SHA
    b839a3b View commit details
  2. chore(release): 4.4.0

    darrachequesne committed Nov 18, 2021

    Verified

    This commit was signed with the committer’s verified signature. The key has expired.
    darrachequesne Damien Arrachequesne
    Copy the full SHA
    0f11c47 View commit details
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
# [4.4.0](https://github.com/socketio/socket.io/compare/4.3.2...4.4.0) (2021-11-18)


### Bug Fixes

* only set 'connected' to true after middleware execution ([02b0f73](https://github.com/socketio/socket.io/commit/02b0f73e2c64b09c72c5fbf7dc5f059557bdbe50))


### Features

* add an implementation based on uWebSockets.js ([c0d8c5a](https://github.com/socketio/socket.io/commit/c0d8c5ab234d0d2bef0d0dec472973cc9662f647))
* add timeout feature ([f0ed42f](https://github.com/socketio/socket.io/commit/f0ed42f18cabef20ad976aeec37077b6bf3837a5))
* add type information to `socket.data` ([#4159](https://github.com/socketio/socket.io/issues/4159)) ([fe8730c](https://github.com/socketio/socket.io/commit/fe8730ca0f15bc92d5de81cf934c89c76d6af329))



## [4.3.2](https://github.com/socketio/socket.io/compare/4.3.1...4.3.2) (2021-11-08)


4 changes: 2 additions & 2 deletions client-dist/socket.io.esm.min.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion client-dist/socket.io.esm.min.js.map

Large diffs are not rendered by default.

97 changes: 78 additions & 19 deletions client-dist/socket.io.js
2 changes: 1 addition & 1 deletion client-dist/socket.io.js.map

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions client-dist/socket.io.min.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion client-dist/socket.io.min.js.map

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions client-dist/socket.io.msgpack.min.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion client-dist/socket.io.msgpack.min.js.map

Large diffs are not rendered by default.

22 changes: 15 additions & 7 deletions lib/client.ts
Original file line number Diff line number Diff line change
@@ -21,21 +21,27 @@ interface WriteOptions {
export class Client<
ListenEvents extends EventsMap,
EmitEvents extends EventsMap,
ServerSideEvents extends EventsMap
ServerSideEvents extends EventsMap,
SocketData = any
> {
public readonly conn: RawSocket;

private readonly id: string;
private readonly server: Server<ListenEvents, EmitEvents, ServerSideEvents>;
private readonly server: Server<
ListenEvents,
EmitEvents,
ServerSideEvents,
SocketData
>;
private readonly encoder: Encoder;
private readonly decoder: Decoder;
private sockets: Map<
SocketId,
Socket<ListenEvents, EmitEvents, ServerSideEvents>
Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
> = new Map();
private nsps: Map<
string,
Socket<ListenEvents, EmitEvents, ServerSideEvents>
Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
> = new Map();
private connectTimeout?: NodeJS.Timeout;

@@ -47,7 +53,7 @@ export class Client<
* @package
*/
constructor(
server: Server<ListenEvents, EmitEvents, ServerSideEvents>,
server: Server<ListenEvents, EmitEvents, ServerSideEvents, SocketData>,
conn: any
) {
this.server = server;
@@ -112,7 +118,7 @@ export class Client<
auth,
(
dynamicNspName:
| Namespace<ListenEvents, EmitEvents, ServerSideEvents>
| Namespace<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
| false
) => {
if (dynamicNspName) {
@@ -171,7 +177,9 @@ export class Client<
*
* @private
*/
_remove(socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>): void {
_remove(
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
): void {
if (this.sockets.has(socket.id)) {
const nsp = this.sockets.get(socket.id)!.nsp.name;
this.sockets.delete(socket.id);
87 changes: 82 additions & 5 deletions lib/index.ts
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ import {
Server as Engine,
ServerOptions as EngineOptions,
AttachOptions,
uServer,
} from "engine.io";
import { Client } from "./client";
import { EventEmitter } from "events";
@@ -27,6 +28,7 @@ import {
StrictEventEmitter,
EventNames,
} from "./typed-events";
import { patchAdapter, restoreAdapter, serveFile } from "./uws.js";

const debug = debugModule("socket.io:server");

@@ -72,16 +74,23 @@ interface ServerOptions extends EngineOptions, AttachOptions {
export class Server<
ListenEvents extends EventsMap = DefaultEventsMap,
EmitEvents extends EventsMap = ListenEvents,
ServerSideEvents extends EventsMap = DefaultEventsMap
ServerSideEvents extends EventsMap = DefaultEventsMap,
SocketData = any
> extends StrictEventEmitter<
ServerSideEvents,
EmitEvents,
ServerReservedEventsMap<ListenEvents, EmitEvents, ServerSideEvents>
ServerReservedEventsMap<
ListenEvents,
EmitEvents,
ServerSideEvents,
SocketData
>
> {
public readonly sockets: Namespace<
ListenEvents,
EmitEvents,
ServerSideEvents
ServerSideEvents,
SocketData
>;
/**
* A reference to the underlying Engine.IO server.
@@ -337,6 +346,69 @@ export class Server<
return this;
}

public attachApp(app /*: TemplatedApp */, opts: Partial<ServerOptions> = {}) {
// merge the options passed to the Socket.IO server
Object.assign(opts, this.opts);
// set engine.io path to `/socket.io`
opts.path = opts.path || this._path;

// initialize engine
debug("creating uWebSockets.js-based engine with opts %j", opts);
const engine = new uServer(opts);

engine.attach(app, opts);

// bind to engine events
this.bind(engine);

if (this._serveClient) {
// attach static file serving
app.get(`${this._path}/*`, (res, req) => {
if (!this.clientPathRegex.test(req.getUrl())) {
req.setYield(true);
return;
}

const filename = req
.getUrl()
.replace(this._path, "")
.replace(/\?.*$/, "")
.replace(/^\//, "");
const isMap = dotMapRegex.test(filename);
const type = isMap ? "map" : "source";

// Per the standard, ETags must be quoted:
// https://tools.ietf.org/html/rfc7232#section-2.3
const expectedEtag = '"' + clientVersion + '"';
const weakEtag = "W/" + expectedEtag;

const etag = req.getHeader("if-none-match");
if (etag) {
if (expectedEtag === etag || weakEtag === etag) {
debug("serve client %s 304", type);
res.writeStatus("304 Not Modified");
res.end();
return;
}
}

debug("serve client %s", type);

res.writeHeader("cache-control", "public, max-age=0");
res.writeHeader(
"content-type",
"application/" + (isMap ? "json" : "javascript")
);
res.writeHeader("etag", expectedEtag);

const filepath = path.join(__dirname, "../client-dist/", filename);
serveFile(res, filepath);
});
}

patchAdapter(app);
}

/**
* Initialize engine
*
@@ -504,7 +576,9 @@ export class Server<
*/
public of(
name: string | RegExp | ParentNspNameMatchFn,
fn?: (socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>) => void
fn?: (
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
) => void
): Namespace<ListenEvents, EmitEvents, ServerSideEvents> {
if (typeof name === "function" || name instanceof RegExp) {
const parentNsp = new ParentNamespace(this);
@@ -553,6 +627,9 @@ export class Server<

this.engine.close();

// restore the Adapter prototype
restoreAdapter();

if (this.httpServer) {
this.httpServer.close(fn);
} else {
@@ -568,7 +645,7 @@ export class Server<
*/
public use(
fn: (
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>,
next: (err?: ExtendedError) => void
) => void
): this {
52 changes: 35 additions & 17 deletions lib/namespace.ts
Original file line number Diff line number Diff line change
@@ -21,56 +21,72 @@ export interface ExtendedError extends Error {
export interface NamespaceReservedEventsMap<
ListenEvents extends EventsMap,
EmitEvents extends EventsMap,
ServerSideEvents extends EventsMap
ServerSideEvents extends EventsMap,
SocketData
> {
connect: (socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>) => void;
connect: (
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
) => void;
connection: (
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
) => void;
}

export interface ServerReservedEventsMap<
ListenEvents,
EmitEvents,
ServerSideEvents
ServerSideEvents,
SocketData
> extends NamespaceReservedEventsMap<
ListenEvents,
EmitEvents,
ServerSideEvents
ServerSideEvents,
SocketData
> {
new_namespace: (
namespace: Namespace<ListenEvents, EmitEvents, ServerSideEvents>
namespace: Namespace<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
) => void;
}

export const RESERVED_EVENTS: ReadonlySet<string | Symbol> = new Set<
keyof ServerReservedEventsMap<never, never, never>
keyof ServerReservedEventsMap<never, never, never, never>
>(<const>["connect", "connection", "new_namespace"]);

export class Namespace<
ListenEvents extends EventsMap = DefaultEventsMap,
EmitEvents extends EventsMap = ListenEvents,
ServerSideEvents extends EventsMap = DefaultEventsMap
ServerSideEvents extends EventsMap = DefaultEventsMap,
SocketData = any
> extends StrictEventEmitter<
ServerSideEvents,
EmitEvents,
NamespaceReservedEventsMap<ListenEvents, EmitEvents, ServerSideEvents>
NamespaceReservedEventsMap<
ListenEvents,
EmitEvents,
ServerSideEvents,
SocketData
>
> {
public readonly name: string;
public readonly sockets: Map<
SocketId,
Socket<ListenEvents, EmitEvents, ServerSideEvents>
Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
> = new Map();

public adapter: Adapter;

/** @private */
readonly server: Server<ListenEvents, EmitEvents, ServerSideEvents>;
readonly server: Server<
ListenEvents,
EmitEvents,
ServerSideEvents,
SocketData
>;

/** @private */
_fns: Array<
(
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>,
next: (err?: ExtendedError) => void
) => void
> = [];
@@ -85,7 +101,7 @@ export class Namespace<
* @param name
*/
constructor(
server: Server<ListenEvents, EmitEvents, ServerSideEvents>,
server: Server<ListenEvents, EmitEvents, ServerSideEvents, SocketData>,
name: string
) {
super();
@@ -114,7 +130,7 @@ export class Namespace<
*/
public use(
fn: (
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>,
next: (err?: ExtendedError) => void
) => void
): this {
@@ -130,7 +146,7 @@ export class Namespace<
* @private
*/
private run(
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>,
fn: (err: ExtendedError | null) => void
) {
const fns = this._fns.slice(0);
@@ -195,7 +211,7 @@ export class Namespace<
client: Client<ListenEvents, EmitEvents, ServerSideEvents>,
query,
fn?: () => void
): Socket<ListenEvents, EmitEvents, ServerSideEvents> {
): Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData> {
debug("adding socket to nsp %s", this.name);
const socket = new Socket(this, client, query);
this.run(socket, (err) => {
@@ -238,7 +254,9 @@ export class Namespace<
*
* @private
*/
_remove(socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>): void {
_remove(
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
): void {
if (this.sockets.has(socket.id)) {
this.sockets.delete(socket.id);
} else {
16 changes: 10 additions & 6 deletions lib/parent-namespace.ts
Original file line number Diff line number Diff line change
@@ -11,13 +11,17 @@ import type { BroadcastOptions } from "socket.io-adapter";
export class ParentNamespace<
ListenEvents extends EventsMap = DefaultEventsMap,
EmitEvents extends EventsMap = ListenEvents,
ServerSideEvents extends EventsMap = DefaultEventsMap
> extends Namespace<ListenEvents, EmitEvents, ServerSideEvents> {
ServerSideEvents extends EventsMap = DefaultEventsMap,
SocketData = any
> extends Namespace<ListenEvents, EmitEvents, ServerSideEvents, SocketData> {
private static count: number = 0;
private children: Set<Namespace<ListenEvents, EmitEvents, ServerSideEvents>> =
new Set();
private children: Set<
Namespace<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
> = new Set();

constructor(server: Server<ListenEvents, EmitEvents, ServerSideEvents>) {
constructor(
server: Server<ListenEvents, EmitEvents, ServerSideEvents, SocketData>
) {
super(server, "/_" + ParentNamespace.count++);
}

@@ -47,7 +51,7 @@ export class ParentNamespace<

createChild(
name: string
): Namespace<ListenEvents, EmitEvents, ServerSideEvents> {
): Namespace<ListenEvents, EmitEvents, ServerSideEvents, SocketData> {
const namespace = new Namespace(this.server, name);
namespace._fns = this._fns.slice(0);
this.listeners("connect").forEach((listener) =>
84 changes: 69 additions & 15 deletions lib/socket.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Packet, PacketType } from "socket.io-parser";
import url = require("url");
import debugModule from "debug";
import type { Server } from "./index";
import {
@@ -46,7 +45,7 @@ export interface EventEmitterReservedEventsMap {

export const RESERVED_EVENTS: ReadonlySet<string | Symbol> = new Set<
| ClientReservedEvents
| keyof NamespaceReservedEventsMap<never, never, never>
| keyof NamespaceReservedEventsMap<never, never, never, never>
| keyof SocketReservedEventsMap
| keyof EventEmitterReservedEventsMap
>(<const>[
@@ -113,7 +112,8 @@ type Event = [eventName: string, ...args: any[]];
export class Socket<
ListenEvents extends EventsMap = DefaultEventsMap,
EmitEvents extends EventsMap = ListenEvents,
ServerSideEvents extends EventsMap = DefaultEventsMap
ServerSideEvents extends EventsMap = DefaultEventsMap,
SocketData = any
> extends StrictEventEmitter<
ListenEvents,
EmitEvents,
@@ -124,16 +124,20 @@ export class Socket<
/**
* Additional information that can be attached to the Socket instance and which will be used in the fetchSockets method
*/
public data: any = {};
public data: Partial<SocketData> = {};

public connected: boolean;
public disconnected: boolean;
public connected: boolean = false;

private readonly server: Server<ListenEvents, EmitEvents, ServerSideEvents>;
private readonly server: Server<
ListenEvents,
EmitEvents,
ServerSideEvents,
SocketData
>;
private readonly adapter: Adapter;
private acks: Map<number, () => void> = new Map();
private fns: Array<(event: Event, next: (err?: Error) => void) => void> = [];
private flags: BroadcastFlags = {};
private flags: BroadcastFlags & { timeout?: number } = {};
private _anyListeners?: Array<(...args: any[]) => void>;

/**
@@ -158,8 +162,6 @@ export class Socket<
} else {
this.id = base64id.generateId(); // don't reuse the Engine.IO id because it's sensitive information
}
this.connected = true;
this.disconnected = false;
this.handshake = this.buildHandshake(auth);
}

@@ -178,7 +180,8 @@ export class Socket<
secure: !!this.request.connection.encrypted,
issued: +new Date(),
url: this.request.url!,
query: url.parse(this.request.url!, true).query,
// @ts-ignore
query: this.request._query,
auth,
};
}
@@ -204,9 +207,11 @@ export class Socket<

// access last argument to see if it's an ACK callback
if (typeof data[data.length - 1] === "function") {
debug("emitting packet with ack id %d", this.nsp._ids);
this.acks.set(this.nsp._ids, data.pop());
packet.id = this.nsp._ids++;
const id = this.nsp._ids++;
debug("emitting packet with ack id %d", id);

this.registerAckCallback(id, data.pop());
packet.id = id;
}

const flags = Object.assign({}, this.flags);
@@ -217,6 +222,28 @@ export class Socket<
return true;
}

/**
* @private
*/
private registerAckCallback(id: number, ack: (...args: any[]) => void): void {
const timeout = this.flags.timeout;
if (timeout === undefined) {
this.acks.set(id, ack);
return;
}

const timer = setTimeout(() => {
debug("event with ack id %d has timed out after %d ms", id, timeout);
this.acks.delete(id);
ack.call(this, new Error("operation has timed out"));
}, timeout);

this.acks.set(id, (...args) => {
clearTimeout(timer);
ack.apply(this, [null, ...args]);
});
}

/**
* Targets a room when broadcasting.
*
@@ -336,6 +363,7 @@ export class Socket<
*/
_onconnect(): void {
debug("socket connected - writing packet");
this.connected = true;
this.join(this.id);
if (this.conn.protocol === 3) {
this.packet({ type: PacketType.CONNECT });
@@ -483,7 +511,6 @@ export class Socket<
this.nsp._remove(this);
this.client._remove(this);
this.connected = false;
this.disconnected = true;
this.emitReserved("disconnect", reason);
return;
}
@@ -564,6 +591,26 @@ export class Socket<
return this.newBroadcastOperator().local;
}

/**
* Sets a modifier for a subsequent event emission that the callback will be called with an error when the
* given number of milliseconds have elapsed without an acknowledgement from the client:
*
* ```
* socket.timeout(5000).emit("my-event", (err) => {
* if (err) {
* // the client did not acknowledge the event in the given delay
* }
* });
* ```
*
* @returns self
* @public
*/
public timeout(timeout: number): this {
this.flags.timeout = timeout;
return this;
}

/**
* Dispatch incoming event to socket listeners.
*
@@ -625,6 +672,13 @@ export class Socket<
run(0);
}

/**
* Whether the socket is currently disconnected
*/
public get disconnected() {
return !this.connected;
}

/**
* A reference to the request that originated the underlying Engine.IO Socket.
*
162 changes: 162 additions & 0 deletions lib/uws.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import { Adapter, Room } from "socket.io-adapter";
import type { WebSocket } from "uWebSockets.js";
import type { Socket } from "./socket.js";
import { createReadStream, statSync } from "fs";
import debugModule from "debug";

const debug = debugModule("socket.io:adapter-uws");

const SEPARATOR = "\x1f"; // see https://en.wikipedia.org/wiki/Delimiter#ASCII_delimited_text

const { addAll, del, broadcast } = Adapter.prototype;

export function patchAdapter(app /* : TemplatedApp */) {
Adapter.prototype.addAll = function (id, rooms) {
const isNew = !this.sids.has(id);
addAll.call(this, id, rooms);
const socket: Socket = this.nsp.sockets.get(id);
if (!socket) {
return;
}
if (socket.conn.transport.name === "websocket") {
subscribe(this.nsp.name, socket, isNew, rooms);
return;
}
if (isNew) {
socket.conn.on("upgrade", () => {
const rooms = this.sids.get(id);
subscribe(this.nsp.name, socket, isNew, rooms);
});
}
};

Adapter.prototype.del = function (id, room) {
del.call(this, id, room);
const socket: Socket = this.nsp.sockets.get(id);
if (socket && socket.conn.transport.name === "websocket") {
// @ts-ignore
const sessionId = socket.conn.id;
// @ts-ignore
const websocket: WebSocket = socket.conn.transport.socket;
const topic = `${this.nsp.name}${SEPARATOR}${room}`;
debug("unsubscribe connection %s from topic %s", sessionId, topic);
websocket.unsubscribe(topic);
}
};

Adapter.prototype.broadcast = function (packet, opts) {
const useFastPublish = opts.rooms.size <= 1 && opts.except!.size === 0;
if (!useFastPublish) {
broadcast.call(this, packet, opts);
return;
}

const flags = opts.flags || {};
const basePacketOpts = {
preEncoded: true,
volatile: flags.volatile,
compress: flags.compress,
};

packet.nsp = this.nsp.name;
const encodedPackets = this.encoder.encode(packet);

const topic =
opts.rooms.size === 0
? this.nsp.name
: `${this.nsp.name}${SEPARATOR}${opts.rooms.keys().next().value}`;
debug("fast publish to %s", topic);

// fast publish for clients connected with WebSocket
encodedPackets.forEach((encodedPacket) => {
const isBinary = typeof encodedPacket !== "string";
// "4" being the message type in the Engine.IO protocol, see https://github.com/socketio/engine.io-protocol
app.publish(
topic,
isBinary ? encodedPacket : "4" + encodedPacket,
isBinary
);
});

this.apply(opts, (socket) => {
if (socket.conn.transport.name !== "websocket") {
// classic publish for clients connected with HTTP long-polling
socket.client.writeToEngine(encodedPackets, basePacketOpts);
}
});
};
}

function subscribe(
namespaceName: string,
socket: Socket,
isNew: boolean,
rooms: Set<Room>
) {
// @ts-ignore
const sessionId = socket.conn.id;
// @ts-ignore
const websocket: WebSocket = socket.conn.transport.socket;
if (isNew) {
debug("subscribe connection %s to topic %s", sessionId, namespaceName);
websocket.subscribe(namespaceName);
}
rooms.forEach((room) => {
const topic = `${namespaceName}${SEPARATOR}${room}`; // '#' can be used as wildcard
debug("subscribe connection %s to topic %s", sessionId, topic);
websocket.subscribe(topic);
});
}

export function restoreAdapter() {
Adapter.prototype.addAll = addAll;
Adapter.prototype.del = del;
Adapter.prototype.broadcast = broadcast;
}

const toArrayBuffer = (buffer: Buffer) => {
const { buffer: arrayBuffer, byteOffset, byteLength } = buffer;
return arrayBuffer.slice(byteOffset, byteOffset + byteLength);
};

// imported from https://github.com/kolodziejczak-sz/uwebsocket-serve
export function serveFile(res /* : HttpResponse */, filepath: string) {
const { size } = statSync(filepath);
const readStream = createReadStream(filepath);
const destroyReadStream = () => !readStream.destroyed && readStream.destroy();

const onError = (error: Error) => {
destroyReadStream();
throw error;
};

const onDataChunk = (chunk: Buffer) => {
const arrayBufferChunk = toArrayBuffer(chunk);

const lastOffset = res.getWriteOffset();
const [ok, done] = res.tryEnd(arrayBufferChunk, size);

if (!done && !ok) {
readStream.pause();

res.onWritable((offset) => {
const [ok, done] = res.tryEnd(
arrayBufferChunk.slice(offset - lastOffset),
size
);

if (!done && ok) {
readStream.resume();
}

return ok;
});
}
};

res.onAborted(destroyReadStream);
readStream
.on("data", onDataChunk)
.on("error", onError)
.on("end", destroyReadStream);
}
56 changes: 23 additions & 33 deletions package-lock.json
11 changes: 6 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "socket.io",
"version": "4.3.2",
"version": "4.4.0",
"description": "node.js realtime framework server",
"keywords": [
"realtime",
@@ -48,8 +48,8 @@
"accepts": "~1.3.4",
"base64id": "~2.0.0",
"debug": "~4.3.2",
"engine.io": "~6.0.0",
"socket.io-adapter": "~2.3.2",
"engine.io": "~6.1.0",
"socket.io-adapter": "~2.3.3",
"socket.io-parser": "~4.0.4"
},
"devDependencies": {
@@ -59,13 +59,14 @@
"nyc": "^15.1.0",
"prettier": "^2.3.2",
"rimraf": "^3.0.2",
"socket.io-client": "4.3.2",
"socket.io-client": "4.4.0",
"socket.io-client-v2": "npm:socket.io-client@^2.4.0",
"superagent": "^6.1.0",
"supertest": "^6.1.6",
"ts-node": "^10.2.1",
"tsd": "^0.17.0",
"typescript": "^4.4.2"
"typescript": "^4.4.2",
"uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.0.0"
},
"contributors": [
{
57 changes: 57 additions & 0 deletions test/socket-timeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { Server } from "..";
import { createClient, success } from "./support/util";
import expect from "expect.js";

describe("timeout", () => {
it("should timeout if the client does not acknowledge the event", (done) => {
const io = new Server(0);
const client = createClient(io, "/");

io.on("connection", (socket) => {
socket.timeout(50).emit("unknown", (err) => {
expect(err).to.be.an(Error);
success(done, io, client);
});
});
});

it("should timeout if the client does not acknowledge the event in time", (done) => {
const io = new Server(0);
const client = createClient(io, "/");

client.on("echo", (arg, cb) => {
cb(arg);
});

let count = 0;

io.on("connection", (socket) => {
socket.timeout(0).emit("echo", 42, (err) => {
expect(err).to.be.an(Error);
count++;
});
});

setTimeout(() => {
expect(count).to.eql(1);
success(done, io, client);
}, 200);
});

it("should not timeout if the client does acknowledge the event", (done) => {
const io = new Server(0);
const client = createClient(io, "/");

client.on("echo", (arg, cb) => {
cb(arg);
});

io.on("connection", (socket) => {
socket.timeout(50).emit("echo", 42, (err, value) => {
expect(err).to.be(null);
expect(value).to.be(42);
success(done, io, client);
});
});
});
});
95 changes: 49 additions & 46 deletions test/socket.io.ts
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ import { io as ioc, Socket as ClientSocket } from "socket.io-client";

import "./support/util";
import "./utility-methods";
import "./uws";

type callback = (err: Error | null, success: boolean) => void;

@@ -820,29 +821,6 @@ describe("socket.io", () => {
});
});

it("should close a client without namespace (2)", (done) => {
const srv = createServer();
const sio = new Server(srv, {
connectTimeout: 100,
});

sio.use((_, next) => {
next(new Error("nope"));
});

srv.listen(() => {
const socket = client(srv);

const success = () => {
socket.close();
sio.close();
done();
};

socket.on("disconnect", success);
});
});

it("should exclude a specific socket when emitting", (done) => {
const srv = createServer();
const io = new Server(srv);
@@ -1072,7 +1050,7 @@ describe("socket.io", () => {
reconnectionDelay: 100,
});
clientSocket.on("connect", () => {
srv.close();
sio.close();
});

clientSocket.io.on("reconnect_failed", () => {
@@ -1452,6 +1430,32 @@ describe("socket.io", () => {
}, 200);
});

it("should broadcast only one consecutive volatile event with binary (ws)", (done) => {
const srv = createServer();
const sio = new Server(srv, { transports: ["websocket"] });

let counter = 0;
srv.listen(() => {
sio.on("connection", (s) => {
// Wait to make sure there are no packets being sent for opening the connection
setTimeout(() => {
sio.volatile.emit("ev", Buffer.from([1, 2, 3]));
sio.volatile.emit("ev", Buffer.from([4, 5, 6]));
}, 20);
});

const socket = client(srv, { transports: ["websocket"] });
socket.on("ev", () => {
counter++;
});
});

setTimeout(() => {
expect(counter).to.be(1);
done();
}, 200);
});

it("should emit regular events after trying a failed volatile event (polling)", (done) => {
const srv = createServer();
const sio = new Server(srv, { transports: ["polling"] });
@@ -2515,28 +2519,6 @@ describe("socket.io", () => {
});
});
});

it("should pre encode a broadcast packet", (done) => {
const srv = createServer();
const sio = new Server(srv);

srv.listen(() => {
const clientSocket = client(srv, { multiplex: false });

sio.on("connection", (socket) => {
socket.conn.on("packetCreate", (packet) => {
expect(packet.data).to.eql('2["hello","world"]');
expect(packet.options.wsPreEncoded).to.eql('42["hello","world"]');

clientSocket.close();
sio.close();
done();
});

sio.emit("hello", "world");
});
});
});
});

describe("middleware", () => {
@@ -2725,6 +2707,25 @@ describe("socket.io", () => {
if (++count === 2) done();
});
});

it("should only set `connected` to true after the middleware execution", (done) => {
const httpServer = createServer();
const io = new Server(httpServer);

const clientSocket = client(httpServer, "/");

io.use((socket, next) => {
expect(socket.connected).to.be(false);
expect(socket.disconnected).to.be(true);
next();
});

io.on("connection", (socket) => {
expect(socket.connected).to.be(true);
expect(socket.disconnected).to.be(false);
success(io, clientSocket, done);
});
});
});

describe("socket middleware", () => {
@@ -2864,4 +2865,6 @@ describe("socket.io", () => {
});
});
});

require("./socket-timeout");
});
24 changes: 24 additions & 0 deletions test/support/util.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
import type { Server } from "../..";
import {
io as ioc,
ManagerOptions,
Socket as ClientSocket,
SocketOptions,
} from "socket.io-client";

const expect = require("expect.js");
const i = expect.stringify;

@@ -20,3 +28,19 @@ expect.Assertion.prototype.contain = function (...args) {
}
return contain.apply(this, args);
};

export function createClient(
io: Server,
nsp: string,
opts?: ManagerOptions & SocketOptions
): ClientSocket {
// @ts-ignore
const port = io.httpServer.address().port;
return ioc(`http://localhost:${port}${nsp}`, opts);
}

export function success(done: Function, io: Server, client: ClientSocket) {
io.close();
client.disconnect();
done();
}
195 changes: 195 additions & 0 deletions test/uws.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
import { App, us_socket_local_port } from "uWebSockets.js";
import { Server } from "..";
import { io as ioc, Socket as ClientSocket } from "socket.io-client";
import request from "supertest";
import expect from "expect.js";

const createPartialDone = (done: (err?: Error) => void, count: number) => {
let i = 0;
return () => {
if (++i === count) {
done();
} else if (i > count) {
done(new Error(`partialDone() called too many times: ${i} > ${count}`));
}
};
};

const shouldNotHappen = (done) => () => done(new Error("should not happen"));

describe("socket.io with uWebSocket.js-based engine", () => {
let io: Server,
port: number,
client: ClientSocket,
clientWSOnly: ClientSocket,
clientPollingOnly: ClientSocket,
clientCustomNamespace: ClientSocket;

beforeEach((done) => {
const app = App();
io = new Server();
io.attachApp(app);

io.of("/custom");

app.listen(0, (listenSocket) => {
port = us_socket_local_port(listenSocket);

client = ioc(`http://localhost:${port}`);
clientWSOnly = ioc(`http://localhost:${port}`, {
transports: ["websocket"],
});
clientPollingOnly = ioc(`http://localhost:${port}`, {
transports: ["polling"],
});
clientCustomNamespace = ioc(`http://localhost:${port}/custom`);
});

const partialDone = createPartialDone(done, 4);
io.on("connection", partialDone);
io.of("/custom").on("connection", partialDone);
});

afterEach(() => {
io.close();
client.disconnect();
clientWSOnly.disconnect();
clientPollingOnly.disconnect();
clientCustomNamespace.disconnect();
});

it("should broadcast", (done) => {
const partialDone = createPartialDone(done, 3);

client.on("hello", partialDone);
clientWSOnly.on("hello", partialDone);
clientPollingOnly.on("hello", partialDone);
clientCustomNamespace.on("hello", shouldNotHappen(done));

io.emit("hello");
});

it("should broadcast in a namespace", (done) => {
client.on("hello", shouldNotHappen(done));
clientWSOnly.on("hello", shouldNotHappen(done));
clientPollingOnly.on("hello", shouldNotHappen(done));
clientCustomNamespace.on("hello", done);

io.of("/custom").emit("hello");
});

it("should broadcast in a dynamic namespace", (done) => {
const dynamicNamespace = io.of(/\/dynamic-\d+/);
const dynamicClient = clientWSOnly.io.socket("/dynamic-101");

dynamicClient.on("connect", () => {
dynamicNamespace.emit("hello");
});

dynamicClient.on("hello", () => {
dynamicClient.disconnect();
done();
});
});

it("should broadcast binary content", (done) => {
const partialDone = createPartialDone(done, 3);

client.on("hello", partialDone);
clientWSOnly.on("hello", partialDone);
clientPollingOnly.on("hello", partialDone);
clientCustomNamespace.on("hello", shouldNotHappen(done));

io.emit("hello", Buffer.from([1, 2, 3]));
});

it("should broadcast volatile packet with binary content", (done) => {
const partialDone = createPartialDone(done, 3);

client.on("hello", partialDone);
clientWSOnly.on("hello", partialDone);
clientPollingOnly.on("hello", partialDone);
clientCustomNamespace.on("hello", shouldNotHappen(done));

// wait to make sure there are no packets being sent for opening the connection
setTimeout(() => {
io.volatile.emit("hello", Buffer.from([1, 2, 3]));
}, 20);
});

it("should broadcast in a room", (done) => {
const partialDone = createPartialDone(done, 2);

client.on("hello", shouldNotHappen(done));
clientWSOnly.on("hello", partialDone);
clientPollingOnly.on("hello", partialDone);
clientCustomNamespace.on("hello", shouldNotHappen(done));

io.of("/").sockets.get(clientWSOnly.id)!.join("room1");
io.of("/").sockets.get(clientPollingOnly.id)!.join("room1");

io.to("room1").emit("hello");
});

it("should broadcast in multiple rooms", (done) => {
const partialDone = createPartialDone(done, 2);

client.on("hello", shouldNotHappen(done));
clientWSOnly.on("hello", partialDone);
clientPollingOnly.on("hello", partialDone);
clientCustomNamespace.on("hello", shouldNotHappen(done));

io.of("/").sockets.get(clientWSOnly.id)!.join("room1");
io.of("/").sockets.get(clientPollingOnly.id)!.join("room2");

io.to(["room1", "room2"]).emit("hello");
});

it("should broadcast in all but a given room", (done) => {
const partialDone = createPartialDone(done, 2);

client.on("hello", partialDone);
clientWSOnly.on("hello", partialDone);
clientPollingOnly.on("hello", shouldNotHappen(done));
clientCustomNamespace.on("hello", shouldNotHappen(done));

io.of("/").sockets.get(clientWSOnly.id)!.join("room1");
io.of("/").sockets.get(clientPollingOnly.id)!.join("room2");

io.except("room2").emit("hello");
});

it("should work even after leaving room", (done) => {
const partialDone = createPartialDone(done, 2);

client.on("hello", partialDone);
clientWSOnly.on("hello", shouldNotHappen(done));
clientPollingOnly.on("hello", partialDone);
clientCustomNamespace.on("hello", shouldNotHappen(done));

io.of("/").sockets.get(client.id)!.join("room1");
io.of("/").sockets.get(clientPollingOnly.id)!.join("room1");

io.of("/").sockets.get(clientWSOnly.id)!.join("room1");
io.of("/").sockets.get(clientWSOnly.id)!.leave("room1");

io.to("room1").emit("hello");
});

it("should serve static files", (done) => {
const clientVersion = require("socket.io-client/package.json").version;

request(`http://localhost:${port}`)
.get("/socket.io/socket.io.js")
.buffer(true)
.end((err, res) => {
if (err) return done(err);
expect(res.headers["content-type"]).to.be("application/javascript");
expect(res.headers.etag).to.be('"' + clientVersion + '"');
expect(res.headers["x-sourcemap"]).to.be(undefined);
expect(res.text).to.match(/engine\.io/);
expect(res.status).to.be(200);
done();
});
});
});