Skip to content

Commit

Permalink
Merge pull request #13285 from sorooshme/fix-microservice-rmq-noasser…
Browse files Browse the repository at this point in the history
…t-flag

fix(microservices): fix rabbitmq no-assert not being applied correctly
  • Loading branch information
kamilmysliwiec committed Mar 17, 2024
2 parents 57bae20 + ed6d91b commit 07fb4d5
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 6 deletions.
6 changes: 4 additions & 2 deletions packages/microservices/client/client-rmq.ts
Expand Up @@ -78,7 +78,9 @@ export class ClientRMQ extends ClientProxy {
this.persistent =
this.getOptionsProp(this.options, 'persistent') || RQM_DEFAULT_PERSISTENT;
this.noAssert =
this.getOptionsProp(this.options, 'noAssert') || RQM_DEFAULT_NO_ASSERT;
this.getOptionsProp(this.options, 'noAssert') ??
this.queueOptions.noAssert ??
RQM_DEFAULT_NO_ASSERT;

loadPackage('amqplib', ClientRMQ.name, () => require('amqplib'));
rmqPackage = loadPackage('amqp-connection-manager', ClientRMQ.name, () =>
Expand Down Expand Up @@ -187,7 +189,7 @@ export class ClientRMQ extends ClientProxy {
this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;

if (!this.queueOptions.noAssert) {
if (!this.noAssert) {
await channel.assertQueue(this.queue, this.queueOptions);
}
await channel.prefetch(prefetchCount, isGlobalPrefetchCount);
Expand Down
6 changes: 4 additions & 2 deletions packages/microservices/server/server-rmq.ts
Expand Up @@ -65,7 +65,9 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
this.getOptionsProp(this.options, 'queueOptions') ||
RQM_DEFAULT_QUEUE_OPTIONS;
this.noAssert =
this.getOptionsProp(this.options, 'noAssert') || RQM_DEFAULT_NO_ASSERT;
this.getOptionsProp(this.options, 'noAssert') ??
this.queueOptions.noAssert ??
RQM_DEFAULT_NO_ASSERT;

this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
rmqPackage = this.loadPackage(
Expand Down Expand Up @@ -145,7 +147,7 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
}

public async setupChannel(channel: any, callback: Function) {
if (!this.queueOptions.noAssert) {
if (!this.noAssert) {
await channel.assertQueue(this.queue, this.queueOptions);
}
await channel.prefetch(this.prefetchCount, this.isGlobalPrefetchCount);
Expand Down
23 changes: 22 additions & 1 deletion packages/microservices/test/client/client-rmq.spec.ts
Expand Up @@ -11,6 +11,19 @@ describe('ClientRMQ', function () {

let client: ClientRMQ;

describe('constructor', () => {
it(`should fallback to queueOptions.noAssert when 'noAssert' is undefined`, () => {
const queueOptions = {
noAssert: true,
};
const instance = new ClientRMQ({
queueOptions,
});

expect(instance).property('noAssert').to.eq(queueOptions.noAssert);
});
});

describe('connect', () => {
let createClientStub: sinon.SinonStub;
let handleErrorsSpy: sinon.SinonSpy;
Expand Down Expand Up @@ -135,10 +148,18 @@ describe('ClientRMQ', function () {
afterEach(() => {
consumeStub.restore();
});
it('should call "assertQueue" with queue and queue options', async () => {
it('should call "assertQueue" with queue and queue options when noAssert is false', async () => {
client['noAssert'] = false;

await client.setupChannel(channel, () => null);
expect(channel.assertQueue.calledWith(queue, queueOptions)).to.be.true;
});
it('should not call "assertQueue" when noAssert is true', async () => {
client['noAssert'] = true;

await client.setupChannel(channel, () => null);
expect(channel.assertQueue.called).not.to.be.true;
});
it('should call "prefetch" with prefetchCount and "isGlobalPrefetchCount"', async () => {
await client.setupChannel(channel, () => null);
expect(channel.prefetch.calledWith(prefetchCount, isGlobalPrefetchCount))
Expand Down
24 changes: 23 additions & 1 deletion packages/microservices/test/server/server-rmq.spec.ts
Expand Up @@ -14,6 +14,20 @@ describe('ServerRMQ', () => {
beforeEach(() => {
server = new ServerRMQ({});
});

describe('constructor', () => {
it(`should fallback to queueOptions.noAssert when 'noAssert' is undefined`, () => {
const queueOptions = {
noAssert: true,
};
const instance = new ServerRMQ({
queueOptions,
});

expect(instance).property('noAssert').to.eq(queueOptions.noAssert);
});
});

describe('listen', () => {
let createClient: sinon.SinonStub;
let onStub: sinon.SinonStub;
Expand Down Expand Up @@ -196,10 +210,18 @@ describe('ServerRMQ', () => {
consume: sinon.spy(),
};
});
it('should call "assertQueue" with queue and queue options', async () => {
it('should call "assertQueue" with queue and queue options when noAssert is false', async () => {
server['noAssert' as any] = false;

await server.setupChannel(channel, () => null);
expect(channel.assertQueue.calledWith(queue, queueOptions)).to.be.true;
});
it('should not call "assertQueue" when noAssert is true', async () => {
server['noAssert' as any] = true;

await server.setupChannel(channel, () => null);
expect(channel.assertQueue.called).not.to.be.true;
});
it('should call "prefetch" with prefetchCount and "isGlobalPrefetchCount"', async () => {
await server.setupChannel(channel, () => null);
expect(channel.prefetch.calledWith(prefetchCount, isGlobalPrefetchCount))
Expand Down

0 comments on commit 07fb4d5

Please sign in to comment.