From 71adbb84caa4a8c5668b609fbd20ad11dbad7533 Mon Sep 17 00:00:00 2001 From: Andy Date: Wed, 13 Jul 2022 16:56:03 +0100 Subject: [PATCH] fix: remove end event listener before disconnecting the old subscriber to prevent dead loop --- lib/cluster/ClusterSubscriber.ts | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/lib/cluster/ClusterSubscriber.ts b/lib/cluster/ClusterSubscriber.ts index 508088643..180e4bbc8 100644 --- a/lib/cluster/ClusterSubscriber.ts +++ b/lib/cluster/ClusterSubscriber.ts @@ -62,6 +62,18 @@ export default class ClusterSubscriber { debug("stopped"); } + private onSubscriberEnd = () => { + if (!this.started) { + debug("subscriber has disconnected, but ClusterSubscriber is not started, so not reconnecting."); + return; + } + // If the subscriber closes whilst it's still the active connection, + // we might as well try to connecting to a new node if possible to + // minimise the number of missed publishes. + debug("subscriber has disconnected, selecting a new one..."); + this.selectSubscriber(); + } + private selectSubscriber() { const lastActiveSubscriber = this.lastActiveSubscriber; @@ -72,6 +84,7 @@ export default class ClusterSubscriber { } if (this.subscriber) { + this.subscriber.off("end", this.onSubscriberEnd); this.subscriber.disconnect(); } @@ -119,20 +132,7 @@ export default class ClusterSubscriber { // for maintainence), we could potentially miss many published // messages so we should reconnect as quickly as possible, to // a different node if needed. - const currentSub = this.subscriber; - this.subscriber.once("end", () => { - if (!this.started) { - debug("subscriber has disconnected, but ClusterSubscriber is not started, so not reconnecting."); - return; - } - // If the subscriber closes whilst it's still the active connection, - // we might as well try to connecting to a new node if possible to - // minimise the number of missed publishes. - if (currentSub === this.subscriber) { - debug("subscriber has disconnected, selecting a new one..."); - this.selectSubscriber(); - } - }); + this.subscriber.once("end", this.onSubscriberEnd); // Re-subscribe previous channels const previousChannels = { subscribe: [], psubscribe: [] };