Skip to content

Commit

Permalink
Merge pull request #9798 from frankmangone/frankmangone/feature/avoid…
Browse files Browse the repository at this point in the history
…-queue-declaration-rmq

feat(microservices): add noAssert option for RMQ connection
  • Loading branch information
kamilmysliwiec committed Jul 20, 2022
2 parents 429dfa1 + bb7e9d7 commit 5663012
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 2 deletions.
9 changes: 8 additions & 1 deletion packages/microservices/client/client-rmq.ts
Expand Up @@ -16,6 +16,7 @@ import {
RQM_DEFAULT_QUEUE,
RQM_DEFAULT_QUEUE_OPTIONS,
RQM_DEFAULT_URL,
RQM_DEFAULT_NO_ASSERT,
} from '../constants';
import { RmqUrl } from '../external/rmq-url.interface';
import { ReadPacket, RmqOptions, WritePacket } from '../interfaces';
Expand All @@ -38,6 +39,7 @@ export class ClientRMQ extends ClientProxy {
protected responseEmitter: EventEmitter;
protected replyQueue: string;
protected persistent: boolean;
protected noAssert: boolean;

constructor(protected readonly options: RmqOptions['options']) {
super();
Expand All @@ -51,6 +53,9 @@ export class ClientRMQ extends ClientProxy {
this.getOptionsProp(this.options, 'replyQueue') || REPLY_QUEUE;
this.persistent =
this.getOptionsProp(this.options, 'persistent') || RQM_DEFAULT_PERSISTENT;
this.noAssert =
this.getOptionsProp(this.options, 'noAssert') || RQM_DEFAULT_NO_ASSERT;

loadPackage('amqplib', ClientRMQ.name, () => require('amqplib'));
rqmPackage = loadPackage('amqp-connection-manager', ClientRMQ.name, () =>
require('amqp-connection-manager'),
Expand Down Expand Up @@ -143,7 +148,9 @@ export class ClientRMQ extends ClientProxy {
this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;

await channel.assertQueue(this.queue, this.queueOptions);
if (!this.queueOptions.noAssert) {
await channel.assertQueue(this.queue, this.queueOptions);
}
await channel.prefetch(prefetchCount, isGlobalPrefetchCount);

this.responseEmitter = new EventEmitter();
Expand Down
1 change: 1 addition & 0 deletions packages/microservices/constants.ts
Expand Up @@ -39,6 +39,7 @@ export const RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT = false;
export const RQM_DEFAULT_QUEUE_OPTIONS = {};
export const RQM_DEFAULT_NOACK = true;
export const RQM_DEFAULT_PERSISTENT = false;
export const RQM_DEFAULT_NO_ASSERT = false;
export const GRPC_DEFAULT_PROTO_LOADER = '@grpc/proto-loader';

export const NO_EVENT_HANDLER = (text: TemplateStringsArray, pattern: string) =>
Expand Down
Expand Up @@ -181,6 +181,7 @@ export interface RmqOptions {
replyQueue?: string;
persistent?: boolean;
headers?: Record<string, string>;
noAssert?: boolean;
};
}

Expand Down
8 changes: 7 additions & 1 deletion packages/microservices/server/server-rmq.ts
Expand Up @@ -16,6 +16,7 @@ import {
RQM_DEFAULT_QUEUE,
RQM_DEFAULT_QUEUE_OPTIONS,
RQM_DEFAULT_URL,
RQM_DEFAULT_NO_ASSERT,
} from '../constants';
import { RmqContext } from '../ctx-host';
import { Transport } from '../enums';
Expand All @@ -40,6 +41,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
protected readonly prefetchCount: number;
protected readonly queueOptions: any;
protected readonly isGlobalPrefetchCount: boolean;
protected readonly noAssert: boolean;

constructor(protected readonly options: RmqOptions['options']) {
super();
Expand All @@ -55,6 +57,8 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
this.queueOptions =
this.getOptionsProp(this.options, 'queueOptions') ||
RQM_DEFAULT_QUEUE_OPTIONS;
this.noAssert =
this.getOptionsProp(this.options, 'noAssert') || RQM_DEFAULT_NO_ASSERT;

this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
rqmPackage = this.loadPackage(
Expand Down Expand Up @@ -115,7 +119,9 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
public async setupChannel(channel: any, callback: Function) {
const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);

await channel.assertQueue(this.queue, this.queueOptions);
if (!this.queueOptions.noAssert) {
await channel.assertQueue(this.queue, this.queueOptions);
}
await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
channel.consume(
this.queue,
Expand Down

0 comments on commit 5663012

Please sign in to comment.