diff --git a/packages/microservices/ctx-host/kafka.context.ts b/packages/microservices/ctx-host/kafka.context.ts index 7d1f5e4a5cc..4ba1663c85e 100644 --- a/packages/microservices/ctx-host/kafka.context.ts +++ b/packages/microservices/ctx-host/kafka.context.ts @@ -1,4 +1,4 @@ -import { Consumer, KafkaMessage } from '../external/kafka.interface'; +import { Consumer, KafkaMessage, Producer } from '../external/kafka.interface'; import { BaseRpcContext } from './base-rpc.context'; type KafkaContextArgs = [ @@ -7,6 +7,7 @@ type KafkaContextArgs = [ topic: string, consumer: Consumer, heartbeat: () => Promise, + producer: Producer, ]; export class KafkaContext extends BaseRpcContext { @@ -48,4 +49,11 @@ export class KafkaContext extends BaseRpcContext { getHeartbeat() { return this.args[4]; } + + /** + * Returns the Kafka producer reference, + */ + getProducer() { + return this.args[5]; + } } diff --git a/packages/microservices/server/server-kafka.ts b/packages/microservices/server/server-kafka.ts index e7bfee7cee4..7ac52f9faee 100644 --- a/packages/microservices/server/server-kafka.ts +++ b/packages/microservices/server/server-kafka.ts @@ -168,6 +168,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy { payload.topic, this.consumer, payload.heartbeat, + this.producer, ]); const handler = this.getHandlerByPattern(packet.pattern); // if the correlation id or reply topic is not set diff --git a/packages/microservices/test/ctx-host/kafka.context.spec.ts b/packages/microservices/test/ctx-host/kafka.context.spec.ts index 9be85a12be6..11ba7345611 100644 --- a/packages/microservices/test/ctx-host/kafka.context.spec.ts +++ b/packages/microservices/test/ctx-host/kafka.context.spec.ts @@ -1,6 +1,10 @@ import { expect } from 'chai'; import { KafkaContext } from '../../ctx-host'; -import { Consumer, KafkaMessage } from '../../external/kafka.interface'; +import { + Consumer, + KafkaMessage, + Producer, +} from '../../external/kafka.interface'; describe('KafkaContext', () => { const args = [ @@ -9,12 +13,20 @@ describe('KafkaContext', () => { undefined, { test: 'consumer' }, () => {}, + { test: 'producer' }, ]; let context: KafkaContext; beforeEach(() => { context = new KafkaContext( - args as [KafkaMessage, number, string, Consumer, () => Promise], + args as [ + KafkaMessage, + number, + string, + Consumer, + () => Promise, + Producer, + ], ); }); describe('getTopic', () => { @@ -42,4 +54,9 @@ describe('KafkaContext', () => { expect(context.getHeartbeat()).to.be.eql(args[4]); }); }); + describe('getProducer', () => { + it('should return producer instance', () => { + expect(context.getProducer()).to.deep.eq({ test: 'producer' }); + }); + }); });