Skip to content

Commit

Permalink
feat(cluster): support retrying MOVED with a delay (#1254)
Browse files Browse the repository at this point in the history
Add a delay to the MOVED response. When a MOVED response is received from the cluster, use a delayed queue to retry the commands.

Co-authored-by: ohad-israeli <ohadisra@gmail.com>
  • Loading branch information
ohadisraeli and ohad-israeli committed Mar 14, 2021
1 parent d174d86 commit 8599981
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 4 deletions.
7 changes: 5 additions & 2 deletions README.md
Expand Up @@ -889,9 +889,12 @@ cluster.get("foo", (err, res) => {
will resend the commands after the specified time (in ms).
- `retryDelayOnTryAgain`: If this option is a number (by default, it is `100`), the client
will resend the commands rejected with `TRYAGAIN` error after the specified time (in ms).
- `retryDelayOnMoved`: By default, this value is `0` (in ms), which means that when a `MOVED` error is received, the client will resend
the command instantly to the node returned together with the `MOVED` error. However, it sometimes takes a while for a cluster to reach
a stable state after a failover, so adding a delay before resending can prevent a ping-pong effect.
- `redisOptions`: Default options passed to the constructor of `Redis` when connecting to a node.
- `slotsRefreshTimeout`: Milliseconds before a timeout occurs while refreshing slots from the cluster (default `1000`)
- `slotsRefreshInterval`: Milliseconds between every automatic slots refresh (default `5000`)
- `slotsRefreshTimeout`: Milliseconds before a timeout occurs while refreshing slots from the cluster (default `1000`).
- `slotsRefreshInterval`: Milliseconds between every automatic slots refresh (default `5000`).

### Read-write splitting

Expand Down
12 changes: 12 additions & 0 deletions lib/cluster/ClusterOptions.ts
Expand Up @@ -94,6 +94,17 @@ export interface IClusterOptions {
*/
retryDelayOnTryAgain?: number;

/**
* By default, this value is 0, which means that when a `MOVED` error is received,
* the client will resend the command instantly to the node returned together with
* the `MOVED` error. However, it sometimes takes a while for a cluster to reach
* a stable state after a failover, so adding a delay before resending can
* prevent a ping-pong effect.
*
* @default 0
*/
retryDelayOnMoved?: number;

/**
* The milliseconds before a timeout occurs while refreshing
* slots from the cluster.
Expand Down Expand Up @@ -184,6 +195,7 @@ export const DEFAULT_CLUSTER_OPTIONS: IClusterOptions = {
enableReadyCheck: true,
scaleReads: "master",
maxRedirections: 16,
retryDelayOnMoved: 0,
retryDelayOnFailover: 100,
retryDelayOnClusterDown: 100,
retryDelayOnTryAgain: 100,
Expand Down
15 changes: 13 additions & 2 deletions lib/cluster/index.ts
Expand Up @@ -745,8 +745,19 @@ class Cluster extends EventEmitter {
return;
}
const errv = error.message.split(" ");
if (errv[0] === "MOVED" || errv[0] === "ASK") {
handlers[errv[0] === "MOVED" ? "moved" : "ask"](errv[1], errv[2]);
if (errv[0] === "MOVED") {
const timeout = this.options.retryDelayOnMoved;
if (timeout && typeof timeout === "number") {
this.delayQueue.push(
"moved",
handlers.moved.bind(null, errv[1], errv[2]),
{ timeout }
);
} else {
handlers.moved(errv[1], errv[2]);
}
} else if (errv[0] === "ASK") {
handlers.ask(errv[1], errv[2]);
} else if (errv[0] === "TRYAGAIN") {
this.delayQueue.push("tryagain", handlers.tryagain, {
timeout: this.options.retryDelayOnTryAgain,
Expand Down
40 changes: 40 additions & 0 deletions test/functional/cluster/moved.ts
Expand Up @@ -2,6 +2,7 @@ import * as calculateSlot from "cluster-key-slot";
import MockServer from "../../helpers/mock_server";
import { expect } from "chai";
import { Cluster } from "../../../lib";
import * as sinon from "sinon";

describe("cluster:MOVED", function () {
it("should auto redirect the command to the correct nodes", function (done) {
Expand Down Expand Up @@ -117,4 +118,43 @@ describe("cluster:MOVED", function () {
cluster.get("foo");
});
});

// Verifies that a command rejected with MOVED is re-sent to the target node
// through a timer scheduled with exactly `retryDelayOnMoved` milliseconds.
it("should supports retryDelayOnMoved", (done) => {
  let cluster = undefined;
  // Handle to the fake installed on `global.setTimeout`; assigned once the
  // cluster is ready so it can be undone when the test completes.
  let timerStub = undefined;
  // Every slot is mapped to node 30001, so the first GET is sent there.
  const slotTable = [[0, 16383, ["127.0.0.1", 30001]]];

  // Node 30001 answers `GET foo` with a MOVED redirection to node 30002.
  new MockServer(30001, function (argv) {
    if (argv[0] === "cluster" && argv[1] === "slots") {
      return slotTable;
    }
    if (argv[0] === "get" && argv[1] === "foo") {
      return new Error("MOVED " + calculateSlot("foo") + " 127.0.0.1:30002");
    }
  });

  // Node 30002 is the redirection target: receiving `GET foo` here means the
  // retry was actually performed, which completes the test.
  new MockServer(30002, function (argv) {
    if (argv[0] === "cluster" && argv[1] === "slots") {
      return slotTable;
    }
    if (argv[0] === "get" && argv[1] === "foo") {
      // Put the real `setTimeout` back before tearing down, so the fake
      // (which drops every other timer) does not outlive this test.
      if (timerStub) {
        timerStub.restore();
      }
      cluster.disconnect();
      done();
    }
  });

  const retryDelayOnMoved = 789;
  cluster = new Cluster([{ host: "127.0.0.1", port: "30001" }], {
    retryDelayOnMoved,
  });
  cluster.on("ready", function () {
    // Replace `setTimeout` only after the cluster is ready. The fake runs a
    // callback (on the next tick) solely when its delay equals
    // `retryDelayOnMoved` and discards all other timers, so the command can
    // reach node 30002 only if the retry was scheduled with that delay.
    timerStub = sinon.stub(global, "setTimeout").callsFake((body, ms) => {
      if (ms === retryDelayOnMoved) {
        process.nextTick(() => {
          body();
        });
      }
    });

    cluster.get("foo");
  });
});
});

0 comments on commit 8599981

Please sign in to comment.