Skip to content

Commit

Permalink
Merge pull request #10272 from Papooch/feat/kafka-producer-in-context
Browse files Browse the repository at this point in the history
feat(microservices): Add producer reference to KafkaContext
  • Loading branch information
kamilmysliwiec committed Sep 19, 2022
2 parents 7d4f7e2 + e98d337 commit cbd43d7
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 3 deletions.
10 changes: 9 additions & 1 deletion packages/microservices/ctx-host/kafka.context.ts
@@ -1,4 +1,4 @@
import { Consumer, KafkaMessage } from '../external/kafka.interface';
import { Consumer, KafkaMessage, Producer } from '../external/kafka.interface';
import { BaseRpcContext } from './base-rpc.context';

type KafkaContextArgs = [
Expand All @@ -7,6 +7,7 @@ type KafkaContextArgs = [
topic: string,
consumer: Consumer,
heartbeat: () => Promise<void>,
producer: Producer,
];

export class KafkaContext extends BaseRpcContext<KafkaContextArgs> {
Expand Down Expand Up @@ -48,4 +49,11 @@ export class KafkaContext extends BaseRpcContext<KafkaContextArgs> {
getHeartbeat() {
return this.args[4];
}

/**
* Returns the Kafka producer reference,
*/
getProducer() {
return this.args[5];
}
}
1 change: 1 addition & 0 deletions packages/microservices/server/server-kafka.ts
Expand Up @@ -168,6 +168,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
payload.topic,
this.consumer,
payload.heartbeat,
this.producer,
]);
const handler = this.getHandlerByPattern(packet.pattern);
// if the correlation id or reply topic is not set
Expand Down
21 changes: 19 additions & 2 deletions packages/microservices/test/ctx-host/kafka.context.spec.ts
@@ -1,6 +1,10 @@
import { expect } from 'chai';
import { KafkaContext } from '../../ctx-host';
import { Consumer, KafkaMessage } from '../../external/kafka.interface';
import {
Consumer,
KafkaMessage,
Producer,
} from '../../external/kafka.interface';

describe('KafkaContext', () => {
const args = [
Expand All @@ -9,12 +13,20 @@ describe('KafkaContext', () => {
undefined,
{ test: 'consumer' },
() => {},
{ test: 'producer' },
];
let context: KafkaContext;

beforeEach(() => {
context = new KafkaContext(
args as [KafkaMessage, number, string, Consumer, () => Promise<void>],
args as [
KafkaMessage,
number,
string,
Consumer,
() => Promise<void>,
Producer,
],
);
});
describe('getTopic', () => {
Expand Down Expand Up @@ -42,4 +54,9 @@ describe('KafkaContext', () => {
expect(context.getHeartbeat()).to.be.eql(args[4]);
});
});
describe('getProducer', () => {
it('should return producer instance', () => {
expect(context.getProducer()).to.deep.eq({ test: 'producer' });
});
});
});

0 comments on commit cbd43d7

Please sign in to comment.