diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index b608c7c092c..a0824e440ba 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -607,11 +607,41 @@ describe('Client', () => { } }); - testUtils.testWithClient('executeIsolated', async client => { - const id = await client.clientId(), - isolatedId = await client.executeIsolated(isolatedClient => isolatedClient.clientId()); - assert.ok(id !== isolatedId); - }, GLOBAL.SERVERS.OPEN); + describe('isolationPool', () => { + testUtils.testWithClient('executeIsolated', async client => { + const id = await client.clientId(), + isolatedId = await client.executeIsolated(isolatedClient => isolatedClient.clientId()); + assert.ok(id !== isolatedId); + }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('should be able to use pool even before connect', async client => { + await client.executeIsolated(() => Promise.resolve()); + // make sure to destroy isolation pool + await client.connect(); + await client.disconnect(); + }, { + ...GLOBAL.SERVERS.OPEN, + disableClientSetup: true + }); + + testUtils.testWithClient('should work after reconnect (#2406)', async client => { + await client.disconnect(); + await client.connect(); + await client.executeIsolated(() => Promise.resolve()); + }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('should throw ClientClosedError after disconnect', async client => { + await client.connect(); + await client.disconnect(); + await assert.rejects( + client.executeIsolated(() => Promise.resolve()), + ClientClosedError + ); + }, { + ...GLOBAL.SERVERS.OPEN, + disableClientSetup: true + }); + }); async function killClient< M extends RedisModules, @@ -731,7 +761,7 @@ describe('Client', () => { members.map(member => [member.value, member.score]).sort(sort) ); }, GLOBAL.SERVERS.OPEN); - + describe('PubSub', () => { testUtils.testWithClient('should be able to publish and subscribe to messages', async publisher => { function assertStringListener(message: string, channel: string) { diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 5dd386647ef..5b9badf3f37 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -15,7 +15,6 @@ import { ClientClosedError, ClientOfflineError, DisconnectsClientError } from '. import { URL } from 'url'; import { TcpSocketConnectOpts } from 'net'; import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub'; -import { callbackify } from 'util'; export interface RedisClientOptions< M extends RedisModules = RedisModules, @@ -190,7 +189,7 @@ export default class RedisClient< readonly #options?: RedisClientOptions; readonly #socket: RedisSocket; readonly #queue: RedisCommandsQueue; - readonly #isolationPool: Pool>; + #isolationPool?: Pool>; readonly #v4: Record = {}; #selectedDB = 0; @@ -223,16 +222,9 @@ export default class RedisClient< this.#options = this.#initiateOptions(options); this.#queue = this.#initiateQueue(); this.#socket = this.#initiateSocket(); - this.#isolationPool = createPool({ - create: async () => { - const duplicate = this.duplicate({ - isolationPoolOptions: undefined - }).on('error', err => this.emit('error', err)); - await duplicate.connect(); - return duplicate; - }, - destroy: client => client.disconnect() - }, options?.isolationPoolOptions); + // should be initiated in connect, not here + // TODO: consider breaking in v5 + this.#isolationPool = this.#initiateIsolationPool(); this.#legacyMode(); } @@ -337,6 +329,19 @@ export default class RedisClient< .on('end', () => this.emit('end')); } + #initiateIsolationPool() { + return createPool({ + create: async () => { + const duplicate = this.duplicate({ + isolationPoolOptions: undefined + }).on('error', err => this.emit('error', err)); + await duplicate.connect(); + return duplicate; + }, + destroy: client => client.disconnect() + }, this.#options?.isolationPoolOptions); + } + #legacyMode(): void { if (!this.#options?.legacyMode) return; @@ -422,6 +427,8 @@ export default class RedisClient< } connect(): Promise { + // see comment in constructor + this.#isolationPool ??= this.#initiateIsolationPool(); return this.#socket.connect(); } @@ -704,6 +711,7 @@ export default class RedisClient< } executeIsolated(fn: (client: RedisClientType) => T | Promise): Promise { + if (!this.#isolationPool) return Promise.reject(new ClientClosedError()); return this.#isolationPool.use(fn); } @@ -802,8 +810,9 @@ export default class RedisClient< } async #destroyIsolationPool(): Promise { - await this.#isolationPool.drain(); - await this.#isolationPool.clear(); + await this.#isolationPool!.drain(); + await this.#isolationPool!.clear(); + this.#isolationPool = undefined; } ref(): void {