Skip to content

Commit

Permalink
fix: always allow selecting a new node for cluster mode subscriptions…
Browse files Browse the repository at this point in the history
… when the current one fails (#1589)
  • Loading branch information
headlessme committed Jul 16, 2022
1 parent 07ee6ea commit 1c8cb85
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 0 deletions.
33 changes: 33 additions & 0 deletions lib/cluster/ClusterSubscriber.ts
Expand Up @@ -15,6 +15,14 @@ export default class ClusterSubscriber {
private connectionPool: ConnectionPool,
private emitter: EventEmitter
) {
// If the current node we're using as the subscriber disappears
// from the node pool for some reason, we will select a new one
// to connect to.
// Note that this event is only triggered if the connection to
// the node has been used; cluster subscriptions are setup with
// lazyConnect = true. It's possible for the subscriber node to
// disappear without this method being called!
// See https://github.com/luin/ioredis/pull/1589
this.connectionPool.on("-node", (_, key: string) => {
if (!this.started || !this.subscriber) {
return;
Expand Down Expand Up @@ -54,16 +62,30 @@ export default class ClusterSubscriber {
debug("stopped");
}

private onSubscriberEnd = () => {
if (!this.started) {
debug("subscriber has disconnected, but ClusterSubscriber is not started, so not reconnecting.");
return;
}
// If the subscriber closes whilst it's still the active connection,
// we might as well try to connecting to a new node if possible to
// minimise the number of missed publishes.
debug("subscriber has disconnected, selecting a new one...");
this.selectSubscriber();
}

private selectSubscriber() {
const lastActiveSubscriber = this.lastActiveSubscriber;

// Disconnect the previous subscriber even if there
// will not be a new one.
if (lastActiveSubscriber) {
lastActiveSubscriber.off("end", this.onSubscriberEnd);
lastActiveSubscriber.disconnect();
}

if (this.subscriber) {
this.subscriber.off("end", this.onSubscriberEnd);
this.subscriber.disconnect();
}

Expand Down Expand Up @@ -97,11 +119,22 @@ export default class ClusterSubscriber {
connectionName: getConnectionName("subscriber", options.connectionName),
lazyConnect: true,
tls: options.tls,
// Don't try to reconnect the subscriber connection. If the connection fails
// we will get an end event (handled below), at which point we'll pick a new
// node from the pool and try to connect to that as the subscriber connection.
retryStrategy: null
});

// Ignore the errors since they're handled in the connection pool.
this.subscriber.on("error", noop);

// The node we lost connection to may not come back up in a
// reasonable amount of time (e.g. a slave that's taken down
// for maintainence), we could potentially miss many published
// messages so we should reconnect as quickly as possible, to
// a different node if needed.
this.subscriber.once("end", this.onSubscriberEnd);

// Re-subscribe previous channels
const previousChannels = { subscribe: [], psubscribe: [] };
if (lastActiveSubscriber) {
Expand Down
2 changes: 2 additions & 0 deletions test/functional/cluster/pub_sub.ts
Expand Up @@ -49,6 +49,7 @@ describe("cluster:pub/sub", function () {
sub.subscribe("test cluster", function () {
sub.set("foo", "bar").then((res) => {
expect(res).to.eql("OK");
sub.disconnect()
done();
});
});
Expand All @@ -73,6 +74,7 @@ describe("cluster:pub/sub", function () {
const sub = new Cluster([{ port: "30001", password: "abc" }]);

sub.subscribe("test cluster", function () {
sub.disconnect()
done();
});
});
Expand Down

0 comments on commit 1c8cb85

Please sign in to comment.