From f83e830a4bcc1cea577ad0aeb6dac9ec8902f71f Mon Sep 17 00:00:00 2001 From: Jozef Dzama Date: Thu, 21 Jul 2022 07:29:21 +0200 Subject: [PATCH] fix(microservices): kafka parser stops modifying received message --- .../microservices/helpers/kafka-parser.ts | 16 ++++++---- .../test/client/client-kafka.spec.ts | 4 +-- .../test/helpers/kafka-parser.spec.ts | 30 +++++++++++++++++++ 3 files changed, 43 insertions(+), 7 deletions(-) diff --git a/packages/microservices/helpers/kafka-parser.ts b/packages/microservices/helpers/kafka-parser.ts index 781217ff8d4..03064737041 100644 --- a/packages/microservices/helpers/kafka-parser.ts +++ b/packages/microservices/helpers/kafka-parser.ts @@ -9,22 +9,28 @@ export class KafkaParser { } public parse(data: any): T { + // Duplicate the object to not modify the original one (would break KafkaJS retries) + const result = { + ...data, + headers: { ...data.headers }, + }; + if (!this.keepBinary) { - data.value = this.decode(data.value); + result.value = this.decode(data.value); } if (!isNil(data.key)) { - data.key = this.decode(data.key); + result.key = this.decode(data.key); } if (!isNil(data.headers)) { const decodeHeaderByKey = (key: string) => { - data.headers[key] = this.decode(data.headers[key]); + result.headers[key] = this.decode(data.headers[key]); }; Object.keys(data.headers).forEach(decodeHeaderByKey); } else { - data.headers = {}; + result.headers = {}; } - return data; + return result; } public decode(value: Buffer): object | string | null | Buffer { diff --git a/packages/microservices/test/client/client-kafka.spec.ts b/packages/microservices/test/client/client-kafka.spec.ts index 3770baea994..39743da529a 100644 --- a/packages/microservices/test/client/client-kafka.spec.ts +++ b/packages/microservices/test/client/client-kafka.spec.ts @@ -76,7 +76,7 @@ describe('ClientKafka', () => { message, { size: 0, - value: { test: true }, + value: Buffer.from(JSON.stringify({ test: true })), }, ), heartbeat, @@ -488,7 +488,7 @@ describe('ClientKafka', () => { expect( callback.calledWith({ isDisposed: true, - response: payloadDisposed.message.value, + response: deserializedPayloadDisposed.message.value, err: undefined, }), ).to.be.true; diff --git a/packages/microservices/test/helpers/kafka-parser.spec.ts b/packages/microservices/test/helpers/kafka-parser.spec.ts index 91c698d1bb0..e4f8babcdde 100644 --- a/packages/microservices/test/helpers/kafka-parser.spec.ts +++ b/packages/microservices/test/helpers/kafka-parser.spec.ts @@ -126,5 +126,35 @@ describe('KafkaParser', () => { }, }); }); + + it('parse message multiple times (simulate retry)', () => { + const message = { + headers: { + [KafkaHeaders.CORRELATION_ID]: Buffer.from('correlation-id'), + }, + value: Buffer.from(JSON.stringify({ prop: 'value' })), + key: Buffer.from('1'), + }; + const expectedParsedMessage = { + key: '1', + value: { + prop: 'value', + }, + headers: { + [KafkaHeaders.CORRELATION_ID]: 'correlation-id', + }, + }; + expect(kafkaParser.parse(message)).to.deep.eq(expectedParsedMessage); + // Parse message again and verify it still works correctly + expect(kafkaParser.parse(message)).to.deep.eq(expectedParsedMessage); + // Verify message was not modified + expect(message).to.deep.eq({ + headers: { + [KafkaHeaders.CORRELATION_ID]: Buffer.from('correlation-id'), + }, + value: Buffer.from(JSON.stringify({ prop: 'value' })), + key: Buffer.from('1'), + }); + }); }); });