From 81b9be021d471796bba00ee7b08768df9d7e2689 Mon Sep 17 00:00:00 2001 From: luin Date: Thu, 8 Apr 2021 18:49:09 +0800 Subject: [PATCH] fix(cluster): subscriber connection leaks Closes #1325 --- lib/cluster/ClusterSubscriber.ts | 9 ++++- test/functional/cluster/ClusterSubscriber.ts | 36 ++++++++++++++++++++ test/helpers/mock_server.ts | 4 +++ 3 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 test/functional/cluster/ClusterSubscriber.ts diff --git a/lib/cluster/ClusterSubscriber.ts b/lib/cluster/ClusterSubscriber.ts index 1a459013..cbc6e3f3 100644 --- a/lib/cluster/ClusterSubscriber.ts +++ b/lib/cluster/ClusterSubscriber.ts @@ -50,6 +50,10 @@ export default class ClusterSubscriber { lastActiveSubscriber.disconnect(); } + if (this.subscriber) { + this.subscriber.disconnect(); + } + const sampleNode = sample(this.connectionPool.getNodes()); if (!sampleNode) { debug( @@ -113,7 +117,10 @@ export default class ClusterSubscriber { this.lastActiveSubscriber = this.subscriber; } }) - .catch(noop); + .catch(() => { + // TODO: should probably disconnect the subscriber and try again. + debug("failed to %s %d channels", type, channels.length); + }); } } } else { diff --git a/test/functional/cluster/ClusterSubscriber.ts b/test/functional/cluster/ClusterSubscriber.ts new file mode 100644 index 00000000..5ee0d359 --- /dev/null +++ b/test/functional/cluster/ClusterSubscriber.ts @@ -0,0 +1,36 @@ +import ConnectionPool from "../../../lib/cluster/ConnectionPool"; +import ClusterSubscriber from "../../../lib/cluster/ClusterSubscriber"; +import { EventEmitter } from "events"; +import MockServer from "../../helpers/mock_server"; +import { expect } from "chai"; + +describe("ClusterSubscriber", () => { + it("cleans up subscribers when selecting a new one", async () => { + const pool = new ConnectionPool({}); + const subscriber = new ClusterSubscriber(pool, new EventEmitter()); + + let rejectSubscribes = false; + const server = new MockServer(30000, (argv) => { + if (rejectSubscribes && argv[0] === "subscribe") { + return new Error("Failed to subscribe"); + } + return "OK"; + }); + + pool.findOrCreate({ host: "127.0.0.1", port: 30000 }); + + subscriber.start(); + await subscriber.getInstance().subscribe("foo"); + rejectSubscribes = true; + + subscriber.start(); + await subscriber.getInstance().echo("hello"); + + subscriber.start(); + await subscriber.getInstance().echo("hello"); + + expect(server.getAllClients()).to.have.lengthOf(1); + subscriber.stop(); + pool.reset([]); + }); +}); diff --git a/test/helpers/mock_server.ts b/test/helpers/mock_server.ts index 5101f064..459bb62a 100644 --- a/test/helpers/mock_server.ts +++ b/test/helpers/mock_server.ts @@ -173,4 +173,8 @@ export default class MockServer extends EventEmitter { return getConnectionName(client) === name; }); } + + getAllClients(): Socket[] { + return this.clients.filter(Boolean); + } }