From e98d3372bedd06c500bc9c3899bb6e9662bacb33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20=C5=A0vanda?= <46406259+Papooch@users.noreply.github.com> Date: Mon, 12 Sep 2022 10:10:28 +0200 Subject: [PATCH] feat(microservices): Add producer reference to KafkaContext --- .../microservices/ctx-host/kafka.context.ts | 10 ++++++++- packages/microservices/server/server-kafka.ts | 1 + .../test/ctx-host/kafka.context.spec.ts | 21 +++++++++++++++++-- 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/packages/microservices/ctx-host/kafka.context.ts b/packages/microservices/ctx-host/kafka.context.ts index 7d1f5e4a5cc..4ba1663c85e 100644 --- a/packages/microservices/ctx-host/kafka.context.ts +++ b/packages/microservices/ctx-host/kafka.context.ts @@ -1,4 +1,4 @@ -import { Consumer, KafkaMessage } from '../external/kafka.interface'; +import { Consumer, KafkaMessage, Producer } from '../external/kafka.interface'; import { BaseRpcContext } from './base-rpc.context'; type KafkaContextArgs = [ @@ -7,6 +7,7 @@ type KafkaContextArgs = [ topic: string, consumer: Consumer, heartbeat: () => Promise, + producer: Producer, ]; export class KafkaContext extends BaseRpcContext { @@ -48,4 +49,11 @@ export class KafkaContext extends BaseRpcContext { getHeartbeat() { return this.args[4]; } + + /** + * Returns the Kafka producer reference, + */ + getProducer() { + return this.args[5]; + } } diff --git a/packages/microservices/server/server-kafka.ts b/packages/microservices/server/server-kafka.ts index e7bfee7cee4..7ac52f9faee 100644 --- a/packages/microservices/server/server-kafka.ts +++ b/packages/microservices/server/server-kafka.ts @@ -168,6 +168,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy { payload.topic, this.consumer, payload.heartbeat, + this.producer, ]); const handler = this.getHandlerByPattern(packet.pattern); // if the correlation id or reply topic is not set diff --git a/packages/microservices/test/ctx-host/kafka.context.spec.ts b/packages/microservices/test/ctx-host/kafka.context.spec.ts index 9be85a12be6..11ba7345611 100644 --- a/packages/microservices/test/ctx-host/kafka.context.spec.ts +++ b/packages/microservices/test/ctx-host/kafka.context.spec.ts @@ -1,6 +1,10 @@ import { expect } from 'chai'; import { KafkaContext } from '../../ctx-host'; -import { Consumer, KafkaMessage } from '../../external/kafka.interface'; +import { + Consumer, + KafkaMessage, + Producer, +} from '../../external/kafka.interface'; describe('KafkaContext', () => { const args = [ @@ -9,12 +13,20 @@ describe('KafkaContext', () => { undefined, { test: 'consumer' }, () => {}, + { test: 'producer' }, ]; let context: KafkaContext; beforeEach(() => { context = new KafkaContext( - args as [KafkaMessage, number, string, Consumer, () => Promise], + args as [ + KafkaMessage, + number, + string, + Consumer, + () => Promise, + Producer, + ], ); }); describe('getTopic', () => { @@ -42,4 +54,9 @@ describe('KafkaContext', () => { expect(context.getHeartbeat()).to.be.eql(args[4]); }); }); + describe('getProducer', () => { + it('should return producer instance', () => { + expect(context.getProducer()).to.deep.eq({ test: 'producer' }); + }); + }); });