-
Notifications
You must be signed in to change notification settings - Fork 20
Closed
Labels
bugSomething isn't workingSomething isn't workingfixed-present-in-next-releaseBug or improvement that's done, it is in the development branch but yet unreleasedBug or improvement that's done, it is in the development branch but yet unreleased
Description
The header implementation for both producers and consumers does not comply with the type definitions offered up in kafkajs.d.ts (which are unmodified from the KafkaJS originals).
Below is a comparison between KafkaJS and Confluent.
KafkaJS
import {Kafka} from "kafkajs";
const topic = "test-kafkajs-topic";
let receivedCount = 0;
const kafka = new Kafka({brokers: ["localhost:9092"]});
const consumer = kafka.consumer({groupId:`${topic}-group`});
await consumer.connect();
await consumer.subscribe({topic: TOPIC});
await consumer.run({
eachMessage: async ({message}) => {
log.info(JSON.stringify(message.headers, null, 2));
receivedCount++;
}
});
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic: TOPIC,
messages: [{value: "one", headers: {header1: "alpha", header2: "beta"}}]
});
await until(async () => receivedCount == 1);
await producer.disconnect();
await consumer.disconnect();
{
"header1": {
"type": "Buffer",
"data": [
97,
108,
112,
104,
97
]
},
"header2": {
"type": "Buffer",
"data": [
98,
101,
116,
97
]
}
}
Confluent
import {KafkaJS as Confluent} from "@confluentinc/kafka-javascript";
const topic = "test-confluent-topic";
let receivedCount = 0;
const kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});
const consumer = kafka.consumer({kafkaJS: {groupId: `${topic}-group`}});
await consumer.connect();
await consumer.subscribe({topic});
await consumer.run({
eachMessage: async ({message}) => {
log.info(JSON.stringify(message.headers, null, 2));
receivedCount++;
}
});
await until(async () => consumer.assignment().length > 0);
const producer = kafka.producer({"linger.ms": 0});
await producer.connect();
await producer.send({
topic,
messages: [{value: "one", headers: {header1: "alpha", header2: "beta"}}]
});
await until(async () => receivedCount == 1);
await producer.disconnect();
await consumer.disconnect();
{
"0": {
"key": {
"type": "Buffer",
"data": [
104,
101,
97,
100,
101,
114,
49
]
}
},
"1": {
"key": {
"type": "Buffer",
"data": [
104,
101,
97,
100,
101,
114,
50
]
}
}
}
Two (maybe three) notable issues:
- The headers
header1=alpha
andheader2=beta
were sent to Kafka askey=header1
andkey=header2
- When that message was received, the headers object does not match the IHeaders type definition:
export interface IHeaders {
[key: string]: Buffer | string | (Buffer | string)[] | undefined
}
- if I had actually sent
key=header1
andkey=header2
, KafkaJS compatibility would dictate a string key of"key"
and a string[] value of["header1","header2"]
Metadata
Metadata
Assignees
Labels
bugSomething isn't workingSomething isn't workingfixed-present-in-next-releaseBug or improvement that's done, it is in the development branch but yet unreleasedBug or improvement that's done, it is in the development branch but yet unreleased