Skip to content

Commit

Permalink
[HUDI-7450] Fix offset computation bug when allocedEvents > actualNum…
Browse files Browse the repository at this point in the history
…Events (apache#10768)
  • Loading branch information
vinishjail97 authored Feb 27, 2024
1 parent 810d65d commit 6b071a2
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,12 @@ public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOf
if (toOffset == range.untilOffset()) {
exhaustedPartitions.add(range.partition());
}
allocedEvents += toOffset - range.fromOffset();
// We need recompute toOffset if allocedEvents larger than actualNumEvents.
if (allocedEvents > actualNumEvents) {
if (allocedEvents + (toOffset - range.fromOffset()) > actualNumEvents) {
long offsetsToAdd = Math.min(eventsPerPartition, (actualNumEvents - allocedEvents));
toOffset = Math.min(range.untilOffset(), toOffset + offsetsToAdd);
toOffset = Math.min(range.untilOffset(), range.fromOffset() + offsetsToAdd);
}
allocedEvents = allocedEvents + (toOffset - range.fromOffset());
OffsetRange thisRange = OffsetRange.create(range.topicPartition(), range.fromOffset(), toOffset);
finalRanges.add(thisRange);
ranges[i] = OffsetRange.create(range.topicPartition(), range.fromOffset() + thisRange.count(), range.untilOffset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,29 @@ public void testSplitAndMergeRanges() {
assertEquals(300, mergedRanges[0].untilOffset());
}

@Test
public void testNumAllocatedEventsGreaterThanNumActualEvents() {
int[] partitions = new int[] {0, 1, 2, 3, 4};
long[] committedOffsets =
new long[] {76888767, 76725043, 76899767, 76833267, 76952055};
long[] latestOffsets =
new long[] {77005407, 76768151, 76985456, 76917973, 77080447};
OffsetRange[] ranges =
KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(
makeOffsetMap(partitions, committedOffsets),
makeOffsetMap(partitions, latestOffsets),
400000,
20);

long totalNewMsgs = KafkaOffsetGen.CheckpointUtils.totalNewMessages(ranges);
assertEquals(400000, totalNewMsgs);
for (OffsetRange range : ranges) {
if (range.fromOffset() > range.untilOffset()) {
throw new IllegalArgumentException("Invalid offset range " + range);
}
}
}

private static Map<TopicPartition, Long> makeOffsetMap(int[] partitions, long[] offsets) {
Map<TopicPartition, Long> map = new HashMap<>();
for (int i = 0; i < partitions.length; i++) {
Expand Down

0 comments on commit 6b071a2

Please sign in to comment.