diff --git a/lib/cluster/ClusterSubscriber.ts b/lib/cluster/ClusterSubscriber.ts index 6c56909c..09e8da36 100644 --- a/lib/cluster/ClusterSubscriber.ts +++ b/lib/cluster/ClusterSubscriber.ts @@ -15,6 +15,14 @@ export default class ClusterSubscriber { private connectionPool: ConnectionPool, private emitter: EventEmitter ) { + // If the current node we're using as the subscriber disappears + // from the node pool for some reason, we will select a new one + // to connect to. + // Note that this event is only triggered if the connection to + // the node has been used; cluster subscriptions are setup with + // lazyConnect = true. It's possible for the subscriber node to + // disappear without this method being called! + // See https://github.com/luin/ioredis/pull/1589 this.connectionPool.on("-node", (_, key: string) => { if (!this.started || !this.subscriber) { return; @@ -54,16 +62,30 @@ export default class ClusterSubscriber { debug("stopped"); } + private onSubscriberEnd = () => { + if (!this.started) { + debug("subscriber has disconnected, but ClusterSubscriber is not started, so not reconnecting."); + return; + } + // If the subscriber closes whilst it's still the active connection, + // we might as well try to connecting to a new node if possible to + // minimise the number of missed publishes. + debug("subscriber has disconnected, selecting a new one..."); + this.selectSubscriber(); + } + private selectSubscriber() { const lastActiveSubscriber = this.lastActiveSubscriber; // Disconnect the previous subscriber even if there // will not be a new one. if (lastActiveSubscriber) { + lastActiveSubscriber.off("end", this.onSubscriberEnd); lastActiveSubscriber.disconnect(); } if (this.subscriber) { + this.subscriber.off("end", this.onSubscriberEnd); this.subscriber.disconnect(); } @@ -97,11 +119,22 @@ export default class ClusterSubscriber { connectionName: getConnectionName("subscriber", options.connectionName), lazyConnect: true, tls: options.tls, + // Don't try to reconnect the subscriber connection. If the connection fails + // we will get an end event (handled below), at which point we'll pick a new + // node from the pool and try to connect to that as the subscriber connection. + retryStrategy: null }); // Ignore the errors since they're handled in the connection pool. this.subscriber.on("error", noop); + // The node we lost connection to may not come back up in a + // reasonable amount of time (e.g. a slave that's taken down + // for maintainence), we could potentially miss many published + // messages so we should reconnect as quickly as possible, to + // a different node if needed. + this.subscriber.once("end", this.onSubscriberEnd); + // Re-subscribe previous channels const previousChannels = { subscribe: [], psubscribe: [] }; if (lastActiveSubscriber) { diff --git a/test/functional/cluster/pub_sub.ts b/test/functional/cluster/pub_sub.ts index db6a1d34..4d4a68e5 100644 --- a/test/functional/cluster/pub_sub.ts +++ b/test/functional/cluster/pub_sub.ts @@ -49,6 +49,7 @@ describe("cluster:pub/sub", function () { sub.subscribe("test cluster", function () { sub.set("foo", "bar").then((res) => { expect(res).to.eql("OK"); + sub.disconnect() done(); }); }); @@ -73,6 +74,7 @@ describe("cluster:pub/sub", function () { const sub = new Cluster([{ port: "30001", password: "abc" }]); sub.subscribe("test cluster", function () { + sub.disconnect() done(); }); });