Skip to content

Commit

Permalink
feat(microservices): add noAssert option for RMQ connection
Browse files Browse the repository at this point in the history
Rabbit MQ brokers may not allow for queue declaration, so a check is needed to avoid 403 errors in that scenario
  • Loading branch information
agrofrank committed Jun 18, 2022
1 parent 83098e2 commit bb7e9d7
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 2 deletions.
9 changes: 8 additions & 1 deletion packages/microservices/client/client-rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
RQM_DEFAULT_QUEUE,
RQM_DEFAULT_QUEUE_OPTIONS,
RQM_DEFAULT_URL,
RQM_DEFAULT_NO_ASSERT,
} from '../constants';
import { RmqUrl } from '../external/rmq-url.interface';
import { ReadPacket, RmqOptions, WritePacket } from '../interfaces';
Expand All @@ -38,6 +39,7 @@ export class ClientRMQ extends ClientProxy {
protected responseEmitter: EventEmitter;
protected replyQueue: string;
protected persistent: boolean;
protected noAssert: boolean;

constructor(protected readonly options: RmqOptions['options']) {
super();
Expand All @@ -51,6 +53,9 @@ export class ClientRMQ extends ClientProxy {
this.getOptionsProp(this.options, 'replyQueue') || REPLY_QUEUE;
this.persistent =
this.getOptionsProp(this.options, 'persistent') || RQM_DEFAULT_PERSISTENT;
this.noAssert =
this.getOptionsProp(this.options, 'noAssert') || RQM_DEFAULT_NO_ASSERT;

loadPackage('amqplib', ClientRMQ.name, () => require('amqplib'));
rqmPackage = loadPackage('amqp-connection-manager', ClientRMQ.name, () =>
require('amqp-connection-manager'),
Expand Down Expand Up @@ -143,7 +148,9 @@ export class ClientRMQ extends ClientProxy {
this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;

await channel.assertQueue(this.queue, this.queueOptions);
if (!this.queueOptions.noAssert) {
await channel.assertQueue(this.queue, this.queueOptions);
}
await channel.prefetch(prefetchCount, isGlobalPrefetchCount);

this.responseEmitter = new EventEmitter();
Expand Down
1 change: 1 addition & 0 deletions packages/microservices/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export const RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT = false;
export const RQM_DEFAULT_QUEUE_OPTIONS = {};
export const RQM_DEFAULT_NOACK = true;
export const RQM_DEFAULT_PERSISTENT = false;
export const RQM_DEFAULT_NO_ASSERT = false;
export const GRPC_DEFAULT_PROTO_LOADER = '@grpc/proto-loader';

export const NO_MESSAGE_HANDLER = `There is no matching message handler defined in the remote service.`;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ export interface RmqOptions {
replyQueue?: string;
persistent?: boolean;
headers?: Record<string, string>;
noAssert?: boolean;
};
}

Expand Down
8 changes: 7 additions & 1 deletion packages/microservices/server/server-rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
RQM_DEFAULT_QUEUE,
RQM_DEFAULT_QUEUE_OPTIONS,
RQM_DEFAULT_URL,
RQM_DEFAULT_NO_ASSERT,
} from '../constants';
import { RmqContext } from '../ctx-host';
import { Transport } from '../enums';
Expand All @@ -40,6 +41,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
protected readonly prefetchCount: number;
protected readonly queueOptions: any;
protected readonly isGlobalPrefetchCount: boolean;
protected readonly noAssert: boolean;

constructor(protected readonly options: RmqOptions['options']) {
super();
Expand All @@ -55,6 +57,8 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
this.queueOptions =
this.getOptionsProp(this.options, 'queueOptions') ||
RQM_DEFAULT_QUEUE_OPTIONS;
this.noAssert =
this.getOptionsProp(this.options, 'noAssert') || RQM_DEFAULT_NO_ASSERT;

this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
rqmPackage = this.loadPackage(
Expand Down Expand Up @@ -115,7 +119,9 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
public async setupChannel(channel: any, callback: Function) {
const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);

await channel.assertQueue(this.queue, this.queueOptions);
if (!this.queueOptions.noAssert) {
await channel.assertQueue(this.queue, this.queueOptions);
}
await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
channel.consume(
this.queue,
Expand Down

0 comments on commit bb7e9d7

Please sign in to comment.