Skip to content

Commit

Permalink
feat(microservices): enable wildcards in redis microservice patterns
Browse files Browse the repository at this point in the history
The redis microservice now makes use of `psubscribe` and `pmessage`,
which makes it possible to use wildcards as specified by the Redis
documentation.

Closes #10344
  • Loading branch information
tijsmoree committed Oct 4, 2022
1 parent 4eb5fb0 commit fad4577
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 14 deletions.
13 changes: 6 additions & 7 deletions packages/microservices/server/server-redis.ts
@@ -1,8 +1,6 @@
import { isUndefined } from '@nestjs/common/utils/shared.utils';
import { Observable } from 'rxjs';
import {
ERROR_EVENT,
MESSAGE_EVENT,
NO_MESSAGE_HANDLER,
REDIS_DEFAULT_HOST,
REDIS_DEFAULT_PORT,
Expand Down Expand Up @@ -64,11 +62,11 @@ export class ServerRedis extends Server implements CustomTransportStrategy {
}

public bindEvents(subClient: Redis, pubClient: Redis) {
subClient.on(MESSAGE_EVENT, this.getMessageHandler(pubClient).bind(this));
subClient.on('pmessage', this.getMessageHandler(pubClient).bind(this));
const subscribePatterns = [...this.messageHandlers.keys()];
subscribePatterns.forEach(pattern => {
const { isEventHandler } = this.messageHandlers.get(pattern);
subClient.subscribe(
subClient.psubscribe(
isEventHandler ? pattern : this.getRequestPattern(pattern),
);
});
Expand All @@ -90,18 +88,19 @@ export class ServerRedis extends Server implements CustomTransportStrategy {
}

public getMessageHandler(pub: Redis) {
return async (channel: string, buffer: string | any) =>
this.handleMessage(channel, buffer, pub);
return async (channel: string, pattern: string, buffer: string | any) =>
this.handleMessage(channel, buffer, pub, pattern);
}

public async handleMessage(
channel: string,
buffer: string | any,
pub: Redis,
pattern: string,
) {
const rawMessage = this.parseMessage(buffer);
const packet = await this.deserializer.deserialize(rawMessage, { channel });
const redisCtx = new RedisContext([channel]);
const redisCtx = new RedisContext([pattern]);

if (isUndefined((packet as IncomingRequest).id)) {
return this.handleEvent(channel, packet, redisCtx);
Expand Down
19 changes: 12 additions & 7 deletions packages/microservices/test/server/server-redis.spec.ts
Expand Up @@ -74,14 +74,14 @@ describe('ServerRedis', () => {
subscribeSpy = sinon.spy();
sub = {
on: onSpy,
subscribe: subscribeSpy,
psubscribe: subscribeSpy,
};
});
it('should bind "message" event to handler', () => {
it('should bind "pmessage" event to handler', () => {
server.bindEvents(sub, null);
expect(onSpy.getCall(0).args[0]).to.be.equal('message');
expect(onSpy.getCall(0).args[0]).to.be.equal('pmessage');
});
it('should subscribe to each pattern', () => {
it('should "psubscribe" to each pattern', () => {
const pattern = 'test';
const handler = sinon.spy();
(server as any).messageHandlers = objectToMap({
Expand Down Expand Up @@ -111,12 +111,17 @@ describe('ServerRedis', () => {
const handleEventSpy = sinon.spy(server, 'handleEvent');
sinon.stub(server, 'parseMessage').callsFake(() => ({ data } as any));

await server.handleMessage(channel, JSON.stringify({}), null);
await server.handleMessage(channel, JSON.stringify({}), null, channel);
expect(handleEventSpy.called).to.be.true;
});
it(`should publish NO_MESSAGE_HANDLER if pattern not exists in messageHandlers object`, async () => {
sinon.stub(server, 'parseMessage').callsFake(() => ({ id, data } as any));
await server.handleMessage(channel, JSON.stringify({ id }), null);
await server.handleMessage(
channel,
JSON.stringify({ id }),
null,
channel,
);
expect(
getPublisherSpy.calledWith({
id,
Expand All @@ -132,7 +137,7 @@ describe('ServerRedis', () => {
});
sinon.stub(server, 'parseMessage').callsFake(() => ({ id, data } as any));

await server.handleMessage(channel, {}, null);
await server.handleMessage(channel, {}, null, channel);
expect(handler.calledWith(data)).to.be.true;
});
});
Expand Down

0 comments on commit fad4577

Please sign in to comment.