Skip to content

Commit 7231f62

Browse files
authored
Fix for race condition in partition state clean/dirty tracking (#666)
* Fix for race condition in partition state clean/dirty tracking
1 parent b279800 commit 7231f62

File tree

4 files changed

+53
-9
lines changed

4 files changed

+53
-9
lines changed

CHANGELOG.adoc

+2
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ endif::[]
1616
== 0.5.2.8
1717

1818
=== Fixes
19+
1920
* fix: Fix equality and hash code for ShardKey with array key (#638), resolves (#579)
2021
* fix: Fix target loading computation for inflight records (#662)
2122
* fix: Fix synchronisation logic for transactional producer commit affecting non-transactional usage (#665), resolves (#637)
23+
* fix: Fix for race condition in partition state clean/dirty tracking (#666), resolves (#664)
2224

2325
== 0.5.2.7
2426

README.adoc

+2
Original file line numberDiff line numberDiff line change
@@ -1517,9 +1517,11 @@ endif::[]
15171517
== 0.5.2.8
15181518

15191519
=== Fixes
1520+
15201521
* fix: Fix equality and hash code for ShardKey with array key (#638), resolves (#579)
15211522
* fix: Fix target loading computation for inflight records (#662)
15221523
* fix: Fix synchronisation logic for transactional producer commit affecting non-transactional usage (#665), resolves (#637)
1524+
* fix: Fix for race condition in partition state clean/dirty tracking (#666), resolves (#664)
15231525

15241526
== 0.5.2.7
15251527

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java

+22-4
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,17 @@ public class PartitionState<K, V> {
172172
private DistributionSummary ratioMetadataSpaceUsedDistributionSummary;
173173
private final PCMetrics pcMetrics;
174174

175+
/**
176+
* Additional flag to prevent overwriting dirty state that was updated during commit execution window - so that any
177+
* subsequent offsets completed while commit is being performed could mark state as dirty and retain the dirty state
178+
* on commit completion. In tight race condition - it may be set just before offset is completed and included in
179+
* commit data collection - so it is a little bit pessimistic - that may cause an additional unnecessary commit on
180+
* next commit cycle - but it is highly unlikely as throughput has to be high for this to occur - but with high
181+
* throughput there will be other offsets ready to commit anyway.
182+
*/
183+
private boolean stateChangedSinceCommitStart = false;
184+
185+
175186
public PartitionState(long newEpoch,
176187
PCModule<K, V> pcModule,
177188
TopicPartition topicPartition,
@@ -209,10 +220,13 @@ public void onOffsetCommitSuccess(OffsetAndMetadata committed) { //NOSONAR
209220
}
210221

211222
private void setClean() {
212-
setDirty(false);
223+
if (!stateChangedSinceCommitStart) {
224+
setDirty(false);
225+
}
213226
}
214227

215228
private void setDirty() {
229+
stateChangedSinceCommitStart = true;
216230
setDirty(true);
217231
}
218232

@@ -382,9 +396,13 @@ public boolean isRemoved() {
382396
}
383397

384398
public Optional<OffsetAndMetadata> getCommitDataIfDirty() {
385-
return isDirty() ?
386-
of(createOffsetAndMetadata()) :
387-
empty();
399+
if (isDirty()) {
400+
// setting the flag so that any subsequent offset completed while commit is being performed could mark state as dirty
401+
// and retain the dirty state on commit completion.
402+
stateChangedSinceCommitStart = false;
403+
return of(createOffsetAndMetadata());
404+
}
405+
return empty();
388406
}
389407

390408
// visible for testing

parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java

+27-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.confluent.parallelconsumer.state;
22

33
/*-
4-
* Copyright (C) 2020-2022 Confluent, Inc.
4+
* Copyright (C) 2020-2023 Confluent, Inc.
55
*/
66

77
import com.google.common.truth.Truth;
@@ -16,10 +16,7 @@
1616
import pl.tlinkowski.unij.api.UniLists;
1717
import pl.tlinkowski.unij.api.UniSets;
1818

19-
import java.util.List;
20-
import java.util.Optional;
21-
import java.util.Set;
22-
import java.util.TreeSet;
19+
import java.util.*;
2320
import java.util.concurrent.ConcurrentSkipListMap;
2421
import java.util.stream.Collectors;
2522

@@ -177,5 +174,30 @@ void bootstrapPollOffsetHigherDueToRetentionOrCompaction() {
177174
assertThat(state).getAllIncompleteOffsets().containsExactlyElementsIn(expectedTruncatedIncompletes);
178175
}
179176

177+
@Test
178+
void workCompletedDuringAsyncCommitShouldKeepStateAsDirty(){
179+
final long completedOffset = 1L;
180+
final long incompleteOffset = 2L;
181+
182+
final HighestOffsetAndIncompletes offsetData = new HighestOffsetAndIncompletes(Optional.of(incompleteOffset),
183+
new TreeSet<>(Arrays.asList(completedOffset, incompleteOffset)));
184+
PartitionState<String, String> state = new PartitionState<>(0, mu.getModule(), tp, offsetData);
185+
state.onSuccess(completedOffset);
186+
187+
// fetch committable/completed offset
188+
OffsetAndMetadata offsetAndMetadata = state.getCommitDataIfDirty().get();
189+
190+
assertThat(offsetAndMetadata).getOffset().isEqualTo(completedOffset+1);
191+
192+
// mark incomplete work as complete
193+
state.onSuccess(incompleteOffset);
194+
assertThat(state).isDirty();
195+
196+
//mark fetched offset as committed
197+
state.onOffsetCommitSuccess(offsetAndMetadata);
198+
199+
// partition should stay dirty, since the newly completed work could be committed now.
200+
assertThat(state).isDirty();
201+
}
180202

181203
}

0 commit comments

Comments
 (0)