Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -1349,7 +1349,7 @@ void rd_kafka_toppar_next_offset_handle(rd_kafka_toppar_t *rktp,
if (rktp->rktp_query_pos.offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
int64_t orig_offset = next_pos.offset;
int64_t tail_cnt = llabs(rktp->rktp_query_pos.offset -
RD_KAFKA_OFFSET_TAIL_BASE);
RD_KAFKA_OFFSET_TAIL_BASE);

if (tail_cnt > next_pos.offset)
next_pos.offset = 0;
Expand Down Expand Up @@ -3150,6 +3150,46 @@ int rd_kafka_topic_partition_list_cmp(const void *_a,
return 0;
}

typedef RD_MAP_TYPE(const rd_kafka_topic_partition_t *,
void *) map_toppar_void_t;

/**
* @brief Compare two topic partition lists using a hashmap for efficiency.

*/
int rd_kafka_topic_partition_list_cmp_with_hash(
const void *_a,
const void *_b,
int (*cmp)(const void *, const void *),
unsigned int (*hash)(const void *)) {
const rd_kafka_topic_partition_list_t *a = _a, *b = _b;
int r;
int i;

r = a->cnt - b->cnt;
if (r || a->cnt == 0) {
return r;
}
map_toppar_void_t hashmap =
RD_MAP_INITIALIZER(a->cnt, cmp, hash, NULL, NULL);


for (i = 0; i < a->cnt; i++) {
rd_kafka_topic_partition_t *tp =
rd_kafka_topic_partition_copy(&a->elems[i]);
RD_MAP_SET(&hashmap, tp, (void *)1);
}

for (i = 0; i < b->cnt; i++) {
if (!RD_MAP_GET(&hashmap, &b->elems[i])) {
RD_MAP_DESTROY(&hashmap);
return 1;
}
}

RD_MAP_DESTROY(&hashmap);
return 0;
}

/**
* @brief Ensures the \p rktpar has a toppar set in _private.
Expand Down Expand Up @@ -4647,9 +4687,6 @@ const char *rd_kafka_fetch_pos2str(const rd_kafka_fetch_pos_t fetchpos) {
return ret[idx];
}

typedef RD_MAP_TYPE(const rd_kafka_topic_partition_t *,
void *) map_toppar_void_t;

/**
* @brief Calculates \p a ∩ \p b using \p cmp and \p hash .
* Ordered following \p a order. Elements are copied from \p a.
Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,12 @@ int rd_kafka_topic_partition_list_cmp(const void *_a,
const void *_b,
int (*cmp)(const void *, const void *));

int rd_kafka_topic_partition_list_cmp_with_hash(
const void *_a,
const void *_b,
int (*cmp)(const void *, const void *),
unsigned int (*hash)(const void *));

/**
* Creates a new empty topic partition private.
*
Expand Down
6 changes: 3 additions & 3 deletions src/rdkafka_sticky_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1564,8 +1564,9 @@ static rd_bool_t areSubscriptionsIdentical(
}

RD_MAP_FOREACH(ignore, pcurr, consumer2AllPotentialPartitions) {
if (pprev && rd_kafka_topic_partition_list_cmp(
pcurr, pprev, rd_kafka_topic_partition_cmp))
if (pprev && rd_kafka_topic_partition_list_cmp_with_hash(
pcurr, pprev, rd_kafka_topic_partition_cmp,
rd_kafka_topic_partition_hash))
return rd_false;
pprev = pcurr;
}
Expand Down Expand Up @@ -1922,7 +1923,6 @@ rd_kafka_sticky_assignor_assign_cb(rd_kafka_t *rk,
rk, &currentAssignment, &prevAssignment, isFreshAssignment,
&partition2AllPotentialConsumers, &consumer2AllPotentialPartitions);


/* All partitions that need to be assigned (initially set to all
* partitions but adjusted in the following loop) */
unassignedPartitions =
Expand Down