diff --git a/packages/microservices/client/client-rmq.ts b/packages/microservices/client/client-rmq.ts index a8b1e64fd9f..084788a7860 100644 --- a/packages/microservices/client/client-rmq.ts +++ b/packages/microservices/client/client-rmq.ts @@ -16,6 +16,7 @@ import { RQM_DEFAULT_QUEUE, RQM_DEFAULT_QUEUE_OPTIONS, RQM_DEFAULT_URL, + RQM_DEFAULT_NO_ASSERT, } from '../constants'; import { RmqUrl } from '../external/rmq-url.interface'; import { ReadPacket, RmqOptions, WritePacket } from '../interfaces'; @@ -38,6 +39,7 @@ export class ClientRMQ extends ClientProxy { protected responseEmitter: EventEmitter; protected replyQueue: string; protected persistent: boolean; + protected noAssert: boolean; constructor(protected readonly options: RmqOptions['options']) { super(); @@ -51,6 +53,9 @@ export class ClientRMQ extends ClientProxy { this.getOptionsProp(this.options, 'replyQueue') || REPLY_QUEUE; this.persistent = this.getOptionsProp(this.options, 'persistent') || RQM_DEFAULT_PERSISTENT; + this.noAssert = + this.getOptionsProp(this.options, 'noAssert') || RQM_DEFAULT_NO_ASSERT; + loadPackage('amqplib', ClientRMQ.name, () => require('amqplib')); rqmPackage = loadPackage('amqp-connection-manager', ClientRMQ.name, () => require('amqp-connection-manager'), @@ -143,7 +148,9 @@ export class ClientRMQ extends ClientProxy { this.getOptionsProp(this.options, 'isGlobalPrefetchCount') || RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT; - await channel.assertQueue(this.queue, this.queueOptions); + if (!this.queueOptions.noAssert) { + await channel.assertQueue(this.queue, this.queueOptions); + } await channel.prefetch(prefetchCount, isGlobalPrefetchCount); this.responseEmitter = new EventEmitter(); diff --git a/packages/microservices/constants.ts b/packages/microservices/constants.ts index 2ae25680580..0d5868f5b51 100644 --- a/packages/microservices/constants.ts +++ b/packages/microservices/constants.ts @@ -39,6 +39,7 @@ export const RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT = false; export const RQM_DEFAULT_QUEUE_OPTIONS = {}; export const RQM_DEFAULT_NOACK = true; export const RQM_DEFAULT_PERSISTENT = false; +export const RQM_DEFAULT_NO_ASSERT = false; export const GRPC_DEFAULT_PROTO_LOADER = '@grpc/proto-loader'; export const NO_EVENT_HANDLER = (text: TemplateStringsArray, pattern: string) => diff --git a/packages/microservices/interfaces/microservice-configuration.interface.ts b/packages/microservices/interfaces/microservice-configuration.interface.ts index f167a414ce7..38abcd05d1f 100644 --- a/packages/microservices/interfaces/microservice-configuration.interface.ts +++ b/packages/microservices/interfaces/microservice-configuration.interface.ts @@ -181,6 +181,7 @@ export interface RmqOptions { replyQueue?: string; persistent?: boolean; headers?: Record; + noAssert?: boolean; }; } diff --git a/packages/microservices/server/server-rmq.ts b/packages/microservices/server/server-rmq.ts index 18ca16eb96d..e38e3519fd0 100644 --- a/packages/microservices/server/server-rmq.ts +++ b/packages/microservices/server/server-rmq.ts @@ -16,6 +16,7 @@ import { RQM_DEFAULT_QUEUE, RQM_DEFAULT_QUEUE_OPTIONS, RQM_DEFAULT_URL, + RQM_DEFAULT_NO_ASSERT, } from '../constants'; import { RmqContext } from '../ctx-host'; import { Transport } from '../enums'; @@ -40,6 +41,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy { protected readonly prefetchCount: number; protected readonly queueOptions: any; protected readonly isGlobalPrefetchCount: boolean; + protected readonly noAssert: boolean; constructor(protected readonly options: RmqOptions['options']) { super(); @@ -55,6 +57,8 @@ export class ServerRMQ extends Server implements CustomTransportStrategy { this.queueOptions = this.getOptionsProp(this.options, 'queueOptions') || RQM_DEFAULT_QUEUE_OPTIONS; + this.noAssert = + this.getOptionsProp(this.options, 'noAssert') || RQM_DEFAULT_NO_ASSERT; this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib')); rqmPackage = this.loadPackage( @@ -115,7 +119,9 @@ export class ServerRMQ extends Server implements CustomTransportStrategy { public async setupChannel(channel: any, callback: Function) { const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK); - await channel.assertQueue(this.queue, this.queueOptions); + if (!this.queueOptions.noAssert) { + await channel.assertQueue(this.queue, this.queueOptions); + } await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount); channel.consume( this.queue,