Skip to content
This repository was archived by the owner on Jul 15, 2023. It is now read-only.

Commit 9fc7b5c

Browse files
committed
treat committed offset as next fetch, not last
this brings the behavior of the balanced consumer in line with the java implementation
1 parent d4c2dda commit 9fc7b5c

File tree

2 files changed

+6
-8
lines changed

2 files changed

+6
-8
lines changed

src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void CommitOffsets()
142142
var topicDirs = new ZKGroupTopicDirs(this.config.GroupId, topic.Key);
143143
foreach (KeyValuePair<int, PartitionTopicInfo> partition in topic.Value)
144144
{
145-
var newOffset = partition.Value.ConsumeOffset;
145+
var newOffset = partition.Value.ConsumeOffset + 1;
146146
try
147147
{
148148
if (partition.Value.ConsumeOffsetValid)

src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -501,26 +501,24 @@ private void AddPartitionTopicInfo(ZKGroupTopicDirs topicDirs, string partition,
501501

502502
//if first time starting a consumer, set the initial offset based on the config
503503
long offset = -1;
504-
long offsetCommited = -1;
505504
if (offsetCommitedString != null)
506505
{
507-
offsetCommited = long.Parse(offsetCommitedString);
508-
offset = offsetCommited + 1;
506+
offset = long.Parse(offsetCommitedString);
509507
}
510-
Logger.InfoFormat("Final offset {0} for topic {1} partition {2} OffsetCommited {3}"
511-
, offset, topic, partition, offsetCommited);
508+
Logger.InfoFormat("Final offset {0} for topic {1} partition {2}."
509+
, offset, topic, partition);
512510

513511
var queue = this.queues[new Tuple<string, string>(topic, consumerThreadId)];
514512
var partTopicInfo = new PartitionTopicInfo(
515513
topic,
516514
leader,
517515
partitionId,
518516
queue,
519-
offsetCommited,
517+
offset,
520518
offset,
521519
offset,
522520
this.config.FetchSize,
523-
offsetCommited);
521+
offset);
524522
partTopicInfoMap[partitionId] = partTopicInfo;
525523
Logger.InfoFormat("{0} selected new offset {1}", partTopicInfo, offset);
526524
}

0 commit comments

Comments
 (0)