Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[server] Fix Offset Lag Short Circuit #1472

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public void onBecomeStandbyFromOffline(Message message, NotificationContext cont
*/
if (isRegularStoreCurrentVersion) {
notifier.startConsumption(resourceName, getPartition());
getIngestionBackend().getStoreIngestionService().recordLatchCreation(resourceName, getPartition());
}
try {
long startTimeForSettingUpNewStorePartitionInNs = System.nanoTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,6 @@ CountDownLatch getIngestionCompleteFlag(String resourceName, int partitionId) {
return stateModelToIngestionCompleteFlagMap.get(getStateModelID(resourceName, partitionId));
}

void removeIngestionCompleteFlag(String resourceName, int partitionId) {
stateModelToIngestionCompleteFlagMap.remove(getStateModelID(resourceName, partitionId));
}

@Override
public void completed(String resourceName, int partitionId, long offset, String message) {
CountDownLatch ingestionCompleteFlag = getIngestionCompleteFlag(resourceName, partitionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,10 @@ public void recordIngestionFailure(String storeName) {
hostLevelIngestionStats.getStoreStats(storeName).recordIngestionFailure();
}

public void recordLatchCreation(String topicName, int partition) {
getStoreIngestionTask(topicName).recordLatchCreation(partition);
}

@Override
public AggVersionedIngestionStats getAggVersionedIngestionStats() {
return versionedIngestionStats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1823,11 +1823,17 @@ public boolean isReadyToServeAnnouncedWithRTLag() {
return false;
}

@Override
void recordLatchCreation(int partition) {
PartitionConsumptionState pcs = partitionConsumptionStateMap.get(partition);
pcs.recordLatchCreation();
}

@Override
protected void reportIfCatchUpVersionTopicOffset(PartitionConsumptionState pcs) {
int partition = pcs.getPartition();

if (pcs.isEndOfPushReceived() && !pcs.isLatchReleased()) {
if (pcs.isEndOfPushReceived() && pcs.isLatchCreated() && !pcs.isLatchReleased()) {
long lag = measureLagWithCallToPubSub(
localKafkaServer,
versionTopic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ public class PartitionConsumptionState {

private CompletableFuture<Void> lastVTProduceCallFuture;

/**
* State machine that can only transition to CREATED if NONE, and transition to RELEASED if CREATED.
*/
enum LatchStatus {
NONE, LATCH_CREATED, LATCH_RELEASED
}

/**
* Only used in L/F model. Check if the partition has released the latch.
* In L/F ingestion task, Optionally, the state model holds a latch that
Expand All @@ -74,7 +81,7 @@ public class PartitionConsumptionState {
* See {@link LeaderFollowerPartitionStateModel} for the
* details why we need latch for certain resources.
*/
private boolean isLatchReleased = false;
private LatchStatus latchStatus = LatchStatus.NONE;

/**
* This future is completed in drainer thread after persisting the associated record and offset to DB.
Expand Down Expand Up @@ -333,12 +340,24 @@ public void unsubscribe() {
this.isSubscribed = false;
}

public boolean isLatchCreated() {
return latchStatus == LatchStatus.LATCH_CREATED || latchStatus == LatchStatus.LATCH_RELEASED;
}

public void recordLatchCreation() {
if (this.latchStatus == LatchStatus.NONE) {
this.latchStatus = LatchStatus.LATCH_CREATED;
}
}

public boolean isLatchReleased() {
return isLatchReleased;
return latchStatus == LatchStatus.LATCH_RELEASED;
}

public void releaseLatch() {
this.isLatchReleased = true;
if (this.latchStatus == LatchStatus.LATCH_CREATED) {
this.latchStatus = LatchStatus.LATCH_RELEASED;
}
}

public void errorReported() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4645,6 +4645,8 @@ Lazy<CountDownLatch> getGracefulShutdownLatch() {
return gracefulShutdownLatch;
}

abstract void recordLatchCreation(int partition);

// For unit test purpose.
void setVersionRole(PartitionReplicaIngestionContext.VersionRole versionRole) {
this.versionRole = versionRole;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public abstract class AbstractVenicePartitionStateModelTest<MODEL_TYPE extends A
protected String systemStoreName;
protected int version = 1;
protected String resourceName;
protected String systemSoreResourceName;
protected String systemStoreResourceName;
protected String instanceName;

protected AggVersionedIngestionStats mockAggVersionedIngestionStats;
Expand All @@ -57,7 +57,7 @@ public void setUp() {
this.systemStoreName =
VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(Utils.getUniqueString("stateModelTestStore"));
this.resourceName = Version.composeKafkaTopic(storeName, version);
this.systemSoreResourceName = Version.composeKafkaTopic(systemStoreName, version);
this.systemStoreResourceName = Version.composeKafkaTopic(systemStoreName, version);
this.instanceName = "testInstance";

mockStoreIngestionService = Mockito.mock(KafkaStoreIngestionService.class);
Expand All @@ -83,11 +83,12 @@ public void setUp() {
mockHelixManager = Mockito.mock(HelixManager.class);

Mockito.when(mockMessage.getResourceName()).thenReturn(resourceName);
Mockito.when(mockSystemStoreMessage.getResourceName()).thenReturn(systemSoreResourceName);
Mockito.when(mockSystemStoreMessage.getResourceName()).thenReturn(systemStoreResourceName);
Mockito.when(mockReadOnlyStoreRepository.getStoreOrThrow(Version.parseStoreFromKafkaTopicName(resourceName)))
.thenReturn(mockStore);
Mockito
.when(mockReadOnlyStoreRepository.getStoreOrThrow(Version.parseStoreFromKafkaTopicName(systemSoreResourceName)))
.when(
mockReadOnlyStoreRepository.getStoreOrThrow(Version.parseStoreFromKafkaTopicName(systemStoreResourceName)))
.thenReturn(mockSystemStore);
Mockito.when(mockStore.getBootstrapToOnlineTimeoutInHours()).thenReturn(Store.BOOTSTRAP_TO_ONLINE_TIMEOUT_IN_HOURS);
Mockito.when(mockSystemStore.getBootstrapToOnlineTimeoutInHours())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -59,14 +60,15 @@ protected LeaderFollowerIngestionProgressNotifier getNotifier() {
public void testOnBecomeFollowerFromOffline() throws Exception {
// if the resource is not the current serving version, latch is not placed.
Version version = new VersionImpl("mockStore.getName()", 2, "");
when(mockStore.getVersion(Mockito.anyInt())).thenReturn(version);
when(mockStore.getVersion(anyInt())).thenReturn(version);
when(mockStore.getCurrentVersion()).thenReturn(2);
testStateModel.onBecomeStandbyFromOffline(mockMessage, mockContext);
verify(mockNotifier, never()).waitConsumptionCompleted(
mockMessage.getResourceName(),
testPartition,
Store.BOOTSTRAP_TO_ONLINE_TIMEOUT_IN_HOURS,
mockStoreIngestionService);
verify(mockStoreIngestionService, never()).recordLatchCreation(anyString(), anyInt());

when(mockSystemStore.getCurrentVersion()).thenReturn(2);
testStateModel.onBecomeStandbyFromOffline(mockSystemStoreMessage, mockContext);
Expand All @@ -75,6 +77,7 @@ public void testOnBecomeFollowerFromOffline() throws Exception {
testPartition,
Store.BOOTSTRAP_TO_ONLINE_TIMEOUT_IN_HOURS,
mockStoreIngestionService);
verify(mockStoreIngestionService, never()).recordLatchCreation(anyString(), anyInt());

// When serving current version system store, it should have latch in place.
when(mockSystemStore.getCurrentVersion()).thenReturn(1);
Expand All @@ -84,6 +87,7 @@ public void testOnBecomeFollowerFromOffline() throws Exception {
testPartition,
Store.BOOTSTRAP_TO_ONLINE_TIMEOUT_IN_HOURS,
mockStoreIngestionService);
verify(mockStoreIngestionService, times(1)).recordLatchCreation(anyString(), anyInt());

when(mockStore.getCurrentVersion()).thenReturn(1);
testStateModel.onBecomeStandbyFromOffline(mockMessage, mockContext);
Expand All @@ -93,6 +97,7 @@ public void testOnBecomeFollowerFromOffline() throws Exception {
testPartition,
Store.BOOTSTRAP_TO_ONLINE_TIMEOUT_IN_HOURS,
mockStoreIngestionService);
verify(mockStoreIngestionService, times(2)).recordLatchCreation(anyString(), anyInt());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3438,7 +3438,6 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT
endOffset =
storeIngestionTaskUnderTest.getTopicPartitionEndOffSet(localKafkaConsumerService.kafkaUrl, pubSubTopic, 0);
assertEquals(endOffset, 0L);

}

@DataProvider
Expand Down Expand Up @@ -5438,6 +5437,47 @@ public void testSnapshotGenerationConditions(boolean isBlobTransferEnabled, bool
}
}

/**
* Test that {@link LeaderFollowerStoreIngestionTask#reportIfCatchUpVersionTopicOffset(PartitionConsumptionState)}
* only executes if the latch was created and not released. Previously, it would not check if the latch was created.
* Latch creation is at the start of ingestion {@link LeaderFollowerPartitionStateModel#onBecomeStandbyFromOffline}
* only if the version is current, but {@link LeaderFollowerPartitionStateModel} is not tested in this unit test.
*/
@Test
public void testReportIfCatchUpVersionTopicOffset() throws Exception {
// Push a key-value pair to kick start the SIT and populate the PCS data structure
localVeniceWriter.broadcastStartOfPush(new HashMap<>());
localVeniceWriter.put(putKeyFoo, putValue, EXISTING_SCHEMA_ID, PUT_KEY_FOO_TIMESTAMP, null).get();
localVeniceWriter.broadcastEndOfPush(new HashMap<>());

runTest(Collections.singleton(PARTITION_FOO), () -> {
// Wait for a real PCS to be populated after topic subscription in processCommonConsumerAction()
verify(mockStoreIngestionStats, timeout(TEST_TIMEOUT_MS).times(1)).recordTotalRecordsConsumed();

// Intentionally use a mock PCS with a different partition to avoid the SIT test interfering with the test
PartitionConsumptionState pcs = mock(PartitionConsumptionState.class);
final int P = PARTITION_BAR;
when(pcs.getPartition()).thenReturn(P);

// Case 1: Latch was not created or released, so reportIfCatchUpVersionTopicOffset() shouldn't do anything
when(pcs.isEndOfPushReceived()).thenReturn(true);
when(pcs.isLatchCreated()).thenReturn(false);
when(pcs.isLatchReleased()).thenReturn(false);
storeIngestionTaskUnderTest.reportIfCatchUpVersionTopicOffset(pcs);
verify(storeIngestionTaskUnderTest, never()).measureLagWithCallToPubSub(anyString(), any(), eq(P), anyLong());

// Case 2: Latch was created, so reportIfCatchUpVersionTopicOffset() should execute
when(pcs.isLatchCreated()).thenReturn(true);
storeIngestionTaskUnderTest.reportIfCatchUpVersionTopicOffset(pcs);
verify(storeIngestionTaskUnderTest, times(1)).measureLagWithCallToPubSub(anyString(), any(), eq(P), anyLong());

// Case 3: Latch was created and released, so reportIfCatchUpVersionTopicOffset() shouldn't do anything
when(pcs.isLatchReleased()).thenReturn(true);
storeIngestionTaskUnderTest.reportIfCatchUpVersionTopicOffset(pcs);
verify(storeIngestionTaskUnderTest, times(1)).measureLagWithCallToPubSub(anyString(), any(), eq(P), anyLong());
}, AA_OFF);
}

private VeniceStoreVersionConfig getDefaultMockVeniceStoreVersionConfig(
Consumer<VeniceStoreVersionConfig> storeVersionConfigOverride) {
// mock the store config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,9 @@ <K, T> void updateCacheAsync(
completableFutureSupplier.get().whenComplete((value, throwable) -> {
if (throwable != null) {
cache.remove(key);

T cachedContents = (cachedValue != null) ? cachedValue.getValue() : null;
LOGGER.warn("Failed to update cachedValue for key: {} cachedValue: {}", key, cachedContents, throwable);
return;
}
putLatestValueInCache(key, value, cache);
Expand Down
Loading