From 5060b827b0490084f2e8f4d6de2b86c01296a2fd Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Thu, 14 Mar 2024 16:47:34 +0100 Subject: [PATCH] fix: discard acknowledgements upon disconnection --- lib/socket.ts | 93 +++++++++++++++++++++++++++++--------- test/socket.ts | 118 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 189 insertions(+), 22 deletions(-) diff --git a/lib/socket.ts b/lib/socket.ts index 532a7423..f2ac9287 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -241,7 +241,33 @@ export class Socket< private readonly _opts: SocketOptions; private ids: number = 0; - private acks: object = {}; + /** + * A map containing acknowledgement handlers. + * + * The `withError` attribute is used to differentiate handlers that accept an error as first argument: + * + * - `socket.emit("test", (err, value) => { ... })` with `ackTimeout` option + * - `socket.timeout(5000).emit("test", (err, value) => { ... })` + * - `const value = await socket.emitWithAck("test")` + * + * From those that don't: + * + * - `socket.emit("test", (value) => { ... });` + * + * In the first case, the handlers will be called with an error when: + * + * - the timeout is reached + * - the socket gets disconnected + * + * In the second case, the handlers will be simply discarded upon disconnection, since the client will never receive + * an acknowledgement from the server. + * + * @private + */ + private acks: Record< + string, + ((...args: any[]) => void) & { withError?: boolean } + > = {}; private flags: Flags = {}; private subs?: Array; private _anyListeners: Array<(...args: any[]) => void>; @@ -409,7 +435,7 @@ export class Socket< const id = this.ids++; debug("emitting packet with ack id %d", id); - const ack = args.pop() as Function; + const ack = args.pop() as (...args: any[]) => void; this._registerAckCallback(id, ack); packet.id = id; } @@ -438,7 +464,7 @@ export class Socket< /** * @private */ - private _registerAckCallback(id: number, ack: Function) { + private _registerAckCallback(id: number, ack: (...args: any[]) => void) { const timeout = this.flags.timeout ?? this._opts.ackTimeout; if (timeout === undefined) { this.acks[id] = ack; @@ -458,11 +484,14 @@ export class Socket< ack.call(this, new Error("operation has timed out")); }, timeout); - this.acks[id] = (...args) => { + const fn = (...args: any[]) => { // @ts-ignore this.io.clearTimeoutFn(timer); - ack.apply(this, [null, ...args]); + ack.apply(this, args); }; + fn.withError = true; + + this.acks[id] = fn; } /** @@ -485,17 +514,12 @@ export class Socket< ev: Ev, ...args: AllButLast> ): Promise>>> { - // the timeout flag is optional - const withErr = - this.flags.timeout !== undefined || this._opts.ackTimeout !== undefined; return new Promise((resolve, reject) => { - args.push((arg1, arg2) => { - if (withErr) { - return arg1 ? reject(arg1) : resolve(arg2); - } else { - return resolve(arg1); - } - }); + const fn = (arg1, arg2) => { + return arg1 ? reject(arg1) : resolve(arg2); + }; + fn.withError = true; + args.push(fn); this.emit(ev, ...(args as any[] as EventParams)); }); } @@ -647,6 +671,28 @@ export class Socket< this.connected = false; delete this.id; this.emitReserved("disconnect", reason, description); + this._clearAcks(); + } + + /** + * Clears the acknowledgement handlers upon disconnection, since the client will never receive an acknowledgement from + * the server. + * + * @private + */ + private _clearAcks() { + Object.keys(this.acks).forEach((id) => { + const isBuffered = this.sendBuffer.some( + (packet) => String(packet.id) === id + ); + if (!isBuffered) { + // note: handlers that do not accept an error as first argument are ignored here + if (this.acks[id].withError) { + this.acks[id].call(this, new Error("socket has been disconnected")); + } + delete this.acks[id]; + } + }); } /** @@ -756,20 +802,25 @@ export class Socket< } /** - * Called upon a server acknowlegement. + * Called upon a server acknowledgement. * * @param packet * @private */ private onack(packet: Packet): void { const ack = this.acks[packet.id]; - if ("function" === typeof ack) { - debug("calling ack %s with %j", packet.id, packet.data); - ack.apply(this, packet.data); - delete this.acks[packet.id]; - } else { + if (typeof ack !== "function") { debug("bad ack %s", packet.id); + return; + } + delete this.acks[packet.id]; + debug("calling ack %s with %j", packet.id, packet.data); + // @ts-ignore FIXME ack is inferred as 'never' + if (ack.withError) { + packet.data.unshift(null); } + // @ts-ignore + ack.apply(this, packet.data); } /** diff --git a/test/socket.ts b/test/socket.ts index e8bf70b4..dc8dbbd4 100644 --- a/test/socket.ts +++ b/test/socket.ts @@ -651,7 +651,7 @@ describe("socket", () => { }); }); - it("should use the default value", () => { + it("should use the default timeout value", () => { return wrap((done) => { const socket = io(BASE_URL + "/", { ackTimeout: 50, @@ -663,5 +663,121 @@ describe("socket", () => { }); }); }); + + describe("acknowledgement upon disconnection", () => { + it("should not ack upon disconnection (callback)", () => { + return wrap((done) => { + const socket = io(BASE_URL, { + forceNew: true, + }); + + socket.on("connect", () => { + socket.emit("echo", "a", (_value) => { + done(new Error("should not happen")); + }); + + socket.disconnect(); + + setTimeout(() => success(done, socket), 100); + }); + }); + }); + + it("should ack with an error upon disconnection (callback & timeout)", () => { + return wrap((done) => { + const socket = io(BASE_URL, { + forceNew: true, + }); + + socket.on("connect", () => { + socket.timeout(10_000).emit("echo", "a", (err) => { + expect(err).to.be.an(Error); + + success(done, socket); + }); + + socket.disconnect(); + }); + }); + }); + + it("should ack with an error upon disconnection (callback & ackTimeout)", () => { + return wrap((done) => { + const socket = io(BASE_URL, { + forceNew: true, + ackTimeout: 10_000, + }); + + socket.on("connect", () => { + socket.emit("echo", "a", (err) => { + expect(err).to.be.an(Error); + + success(done, socket); + }); + + socket.disconnect(); + }); + }); + }); + + it("should ack with an error upon disconnection (promise)", () => { + return wrap((done) => { + const socket = io(BASE_URL, { + forceNew: true, + }); + + socket.on("connect", () => { + socket.emitWithAck("echo", "a").catch((err) => { + expect(err).to.be.an(Error); + + success(done, socket); + }); + + socket.disconnect(); + }); + }); + }); + + it("should ack with an error upon disconnection (promise & timeout)", () => { + return wrap((done) => { + const socket = io(BASE_URL, { + forceNew: true, + }); + + socket.on("connect", () => { + socket + .timeout(10_000) + .emitWithAck("echo", "a") + .catch((err) => { + expect(err).to.be.an(Error); + + success(done, socket); + }); + + socket.disconnect(); + }); + }); + }); + + it("should not discard an unsent ack (callback)", () => { + return wrap((done) => { + const socket = io(BASE_URL, { + forceNew: true, + }); + + socket.once("connect", () => { + socket.disconnect(); + + socket.emit("echo", "a", (value) => { + expect(value).to.eql("a"); + + success(done, socket); + }); + + setTimeout(() => socket.connect(), 100); + }); + }); + }); + }); }); });