Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Headers (outbound and inbound) do not comply with KafkaJS type definitions #45

Closed
apeloquin-agilysys opened this issue May 16, 2024 · 3 comments
Labels
bug Something isn't working fixed-present-in-next-release Bug or improvement that's done, it is in the development branch but yet unreleased

Comments

@apeloquin-agilysys
Copy link

apeloquin-agilysys commented May 16, 2024

The header implementation for both producers and consumers does not comply with the type definitions offered up in kafkajs.d.ts (which are unmodified from the KafkaJS originals).

Below is a comparison between KafkaJS and Confluent.

KafkaJS

import {Kafka} from "kafkajs";

const topic = "test-kafkajs-topic";
let receivedCount = 0;

const kafka = new Kafka({brokers: ["localhost:9092"]});

const consumer = kafka.consumer({groupId:`${topic}-group`});
await consumer.connect();
await consumer.subscribe({topic: TOPIC});
await consumer.run({
  eachMessage: async ({message}) => {
    log.info(JSON.stringify(message.headers, null, 2));
    receivedCount++;
  }
});

const producer = kafka.producer();
await producer.connect();
await producer.send({
  topic: TOPIC,
  messages: [{value: "one", headers: {header1: "alpha", header2: "beta"}}]
});

await until(async () => receivedCount == 1);

await producer.disconnect();
await consumer.disconnect();
{
  "header1": {
    "type": "Buffer",
    "data": [
      97,
      108,
      112,
      104,
      97
    ]
  },
  "header2": {
    "type": "Buffer",
    "data": [
      98,
      101,
      116,
      97
    ]
  }
}

Confluent

import {KafkaJS as Confluent} from "@confluentinc/kafka-javascript";

const topic = "test-confluent-topic";
let receivedCount = 0;

const kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});

const consumer = kafka.consumer({kafkaJS: {groupId: `${topic}-group`}});
await consumer.connect();
await consumer.subscribe({topic});
await consumer.run({
  eachMessage: async ({message}) => {
    log.info(JSON.stringify(message.headers, null, 2));
    receivedCount++;
  }
});

await until(async () => consumer.assignment().length > 0);

const producer = kafka.producer({"linger.ms": 0});
await producer.connect();
await producer.send({
  topic,
  messages: [{value: "one", headers: {header1: "alpha", header2: "beta"}}]
});

await until(async () => receivedCount == 1);

await producer.disconnect();
await consumer.disconnect();
{
  "0": {
    "key": {
      "type": "Buffer",
      "data": [
        104,
        101,
        97,
        100,
        101,
        114,
        49
      ]
    }
  },
  "1": {
    "key": {
      "type": "Buffer",
      "data": [
        104,
        101,
        97,
        100,
        101,
        114,
        50
      ]
    }
  }
}

Two (maybe three) notable issues:

  1. The headers header1=alpha and header2=beta were sent to Kafka as key=header1 and key=header2
  2. When that message was received, the headers object does not match the IHeaders type definition:
export interface IHeaders {
  [key: string]: Buffer | string | (Buffer | string)[] | undefined
}
  1. if I had actually sent key=header1 and key=header2, KafkaJS compatibility would dictate a string key of "key" and a string[] value of ["header1","header2"]
@apeloquin-agilysys
Copy link
Author

I can workaround this on the consumer side:

const result: IHeaders = {};
for (const header of Object.values(headers)) {
  for (const [key, value] of Object.entries(header as unknown as IHeaders)) {
    if (!value) continue;
    const existing = result[key];
    result[key] = existing ? isArray(existing) ? [...existing, value] : [existing, value] : value;
  }
}
return result;

However, I'm unable to determine a headers format for the producer side that results in correctly delivered headers.

@milindl
Copy link
Contributor

milindl commented May 17, 2024

Thanks for filing this. There were a bunch of issues with conversions for the headers within JS and within C++ also. I've fixed those on the development branch. These are OK to use immediately and will match the IHeaders interface.

Additionally, there is a PR #39 improving the typing support.

I've added it within the examples/typescript/kafkajs.ts too for reference.

headers: {
  'header1': 'value1', 
  'header2': [Buffer.from('value2'), 'value3']
}

@milindl milindl added bug Something isn't working fixed-present-in-next-release Bug or improvement that's done, it is in the development branch but yet unreleased labels May 17, 2024
@milindl
Copy link
Contributor

milindl commented May 20, 2024

Closing this as I released 0.1.15-devel with the fixes, given example works too

@milindl milindl closed this as completed May 20, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working fixed-present-in-next-release Bug or improvement that's done, it is in the development branch but yet unreleased
Projects
None yet
Development

No branches or pull requests

2 participants