From 42f1ee107174366a79ff94bec8a7a1ac353e035c Mon Sep 17 00:00:00 2001 From: Shogun Date: Sun, 13 Jun 2021 09:51:50 +0200 Subject: [PATCH] fix: fixed autopipeline performances. (#1226) --- .gitignore | 3 ++- lib/autoPipelining.ts | 16 ++---------- lib/cluster/index.ts | 24 +++++++++++++++++ lib/pipeline.ts | 9 +++---- lib/redis/index.ts | 4 +++ test/functional/cluster/autopipelining.ts | 32 ++++++++++++++--------- test/functional/cluster/connect.ts | 18 +++++++++++++ test/functional/connection.ts | 17 ++++++++++++ 8 files changed, 90 insertions(+), 33 deletions(-) diff --git a/.gitignore b/.gitignore index 1dfab893..101a6de3 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ node_modules built .vscode -benchmarks/fixtures/*.txt \ No newline at end of file +benchmarks/fixtures/*.txt + diff --git a/lib/autoPipelining.ts b/lib/autoPipelining.ts index fe1948d5..238fe114 100644 --- a/lib/autoPipelining.ts +++ b/lib/autoPipelining.ts @@ -18,19 +18,6 @@ export const notAllowedAutoPipelineCommands = [ "unpsubscribe", ]; -function findAutoPipeline( - client, - _commandName, - ...args: Array -): string { - if (!client.isCluster) { - return "main"; - } - - // We have slot information, we can improve routing by grouping slots served by the same subset of nodes - return client.slots[calculateSlot(args[0])].join(","); -} - function executeAutoPipeline(client, slotKey: string) { /* If a pipeline is already executing, keep queueing up commands @@ -116,7 +103,8 @@ export function executeWithAutoPipelining( }); } - const slotKey = findAutoPipeline(client, commandName, ...args); + // If we have slot information, we can improve routing by grouping slots served by the same subset of nodes + const slotKey = client.isCluster ? client.slots[calculateSlot(args[0])].join(",") : 'main'; if (!client._autoPipelines.has(slotKey)) { const pipeline = client.pipeline(); diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index 86bdae5f..58b7c31e 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -69,6 +69,8 @@ class Cluster extends EventEmitter { private isRefreshing = false; public isCluster = true; private _autoPipelines: Map = new Map(); + private _groupsIds: {[key: string]: number} = {}; + private _groupsBySlot: number[] = Array(16384); private _runningAutoPipelines: Set = new Set(); private _readyDelayedCallbacks: CallbackFunction[] = []; public _addedScriptHashes: { [key: string]: any } = {}; @@ -188,7 +190,10 @@ class Cluster extends EventEmitter { return; } + // Make sure only one timer is active at a time clearInterval(this._addedScriptHashesCleanInterval); + + // Start the script cache cleaning this._addedScriptHashesCleanInterval = setInterval(() => { this._addedScriptHashes = {}; }, this.options.maxScriptsCachingTime); @@ -627,6 +632,7 @@ class Cluster extends EventEmitter { } else { _this.slots[slot] = [key]; } + _this._groupsBySlot[slot] = _this._groupsIds[_this.slots[slot].join(';')]; _this.connectionPool.findOrCreate(_this.natMapper(key)); tryConnection(); debug("refreshing slot caches... (triggered by MOVED error)"); @@ -860,6 +866,24 @@ class Cluster extends EventEmitter { } } + // Assign to each node keys a numeric value to make autopipeline comparison faster. + this._groupsIds = Object.create(null); + let j = 0; + for (let i = 0; i < 16384; i++) { + const target = (this.slots[i] || []).join(';'); + + if (!target.length) { + this._groupsBySlot[i] = undefined; + continue; + } + + if (!this._groupsIds[target]) { + this._groupsIds[target] = ++j; + } + + this._groupsBySlot[i] = this._groupsIds[target]; + } + this.connectionPool.reset(nodes); callback(); }, this.options.slotsRefreshTimeout) diff --git a/lib/pipeline.ts b/lib/pipeline.ts index 11621ecd..40aaba62 100644 --- a/lib/pipeline.ts +++ b/lib/pipeline.ts @@ -15,12 +15,10 @@ import Commander from "./commander"; */ function generateMultiWithNodes(redis, keys) { const slot = calculateSlot(keys[0]); - const target = redis.slots[slot].join(","); - + const target = redis._groupsBySlot[slot]; + for (let i = 1; i < keys.length; i++) { - const currentTarget = redis.slots[calculateSlot(keys[i])].join(","); - - if (currentTarget !== target) { + if (redis._groupsBySlot[calculateSlot(keys[i])] !== target) { return -1; } } @@ -158,6 +156,7 @@ Pipeline.prototype.fillResult = function (value, position) { moved: function (slot, key) { _this.preferKey = key; _this.redis.slots[errv[1]] = [key]; + _this.redis._groupsBySlot[errv[1]] = _this.redis._groupsIds[_this.redis.slots[errv[1]].join(";")]; _this.redis.refreshSlotsCache(); _this.exec(); }, diff --git a/lib/redis/index.ts b/lib/redis/index.ts index 4cfc36ab..e5bf5b53 100644 --- a/lib/redis/index.ts +++ b/lib/redis/index.ts @@ -304,7 +304,11 @@ Redis.prototype.connect = function (callback) { reject(new Error("Redis is already connecting/connected")); return; } + + // Make sure only one timer is active at a time clearInterval(this._addedScriptHashesCleanInterval); + + // Start the script cache cleaning this._addedScriptHashesCleanInterval = setInterval(() => { this._addedScriptHashes = {}; }, this.options.maxScriptsCachingTime); diff --git a/test/functional/cluster/autopipelining.ts b/test/functional/cluster/autopipelining.ts index 4039c9bf..02f933d6 100644 --- a/test/functional/cluster/autopipelining.ts +++ b/test/functional/cluster/autopipelining.ts @@ -11,6 +11,11 @@ use(require("chai-as-promised")); Instead foo1 and foo2 are usually served by different nodes in a 3-nodes cluster. */ describe("autoPipelining for cluster", function () { + function changeSlot(cluster, from, to) { + cluster.slots[from] = cluster.slots[to]; + cluster._groupsBySlot[from] = cluster._groupsBySlot[to]; + } + beforeEach(() => { const slotTable = [ [0, 5000, ["127.0.0.1", 30001]], @@ -402,11 +407,12 @@ describe("autoPipelining for cluster", function () { const promise4 = cluster.set("foo6", "bar"); // Override slots to induce a failure - const key1Slot = calculateKeySlot("foo1"); - const key2Slot = calculateKeySlot("foo2"); - const key5Slot = calculateKeySlot("foo5"); - cluster.slots[key1Slot] = cluster.slots[key2Slot]; - cluster.slots[key2Slot] = cluster.slots[key5Slot]; + const key1Slot = calculateKeySlot('foo1'); + const key2Slot = calculateKeySlot('foo2'); + const key5Slot = calculateKeySlot('foo5'); + + changeSlot(cluster, key1Slot, key2Slot); + changeSlot(cluster, key2Slot, key5Slot); await expect(promise1).to.eventually.be.rejectedWith( "All keys in the pipeline should belong to the same slots allocation group" @@ -492,11 +498,11 @@ describe("autoPipelining for cluster", function () { expect(cluster.autoPipelineQueueSize).to.eql(4); // Override slots to induce a failure - const key1Slot = calculateKeySlot("foo1"); - const key2Slot = calculateKeySlot("foo2"); - const key5Slot = calculateKeySlot("foo5"); - cluster.slots[key1Slot] = cluster.slots[key2Slot]; - cluster.slots[key2Slot] = cluster.slots[key5Slot]; + const key1Slot = calculateKeySlot('foo1'); + const key2Slot = calculateKeySlot('foo2'); + const key5Slot = calculateKeySlot('foo5'); + changeSlot(cluster, key1Slot, key2Slot); + changeSlot(cluster, key2Slot, key5Slot); }); }); @@ -541,9 +547,9 @@ describe("autoPipelining for cluster", function () { expect(cluster.autoPipelineQueueSize).to.eql(3); - const key1Slot = calculateKeySlot("foo1"); - const key2Slot = calculateKeySlot("foo2"); - cluster.slots[key1Slot] = cluster.slots[key2Slot]; + const key1Slot = calculateKeySlot('foo1'); + const key2Slot = calculateKeySlot('foo2'); + changeSlot(cluster, key1Slot, key2Slot); }); }); }); diff --git a/test/functional/cluster/connect.ts b/test/functional/cluster/connect.ts index f1e4ba33..4d51f918 100644 --- a/test/functional/cluster/connect.ts +++ b/test/functional/cluster/connect.ts @@ -438,4 +438,22 @@ describe("cluster:disconnect", function () { done(); }); }); + + it("should clear the added script hashes interval even when no connection succeeded", function (done) { + const cluster = new Cluster([{ host: "127.0.0.1", port: "0" }], { + enableReadyCheck: false, + }); + + let attempt = 0; + cluster.on("error", function () { + if(attempt < 5) { + attempt ++; + return + } + cluster.quit(); + + expect(cluster._addedScriptHashesCleanInterval).to.be.null; + done(); + }); + }); }); diff --git a/test/functional/connection.ts b/test/functional/connection.ts index ef4a3447..8ee58f13 100644 --- a/test/functional/connection.ts +++ b/test/functional/connection.ts @@ -549,4 +549,21 @@ describe("disconnection", function () { } }); }); + + it("should clear the added script hashes interval even when no connection succeeded", function (done) { + let attempt = 0; + const redis = new Redis(0, 'localhost'); + + redis.on("error", function () { + if(attempt < 5) { + attempt ++; + return + } + + redis.quit(); + + expect(redis._addedScriptHashesCleanInterval).to.be.null; + done(); + }); + }); });