From bfbcd7bb8074e075911e131ca3f6fe5037c9bdb3 Mon Sep 17 00:00:00 2001 From: bangbang93 Date: Tue, 26 Jul 2022 15:46:18 +0800 Subject: [PATCH 1/3] fix(microservices): pass options to rmq deserialize --- packages/microservices/client/client-rmq.ts | 17 +++++++++++++++-- packages/microservices/server/server-rmq.ts | 4 ++-- .../test/client/client-rmq.spec.ts | 6 +++--- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/packages/microservices/client/client-rmq.ts b/packages/microservices/client/client-rmq.ts index 084788a7860..c412c2d2c43 100644 --- a/packages/microservices/client/client-rmq.ts +++ b/packages/microservices/client/client-rmq.ts @@ -186,10 +186,17 @@ export class ClientRMQ extends ClientProxy { public async handleMessage( packet: unknown, + options: Record, callback: (packet: WritePacket) => any, ) { + if (typeof options === 'function') { + callback = options; + options = undefined; + } + const { err, response, isDisposed } = await this.deserializer.deserialize( packet, + options, ); if (isDisposed || err) { callback({ @@ -210,8 +217,14 @@ export class ClientRMQ extends ClientProxy { ): () => void { try { const correlationId = randomStringGenerator(); - const listener = ({ content }: { content: any }) => - this.handleMessage(JSON.parse(content.toString()), callback); + const listener = ({ + content, + options, + }: { + content: any; + options: Record; + }) => + this.handleMessage(JSON.parse(content.toString()), options, callback); Object.assign(message, { id: correlationId }); const serializedPacket: ReadPacket & Partial = diff --git a/packages/microservices/server/server-rmq.ts b/packages/microservices/server/server-rmq.ts index e38e3519fd0..3fe35d53930 100644 --- a/packages/microservices/server/server-rmq.ts +++ b/packages/microservices/server/server-rmq.ts @@ -11,12 +11,12 @@ import { DISCONNECTED_RMQ_MESSAGE, NO_MESSAGE_HANDLER, RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT, + RQM_DEFAULT_NO_ASSERT, RQM_DEFAULT_NOACK, RQM_DEFAULT_PREFETCH_COUNT, RQM_DEFAULT_QUEUE, RQM_DEFAULT_QUEUE_OPTIONS, RQM_DEFAULT_URL, - RQM_DEFAULT_NO_ASSERT, } from '../constants'; import { RmqContext } from '../ctx-host'; import { Transport } from '../enums'; @@ -142,7 +142,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy { } const { content, properties } = message; const rawMessage = JSON.parse(content.toString()); - const packet = await this.deserializer.deserialize(rawMessage); + const packet = await this.deserializer.deserialize(rawMessage, properties); const pattern = isString(packet.pattern) ? packet.pattern : JSON.stringify(packet.pattern); diff --git a/packages/microservices/test/client/client-rmq.spec.ts b/packages/microservices/test/client/client-rmq.spec.ts index 2c764b57f79..73622fa9ba2 100644 --- a/packages/microservices/test/client/client-rmq.spec.ts +++ b/packages/microservices/test/client/client-rmq.spec.ts @@ -290,7 +290,7 @@ describe('ClientRMQ', function () { response: 'test', isDisposed: false, }; - await client.handleMessage(packet, callback); + await client.handleMessage(packet, undefined, callback); expect( callback.calledWith({ err: packet.err, @@ -311,7 +311,7 @@ describe('ClientRMQ', function () { response: 'test', isDisposed: true, }; - await client.handleMessage(packet, callback); + await client.handleMessage(packet, undefined, callback); expect( callback.calledWith({ err: undefined, @@ -333,7 +333,7 @@ describe('ClientRMQ', function () { response: 'test', isDisposed: false, }; - await client.handleMessage(packet, callback); + await client.handleMessage(packet, undefined, callback); expect( callback.calledWith({ err: undefined, From b29299bc16d1f31c05388aa6be63a2a88e698ac6 Mon Sep 17 00:00:00 2001 From: bangbang93 Date: Wed, 27 Jul 2022 11:41:25 +0800 Subject: [PATCH 2/3] fix(microservice): pass options to rmq deserialize --- packages/microservices/client/client-rmq.ts | 14 ++++++++++++-- .../microservices/test/client/client-rmq.spec.ts | 2 +- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/packages/microservices/client/client-rmq.ts b/packages/microservices/client/client-rmq.ts index c412c2d2c43..5c6616f7f30 100644 --- a/packages/microservices/client/client-rmq.ts +++ b/packages/microservices/client/client-rmq.ts @@ -1,6 +1,7 @@ import { Logger } from '@nestjs/common/services/logger.service'; import { loadPackage } from '@nestjs/common/utils/load-package.util'; import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util'; +import { isFunction } from '@nestjs/common/utils/shared.utils'; import { EventEmitter } from 'events'; import { EmptyError, fromEvent, lastValueFrom, merge, Observable } from 'rxjs'; import { first, map, retryWhen, scan, share, switchMap } from 'rxjs/operators'; @@ -184,13 +185,22 @@ export class ClientRMQ extends ClientProxy { }); } + public async handleMessage( + packet: unknown, + callback: (packet: WritePacket) => any, + ); public async handleMessage( packet: unknown, options: Record, callback: (packet: WritePacket) => any, + ); + public async handleMessage( + packet: unknown, + options: Record | ((packet: WritePacket) => any), + callback?: (packet: WritePacket) => any, ) { - if (typeof options === 'function') { - callback = options; + if (isFunction(options)) { + callback = options as (packet: WritePacket) => any; options = undefined; } diff --git a/packages/microservices/test/client/client-rmq.spec.ts b/packages/microservices/test/client/client-rmq.spec.ts index 73622fa9ba2..d21a635a418 100644 --- a/packages/microservices/test/client/client-rmq.spec.ts +++ b/packages/microservices/test/client/client-rmq.spec.ts @@ -290,7 +290,7 @@ describe('ClientRMQ', function () { response: 'test', isDisposed: false, }; - await client.handleMessage(packet, undefined, callback); + await client.handleMessage(packet, callback); expect( callback.calledWith({ err: packet.err, From 0ec9cb7261d4bec693643aa1ba95c85db4506091 Mon Sep 17 00:00:00 2001 From: bangbang93 Date: Wed, 27 Jul 2022 11:54:48 +0800 Subject: [PATCH 3/3] test(microservice): revert change on tests --- packages/microservices/test/client/client-rmq.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/microservices/test/client/client-rmq.spec.ts b/packages/microservices/test/client/client-rmq.spec.ts index d21a635a418..2c764b57f79 100644 --- a/packages/microservices/test/client/client-rmq.spec.ts +++ b/packages/microservices/test/client/client-rmq.spec.ts @@ -311,7 +311,7 @@ describe('ClientRMQ', function () { response: 'test', isDisposed: true, }; - await client.handleMessage(packet, undefined, callback); + await client.handleMessage(packet, callback); expect( callback.calledWith({ err: undefined, @@ -333,7 +333,7 @@ describe('ClientRMQ', function () { response: 'test', isDisposed: false, }; - await client.handleMessage(packet, undefined, callback); + await client.handleMessage(packet, callback); expect( callback.calledWith({ err: undefined,