Skip to content

Commit

Permalink
Merge pull request #10359 from tijsmoree/redis-microservice-psubscribe
Browse files Browse the repository at this point in the history
feat(microservices): enable wildcards in redis microservice patterns
  • Loading branch information
kamilmysliwiec committed Apr 17, 2023
2 parents b908f1a + 39d6e8e commit 69fba03
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 13 deletions.
Expand Up @@ -104,6 +104,10 @@ export interface RedisOptions {
port?: number;
retryAttempts?: number;
retryDelay?: number;
/**
* Use `psubscribe`/`pmessage` to enable wildcards in the patterns
*/
wildcards?: boolean;
serializer?: Serializer;
deserializer?: Deserializer;
} & IORedisOptions;
Expand Down
28 changes: 21 additions & 7 deletions packages/microservices/server/server-redis.ts
Expand Up @@ -63,13 +63,23 @@ export class ServerRedis extends Server implements CustomTransportStrategy {
}

public bindEvents(subClient: Redis, pubClient: Redis) {
subClient.on(MESSAGE_EVENT, this.getMessageHandler(pubClient).bind(this));
subClient.on(
this.options.wildcards ? 'pmessage' : MESSAGE_EVENT,
this.getMessageHandler(pubClient).bind(this),
);
const subscribePatterns = [...this.messageHandlers.keys()];
subscribePatterns.forEach(pattern => {
const { isEventHandler } = this.messageHandlers.get(pattern);
subClient.subscribe(
isEventHandler ? pattern : this.getRequestPattern(pattern),
);

const channel = isEventHandler
? pattern
: this.getRequestPattern(pattern);

if (this.options.wildcards) {
subClient.psubscribe(channel);
} else {
subClient.subscribe(channel);
}
});
}

Expand All @@ -89,18 +99,22 @@ export class ServerRedis extends Server implements CustomTransportStrategy {
}

public getMessageHandler(pub: Redis) {
return async (channel: string, buffer: string | any) =>
this.handleMessage(channel, buffer, pub);
return this.options.wildcards
? (channel: string, pattern: string, buffer: string | any) =>
this.handleMessage(channel, buffer, pub, pattern)
: (channel: string, buffer: string | any) =>
this.handleMessage(channel, buffer, pub, channel);
}

public async handleMessage(
channel: string,
buffer: string | any,
pub: Redis,
pattern: string,
) {
const rawMessage = this.parseMessage(buffer);
const packet = await this.deserializer.deserialize(rawMessage, { channel });
const redisCtx = new RedisContext([channel]);
const redisCtx = new RedisContext([pattern]);

if (isUndefined((packet as IncomingRequest).id)) {
return this.handleEvent(channel, packet, redisCtx);
Expand Down
42 changes: 36 additions & 6 deletions packages/microservices/test/server/server-redis.spec.ts
Expand Up @@ -67,7 +67,7 @@ describe('ServerRedis', () => {
});
});
describe('handleConnection', () => {
let onSpy: sinon.SinonSpy, subscribeSpy: sinon.SinonSpy, sub;
let onSpy: sinon.SinonSpy, subscribeSpy: sinon.SinonSpy, sub, psub;

beforeEach(() => {
onSpy = sinon.spy();
Expand All @@ -76,12 +76,24 @@ describe('ServerRedis', () => {
on: onSpy,
subscribe: subscribeSpy,
};
psub = {
on: onSpy,
psubscribe: subscribeSpy,
};
});
it('should bind "message" event to handler', () => {
it('should bind "message" event to handler if wildcards are disabled', () => {
server.bindEvents(sub, null);
expect(onSpy.getCall(0).args[0]).to.be.equal('message');
});
it('should subscribe to each pattern', () => {
it('should bind "pmessage" event to handler if wildcards are enabled', () => {
(server as any).options = {};
(server as any).options.wildcards = true;

server.bindEvents(psub, null);
expect(onSpy.getCall(0).args[0]).to.be.equal('pmessage');
});

it('should "subscribe" to each pattern if wildcards are disabled', () => {
const pattern = 'test';
const handler = sinon.spy();
(server as any).messageHandlers = objectToMap({
Expand All @@ -90,6 +102,19 @@ describe('ServerRedis', () => {
server.bindEvents(sub, null);
expect(subscribeSpy.calledWith(pattern)).to.be.true;
});

it('should "psubscribe" to each pattern if wildcards are enabled', () => {
(server as any).options = {};
(server as any).options.wildcards = true;

const pattern = 'test';
const handler = sinon.spy();
(server as any).messageHandlers = objectToMap({
[pattern]: handler,
});
server.bindEvents(psub, null);
expect(subscribeSpy.calledWith(pattern)).to.be.true;
});
});
describe('getMessageHandler', () => {
it(`should return function`, () => {
Expand All @@ -111,12 +136,17 @@ describe('ServerRedis', () => {
const handleEventSpy = sinon.spy(server, 'handleEvent');
sinon.stub(server, 'parseMessage').callsFake(() => ({ data } as any));

await server.handleMessage(channel, JSON.stringify({}), null);
await server.handleMessage(channel, JSON.stringify({}), null, channel);
expect(handleEventSpy.called).to.be.true;
});
it(`should publish NO_MESSAGE_HANDLER if pattern not exists in messageHandlers object`, async () => {
sinon.stub(server, 'parseMessage').callsFake(() => ({ id, data } as any));
await server.handleMessage(channel, JSON.stringify({ id }), null);
await server.handleMessage(
channel,
JSON.stringify({ id }),
null,
channel,
);
expect(
getPublisherSpy.calledWith({
id,
Expand All @@ -132,7 +162,7 @@ describe('ServerRedis', () => {
});
sinon.stub(server, 'parseMessage').callsFake(() => ({ id, data } as any));

await server.handleMessage(channel, {}, null);
await server.handleMessage(channel, {}, null, channel);
expect(handler.calledWith(data)).to.be.true;
});
});
Expand Down

0 comments on commit 69fba03

Please sign in to comment.