Skip to content

Commit fb76d8e

Browse files
milindlj and liunyu authored
Add metadata to TopicPartition (confluentinc#1410)
A Kafka offset commit message can include optional metadata, this adds support for it in this client. Co-authored-by: Jing Liu <[email protected]>
1 parent 6cd2e73 commit fb76d8e

File tree

4 files changed

+111
-11
lines changed

4 files changed

+111
-11
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Confluent's Python client for Apache Kafka
22

3+
4+
## v1.10.0
5+
6+
- Add metadata to TopicPartition type and commit() (#1410).
7+
8+
39
## v1.9.2
410

511
v1.9.2 is a maintenance release with the following fixes and enhancements:

src/confluent_kafka/src/confluent_kafka.c

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -816,15 +816,27 @@ static int TopicPartition_clear (TopicPartition *self) {
816816
Py_DECREF(self->error);
817817
self->error = NULL;
818818
}
819+
if (self->metadata) {
820+
free(self->metadata);
821+
self->metadata = NULL;
822+
}
819823
return 0;
820824
}
821825

822826
static void TopicPartition_setup (TopicPartition *self, const char *topic,
823827
int partition, long long offset,
828+
const char *metadata,
824829
rd_kafka_resp_err_t err) {
825830
self->topic = strdup(topic);
826831
self->partition = partition;
827832
self->offset = offset;
833+
834+
if (metadata != NULL) {
835+
self->metadata = strdup(metadata);
836+
} else {
837+
self->metadata = NULL;
838+
}
839+
828840
self->error = KafkaError_new_or_None(err, NULL);
829841
}
830842

@@ -843,18 +855,22 @@ static int TopicPartition_init (PyObject *self, PyObject *args,
843855
const char *topic;
844856
int partition = RD_KAFKA_PARTITION_UA;
845857
long long offset = RD_KAFKA_OFFSET_INVALID;
858+
const char *metadata = NULL;
859+
846860
static char *kws[] = { "topic",
847861
"partition",
848862
"offset",
863+
"metadata",
849864
NULL };
850865

851-
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iL", kws,
852-
&topic, &partition, &offset))
866+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iLs", kws,
867+
&topic, &partition, &offset,
868+
&metadata)) {
853869
return -1;
870+
}
854871

855872
TopicPartition_setup((TopicPartition *)self,
856-
topic, partition, offset, 0);
857-
873+
topic, partition, offset, metadata, 0);
858874
return 0;
859875
}
860876

@@ -889,6 +905,9 @@ static PyMemberDef TopicPartition_members[] = {
889905
" :py:const:`OFFSET_STORED`,"
890906
" :py:const:`OFFSET_INVALID`\n"
891907
},
908+
{"metadata", T_STRING, offsetof(TopicPartition, metadata), READONLY,
909+
"attribute metadata: Optional application metadata committed with the "
910+
"offset (string)"},
892911
{ "error", T_OBJECT, offsetof(TopicPartition, error), READONLY,
893912
":attribute error: Indicates an error (with :py:class:`KafkaError`) unless None." },
894913
{ NULL }
@@ -1038,14 +1057,15 @@ PyTypeObject TopicPartitionType = {
10381057
* @brief Internal factory to create a TopicPartition object.
10391058
*/
10401059
static PyObject *TopicPartition_new0 (const char *topic, int partition,
1041-
long long offset,
1060+
long long offset, const char *metadata,
10421061
rd_kafka_resp_err_t err) {
10431062
TopicPartition *self;
10441063

10451064
self = (TopicPartition *)TopicPartitionType.tp_new(
10461065
&TopicPartitionType, NULL, NULL);
10471066

1048-
TopicPartition_setup(self, topic, partition, offset, err);
1067+
TopicPartition_setup(self, topic, partition,
1068+
offset, metadata, err);
10491069

10501070
return (PyObject *)self;
10511071
}
@@ -1069,7 +1089,9 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {
10691089
PyList_SET_ITEM(parts, i,
10701090
TopicPartition_new0(
10711091
rktpar->topic, rktpar->partition,
1072-
rktpar->offset, rktpar->err));
1092+
rktpar->offset,
1093+
rktpar->metadata,
1094+
rktpar->err));
10731095
}
10741096

10751097
return parts;
@@ -1094,6 +1116,7 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
10941116
c_parts = rd_kafka_topic_partition_list_new((int)PyList_Size(plist));
10951117

10961118
for (i = 0 ; i < (size_t)PyList_Size(plist) ; i++) {
1119+
rd_kafka_topic_partition_t *rktpar;
10971120
TopicPartition *tp = (TopicPartition *)
10981121
PyList_GetItem(plist, i);
10991122

@@ -1106,10 +1129,17 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
11061129
return NULL;
11071130
}
11081131

1109-
rd_kafka_topic_partition_list_add(c_parts,
1110-
tp->topic,
1111-
tp->partition)->offset =
1112-
tp->offset;
1132+
rktpar = rd_kafka_topic_partition_list_add(c_parts,
1133+
tp->topic,
1134+
tp->partition);
1135+
rktpar->offset = tp->offset;
1136+
if (tp->metadata != NULL) {
1137+
rktpar->metadata_size = strlen(tp->metadata) + 1;
1138+
rktpar->metadata = strdup(tp->metadata);
1139+
} else {
1140+
rktpar->metadata_size = 0;
1141+
rktpar->metadata = NULL;
1142+
}
11131143
}
11141144

11151145
return c_parts;

src/confluent_kafka/src/confluent_kafka.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,7 @@ typedef struct {
352352
char *topic;
353353
int partition;
354354
int64_t offset;
355+
char *metadata;
355356
PyObject *error;
356357
} TopicPartition;
357358

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2022 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
from confluent_kafka import TopicPartition
19+
20+
21+
def commit_and_check(consumer, topic, metadata):
    """Synchronously commit offset 1 for partition 0 of ``topic``, attaching
    ``metadata`` (unless it is None), then verify the committed metadata
    reads back unchanged via ``committed()``."""
    if metadata is None:
        args = (topic, 0, 1)
    else:
        args = (topic, 0, 1, metadata)
    consumer.commit(offsets=[TopicPartition(*args)], asynchronous=False)

    committed = consumer.committed([TopicPartition(topic, 0)], timeout=100)
    assert len(committed) == 1
    assert committed[0].metadata == metadata
30+
31+
32+
def test_consumer_topicpartition_metadata(kafka_cluster):
    """Exercise commit()/committed() round-tripping of TopicPartition
    metadata for a range of values, including invalid input."""
    topic = kafka_cluster.create_topic("test_topicpartition")

    c = kafka_cluster.consumer({'group.id': 'pytest'})

    # No metadata, plain ASCII, non-ASCII Unicode, and the empty string
    # must all commit successfully and read back unchanged.
    for metadata in (None, 'hello world', 'नमस्ते दुनिया', ''):
        commit_and_check(c, topic, metadata)

    # A NUL byte embedded in the metadata is invalid and must be rejected
    # by the C argument parser with a ValueError.
    try:
        commit_and_check(c, topic, 'xyz\x00abc')
        # Unreachable: the commit above must raise.
        assert False
    except ValueError as ve:
        assert 'embedded null character' in str(ve)

    c.close()

0 commit comments

Comments
 (0)