
Commit 35e893c

Fix issues with header conversions
1 parent 3f9f7fa commit 35e893c

8 files changed, +126 −23 lines changed


e2e/both.spec.js

Lines changed: 20 additions & 2 deletions
@@ -424,12 +424,30 @@ describe('Consumer/Producer', function() {
     var headers = [
       { key1: 'value1' },
       { key2: Buffer.from('value2') },
-      { key3: 100 },
-      { key4: 10.1 },
     ];
     run_headers_test(done, headers);
   });

+  it('should not be able to produce any non-string and non-buffer headers: consumeLoop', function(done) {
+    producer.setPollInterval(10);
+
+    const headerCases = [
+      [ { key: 10 } ],
+      [ { key: null }],
+      [ { key: undefined }],
+    ];
+    for (const headerCase of headerCases) {
+      const buffer = Buffer.from('value');
+      const key = 'key';
+      t.throws(
+        () => producer.produce(topic, null, buffer, key, null, "", headerCase),
+        'must be string or buffer'
+      );
+    }
+
+    done();
+  });
+
   it('should be able to produce and consume messages: empty buffer key and empty value', function(done) {
     var emptyString = '';
     var key = Buffer.from(emptyString);

examples/consumer.js

Lines changed: 1 addition & 0 deletions
@@ -75,6 +75,7 @@ async function consumerStart() {
       console.log({
         topic,
         partition,
+        headers: message.headers,
         offset: message.offset,
         key: message.key?.toString(),
         value: message.value.toString(),

examples/producer.js

Lines changed: 13 additions & 2 deletions
@@ -14,8 +14,19 @@ async function producerStart() {
     res.push(producer.send({
       topic: 'test-topic',
       messages: [
-        { value: 'v222', partition: 1 },
-        { value: 'v11', partition: 0, key: 'x' },
+        {
+          value: 'v1',
+          partition: 0,
+          key: 'x',
+          headers: {
+            'header1': ['h1v1', 'h1v2'],
+            'header3': 'h3v3',
+          }
+        },
+        {
+          value: 'v2',
+          key: 'y',
+        }
       ]
     }));
   }

examples/typescript/kafkajs.ts

Lines changed: 9 additions & 1 deletion
@@ -22,7 +22,14 @@ async function runProducer() {
   await producer.send({
     topic: 'test-topic',
     messages: [
-      { value: 'Hello World!', key: 'key1' },
+      {
+        value: 'Hello World!',
+        key: 'key1',
+        headers: {
+          'header1': 'value1',
+          'header2': [Buffer.from('value2'), 'value3']
+        }
+      },
     ],
   });

@@ -52,6 +59,7 @@ async function runConsumer() {
       console.log({
         key: message.key ? message.key.toString() : null,
         value: message.value ? message.value.toString() : null,
+        headers: message.headers,
       });
     },
   });

lib/kafkajs/_common.js

Lines changed: 7 additions & 3 deletions
@@ -559,12 +559,16 @@ function convertToRdKafkaHeaders(kafkaJSHeaders) {

   const headers = [];
   for (const [key, value] of Object.entries(kafkaJSHeaders)) {
-    if (value.constructor === Array) {
+    if (value && value.constructor === Array) {
       for (const v of value) {
-        headers.push({ key, value: v });
+        const header = {};
+        header[key] = v;
+        headers.push(header);
       }
     } else {
-      headers.push({ key, value });
+      const header = {};
+      header[key] = value;
+      headers.push(header);
     }
   }
   return headers;
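
As a reference for the change above, here is a minimal standalone sketch of the shape convertToRdKafkaHeaders now emits; the function body mirrors the diff, while the sample input and printed output are illustrative only:

// Each KafkaJS-style header entry becomes a single-property object;
// array values are fanned out into one object per element.
function convertToRdKafkaHeaders(kafkaJSHeaders) {
  const headers = [];
  for (const [key, value] of Object.entries(kafkaJSHeaders)) {
    if (value && value.constructor === Array) {
      for (const v of value) {
        const header = {};
        header[key] = v;
        headers.push(header);
      }
    } else {
      const header = {};
      header[key] = value;
      headers.push(header);
    }
  }
  return headers;
}

console.log(convertToRdKafkaHeaders({ 'header1': ['h1v1', 'h1v2'], 'header3': 'h3v3' }));
// [ { header1: 'h1v1' }, { header1: 'h1v2' }, { header3: 'h3v3' } ]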

lib/kafkajs/_consumer.js

Lines changed: 9 additions & 7 deletions
@@ -608,13 +608,15 @@ class Consumer {
     let headers;
     if (message.headers) {
       headers = {}
-      for (const [key, value] of Object.entries(message.headers)) {
-        if (!Object.hasOwn(headers, key)) {
-          headers[key] = value;
-        } else if (headers[key].constructor === Array) {
-          headers[key].push(value);
-        } else {
-          headers[key] = [headers[key], value];
+      for (const header of message.headers) {
+        for (const [key, value] of Object.entries(header)) {
+          if (!Object.hasOwn(headers, key)) {
+            headers[key] = value;
+          } else if (headers[key].constructor === Array) {
+            headers[key].push(value);
+          } else {
+            headers[key] = [headers[key], value];
+          }
         }
       }
     }
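
The consumer-side loop above performs the inverse conversion. A small standalone sketch of that logic under an illustrative input (mergeHeaders is a hypothetical helper name; in the consumer the loop runs inline, and real header values arrive as Buffers rather than the strings used here for readability):

// message.headers arrives from librdkafka as an array of single-property
// objects; repeated keys are collapsed back into an array under that key.
function mergeHeaders(rdKafkaHeaders) {
  const headers = {};
  for (const header of rdKafkaHeaders) {
    for (const [key, value] of Object.entries(header)) {
      if (!Object.hasOwn(headers, key)) {
        headers[key] = value;
      } else if (headers[key].constructor === Array) {
        headers[key].push(value);
      } else {
        headers[key] = [headers[key], value];
      }
    }
  }
  return headers;
}

console.log(mergeHeaders([{ header1: 'h1v1' }, { header1: 'h1v2' }, { header3: 'h3v3' }]));
// { header1: [ 'h1v1', 'h1v2' ], header3: 'h3v3' }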

src/producer.cc

Lines changed: 27 additions & 8 deletions
@@ -595,18 +595,37 @@ NAN_METHOD(Producer::NodeProduce) {

         v8::Local<v8::Array> props = header->GetOwnPropertyNames(
           Nan::GetCurrentContext()).ToLocalChecked();
-        Nan::MaybeLocal<v8::String> v8Key = Nan::To<v8::String>(
-          Nan::Get(props, 0).ToLocalChecked());
-        Nan::MaybeLocal<v8::String> v8Value = Nan::To<v8::String>(
-          Nan::Get(header, v8Key.ToLocalChecked()).ToLocalChecked());

+        // TODO: Other properties in the list of properties should not be
+        // ignored, but they are. This is a bug, need to handle it either in JS
+        // or here.
+        Nan::MaybeLocal<v8::String> v8Key =
+          Nan::To<v8::String>(Nan::Get(props, 0).ToLocalChecked());
+
+        // The key must be a string.
+        if (v8Key.IsEmpty()) {
+          Nan::ThrowError("Header key must be a string");
+        }
         Nan::Utf8String uKey(v8Key.ToLocalChecked());
         std::string key(*uKey);

-        Nan::Utf8String uValue(v8Value.ToLocalChecked());
-        std::string value(*uValue);
-        headers.push_back(
-          RdKafka::Headers::Header(key, value.c_str(), value.size()));
+        // Valid types for the header are string or buffer.
+        // Other types will throw an error.
+        v8::Local<v8::Value> v8Value =
+          Nan::Get(header, v8Key.ToLocalChecked()).ToLocalChecked();
+
+        if (node::Buffer::HasInstance(v8Value)) {
+          const char* value = node::Buffer::Data(v8Value);
+          const size_t value_len = node::Buffer::Length(v8Value);
+          headers.push_back(RdKafka::Headers::Header(key, value, value_len));
+        } else if (v8Value->IsString()) {
+          Nan::Utf8String uValue(v8Value);
+          std::string value(*uValue);
+          headers.push_back(
+            RdKafka::Headers::Header(key, value.c_str(), value.size()));
+        } else {
+          Nan::ThrowError("Header value must be a string or buffer");
+        }
       }
     }
   }
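
From the JavaScript side of the callback-based API, the effect of this check is that each header passed to produce() must be an object whose value is a string or a Buffer; other value types are rejected. A hedged usage sketch, following the produce(topic, partition, message, key, timestamp, opaque, headers) call shape used in the e2e test above (the topic name and header values are placeholders, and producer is assumed to be an already-connected Producer instance):

// Accepted: string and Buffer header values.
producer.produce('test-topic', null, Buffer.from('payload'), 'key', null, '', [
  { header1: 'h1v1' },
  { header2: Buffer.from('h2v2') },
]);

// Rejected: a non-string, non-Buffer value such as a number now throws
// "Header value must be a string or buffer".
producer.produce('test-topic', null, Buffer.from('payload'), 'key', null, '', [
  { header3: 100 },
]);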

test/promisified/consumer/consumeMessages.spec.js

Lines changed: 40 additions & 0 deletions
@@ -85,6 +85,46 @@ describe.each([[true], [false]])('Consumer', (isAutoCommit) => {
     expect(messagesConsumed.map(m => m.message.offset)).toEqual(messages.map((_, i) => `${i}`))
   });

+  it('consume messages with headers', async () => {
+    await consumer.connect();
+    await producer.connect();
+    await consumer.subscribe({ topic: topicName })
+
+    const messagesConsumed = [];
+    consumer.run({ eachMessage: async event => messagesConsumed.push(event) });
+
+    const messages = [ {
+      value: `value-${secureRandom}`,
+      headers: {
+        'header-1': 'value-1',
+        'header-2': 'value-2',
+        'header-3': [ 'value-3-1', 'value-3-2', Buffer.from([1,0,1,0,1]) ],
+        'header-4': Buffer.from([1,0,1,0,1]),
+      }
+    } ]
+
+    await producer.send({ topic: topicName, messages })
+    await waitForMessages(messagesConsumed, { number: messages.length })
+
+    expect(messagesConsumed[0]).toEqual(
+      expect.objectContaining({
+        topic: topicName,
+        partition: 0,
+        message: expect.objectContaining({
+          value: Buffer.from(messages[0].value),
+          offset: '0',
+          headers: {
+            // Headers are always returned as Buffers from the broker.
+            'header-1': Buffer.from('value-1'),
+            'header-2': Buffer.from('value-2'),
+            'header-3': [ Buffer.from('value-3-1'), Buffer.from('value-3-2'), Buffer.from([1,0,1,0,1]) ],
+            'header-4': Buffer.from([1,0,1,0,1]),
+          }
+        }),
+      })
+    )
+  });
+
   it.each([[true], [false]])('consumes messages using eachBatch', async (isAutoResolve) => {
     await consumer.connect();
     await producer.connect();
