Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Add deferred version swap service to parent controller #1421

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -2380,5 +2380,6 @@ private ConfigKeys() {
"server.delete.unassigned.partitions.on.startup";
public static final String CONTROLLER_ENABLE_HYBRID_STORE_PARTITION_COUNT_UPDATE =
"controller.enable.hybrid.store.partition.count.update";
public static final String CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS = "controller.deferred.version.swap.sleep.ms";
public static final String PUSH_JOB_VIEW_CONFIGS = "push.job.view.configs";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V3_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.getTempDataDirectory;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP;

import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


/**
 * End-to-end test for a targeted region push with deferred version swap: after the push completes, the new
 * version is expected to become current in the target region first and in every region afterwards.
 */
public class TestDeferredVersionSwap {
  private static final int NUMBER_OF_CHILD_DATACENTERS = 2;
  private static final int NUMBER_OF_CLUSTERS = 1;
  private static final String TARGET_REGION = "dc-0";
  private static final String[] CLUSTER_NAMES =
      IntStream.range(0, NUMBER_OF_CLUSTERS).mapToObj(i -> "venice-cluster" + i).toArray(String[]::new);

  private VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper;

  /**
   * Starts the multi-region wrapper. The same controller properties (deferred version swap sleep set to
   * 30000 ms) are handed to both controller-property arguments of the factory.
   */
  @BeforeClass
  public void setUp() {
    Properties serverProperties = new Properties();
    Properties controllerProps = new Properties();
    controllerProps.put(CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS, 30000);

    Optional<Properties> firstControllerProps = Optional.of(controllerProps);
    Optional<Properties> secondControllerProps = Optional.of(controllerProps);
    Optional<Properties> serverProps = Optional.of(serverProperties);

    multiRegionMultiClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
        NUMBER_OF_CHILD_DATACENTERS,
        NUMBER_OF_CLUSTERS,
        1,
        1,
        1,
        1,
        1,
        firstControllerProps,
        secondControllerProps,
        serverProps);
  }

  /** Tears the wrapper down; runs even when set-up or the test failed. */
  @AfterClass(alwaysRun = true)
  public void cleanUp() {
    Utils.closeQuietlyWithErrorLogged(multiRegionMultiClusterWrapper);
  }

  /**
   * Runs a push with {@code TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP} enabled against a store configured with
   * {@code TARGET_REGION} as its swap region and a swap wait time of 1, then checks the current version per region.
   */
  @Test
  public void testDeferredVersionSwap() throws IOException {
    // Write the input data for the push
    File inputDir = getTempDataDirectory();
    TestWriteUtils.writeSimpleAvroFileWithStringToV3Schema(inputDir, 100, 100);

    // Setup job properties
    String storeName = Utils.getUniqueString("testDeferredVersionSwap");
    String inputDirPath = "file://" + inputDir.getAbsolutePath();
    Properties props =
        IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, inputDirPath, storeName);
    String keySchemaStr = "\"string\"";

    UpdateStoreQueryParams storeParams = new UpdateStoreQueryParams().setUnusedSchemaDeletionEnabled(true);
    storeParams.setTargetRegionSwapWaitTime(1);
    storeParams.setTargetRegionSwap(TARGET_REGION);

    String parentControllerURLs = multiRegionMultiClusterWrapper.getControllerConnectString();
    String clusterName = CLUSTER_NAMES[0];

    try (ControllerClient parentControllerClient = new ControllerClient(clusterName, parentControllerURLs)) {
      // The client returned by store creation is not needed afterwards, so it is closed right away
      createStoreForJob(clusterName, keySchemaStr, NAME_RECORD_V3_SCHEMA.toString(), props, storeParams).close();

      // Start push job with target region push enabled
      props.put(TARGETED_REGION_PUSH_WITH_DEFERRED_SWAP, true);
      TestWriteUtils.runPushJob("Test push job", props);
      String versionTopic = Version.composeKafkaTopic(storeName, 1);
      TestUtils.waitForNonDeterministicPushCompletion(versionTopic, parentControllerClient, 30, TimeUnit.SECONDS);

      // Version should only be swapped in the target region
      TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, () -> {
        Map<String, Integer> coloVersions = fetchColoToCurrentVersions(parentControllerClient, storeName);
        for (Map.Entry<String, Integer> coloVersion: coloVersions.entrySet()) {
          int expectedVersion = coloVersion.getKey().equals(TARGET_REGION) ? 1 : 0;
          Assert.assertEquals((int) coloVersion.getValue(), expectedVersion);
        }
      });

      // Version should be swapped in all regions
      TestUtils.waitForNonDeterministicAssertion(2, TimeUnit.MINUTES, () -> {
        Map<String, Integer> coloVersions = fetchColoToCurrentVersions(parentControllerClient, storeName);
        for (Integer currentVersion: coloVersions.values()) {
          Assert.assertEquals((int) currentVersion, 1);
        }
      });
    }
  }

  /** Asks the given controller client for the store and returns its region-to-current-version map. */
  private static Map<String, Integer> fetchColoToCurrentVersions(ControllerClient controllerClient, String storeName) {
    return controllerClient.getStore(storeName).getStore().getColoToCurrentVersions();
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package com.linkedin.venice.controller;

import com.linkedin.venice.controller.stats.DeferredVersionSwapStats;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.utils.RegionUtils;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


/**
* This service is in charge of swapping to a new version after a specified wait time in the remaining regions of a target region push if enabled.
* The wait time is specified through a store/version level config (target_swap_region_wait_time) and the default wait time is 60m.
*/
public class DeferredVersionSwapService extends AbstractVeniceService {
private final AtomicBoolean stop = new AtomicBoolean(false);
private final Set<String> allClusters;
private final VeniceControllerMultiClusterConfig veniceControllerMultiClusterConfig;
private final VeniceParentHelixAdmin veniceParentHelixAdmin;
private final ScheduledExecutorService deferredVersionSwapExecutor = Executors.newSingleThreadScheduledExecutor();
private final DeferredVersionSwapStats deferredVersionSwapStats;
private static final Logger LOGGER = LogManager.getLogger(DeferredVersionSwapService.class);

public DeferredVersionSwapService(
VeniceParentHelixAdmin admin,
VeniceControllerMultiClusterConfig multiClusterConfig,
DeferredVersionSwapStats deferredVersionSwapStats) {
this.veniceParentHelixAdmin = admin;
this.allClusters = multiClusterConfig.getClusters();
this.veniceControllerMultiClusterConfig = multiClusterConfig;
this.deferredVersionSwapStats = deferredVersionSwapStats;
}

@Override
public boolean startInner() throws Exception {
deferredVersionSwapExecutor.scheduleAtFixedRate(
new DeferredVersionSwapTask(),
0,
veniceControllerMultiClusterConfig.getDeferredVersionSwapSleepMs(),
TimeUnit.MILLISECONDS);
return true;
}

@Override
public void stopInner() throws Exception {
stop.set(true);
deferredVersionSwapExecutor.shutdown();
}

private Set<String> getRegionsForVersionSwap(Map<String, Integer> candidateRegions, Set<String> targetRegions) {
Set<String> remainingRegions = new HashSet<>(candidateRegions.keySet());
remainingRegions.removeAll(targetRegions);
return remainingRegions;
}

private String getTargetRegion(Set<String> targetRegions) {
return targetRegions.iterator().next();
}

private StoreResponse getStoreForRegion(String clusterName, Set<String> targetRegions, String storeName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it return a Map of store responses?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It only returns one store response because we only need to get the store for one target region to check the dvc heartbeat status

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in that case can you change the function signature to take in only a region string instead of set of strings

Map<String, ControllerClient> controllerClientMap =
veniceParentHelixAdmin.getVeniceHelixAdmin().getControllerClientMap(clusterName);
ControllerClient targetRegionControllerClient = controllerClientMap.get(getTargetRegion(targetRegions));
return targetRegionControllerClient.getStore(storeName);
}

private class DeferredVersionSwapTask implements Runnable {
@Override
public void run() {
while (!stop.get()) {
try {
for (String cluster: allClusters) {
if (!veniceParentHelixAdmin.isLeaderControllerFor(cluster)) {
continue;
}

List<Store> stores = veniceParentHelixAdmin.getAllStores(cluster);
for (Store store: stores) {
if (StringUtils.isEmpty(store.getTargetSwapRegion())) {
continue;
}

int targetVersionNum = store.getLargestUsedVersionNumber();
Version targetVersion = store.getVersion(targetVersionNum);
if (targetVersion == null) {
continue;
}

if (targetVersion.getStatus() != VersionStatus.STARTED) {
continue;
}

Map<String, Integer> coloToVersions =
veniceParentHelixAdmin.getCurrentVersionsForMultiColos(cluster, store.getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should get the lastest version status instead of current version status

Copy link
Contributor Author

@misyel misyel Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the code again, it's ok for us to keep this as is. We only use the value from this to infer the remaining regions to perform the version swap in (getRegionsForVersionSwap). We use getOffLinePushStatus to verify whether the push is completed using the ExecutionStatus before checking if the wait time has elapsed. There might be a slight delay with the version being marked ONLINE w/ this method, but it should only be a few secs after the push is completed and the default wait time is one hour

Set<String> targetRegions = RegionUtils.parseRegionsFilterList(store.getTargetSwapRegion());
Set<String> remainingRegions = getRegionsForVersionSwap(coloToVersions, targetRegions);

StoreResponse targetRegionStoreResponse = getStoreForRegion(cluster, targetRegions, store.getName());
if (!StringUtils.isEmpty(targetRegionStoreResponse.getError())) {
LOGGER.info("Got error when fetching targetRegionStore: {}", targetRegionStoreResponse.getError());
continue;
}

StoreInfo targetRegionStore = targetRegionStoreResponse.getStore();
Optional<Version> version = targetRegionStore.getVersion(targetVersionNum);
if (!version.isPresent()) {
LOGGER.info(
"Unable to find version {} for store: {} in regions: {}",
targetVersionNum,
store.getName(),
store.getTargetSwapRegion());
continue;
}

// Do not perform version swap for davinci stores
// TODO remove this check once DVC delayed ingestion is completed
if (version.get().getIsDavinciHeartbeatReported()) {
LOGGER.info(
"Skipping version swap for store: {} on version: {} as it is davinci",
store.getName(),
targetVersionNum);
continue;
}

// Check that push is completed in target regions
String kafkaTopicName = Version.composeKafkaTopic(store.getName(), targetVersionNum);
Admin.OfflinePushStatusInfo pushStatusInfo =
veniceParentHelixAdmin.getOffLinePushStatus(cluster, kafkaTopicName);
boolean didPushCompleteInTargetRegions = true;
for (String targetRegion: targetRegions) {
String executionStatus = pushStatusInfo.getExtraInfo().get(targetRegion);
if (!executionStatus.equals(ExecutionStatus.COMPLETED.toString())) {
didPushCompleteInTargetRegions = false;
LOGGER.info(
"Skipping version swap for store: {} on version: {} as push is not complete in region {}",
store.getName(),
targetVersionNum,
targetRegion);
}
}

if (!didPushCompleteInTargetRegions) {
continue;
}

// Check that waitTime has elapsed in target regions
boolean didWaitTimeElapseInTargetRegions = false;
for (String targetRegion: targetRegions) {
long completionTime = pushStatusInfo.getExtraInfoUpdateTimestamp().get(targetRegion);
long storeWaitTime = TimeUnit.MINUTES.toSeconds(store.getTargetSwapRegionWaitTime());
long currentTime = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
if ((completionTime + storeWaitTime) <= currentTime) {
didWaitTimeElapseInTargetRegions = true;
}
}

if (!didWaitTimeElapseInTargetRegions) {
LOGGER.info(
"Skipping version swap for store: {} on version: {} as wait time: {} has not passed",
store.getName(),
targetVersionNum,
store.getTargetSwapRegionWaitTime());
continue;
}

// TODO add call for postStoreVersionSwap() once it is implemented

String remainingRegionsString = String.join(",\\s*", remainingRegions);
LOGGER.info(
"Issuing roll forward message for store: {} in regions: {}",
store.getName(),
remainingRegionsString);
veniceParentHelixAdmin.rollForwardToFutureVersion(cluster, store.getName(), remainingRegionsString);
}
}
} catch (Exception e) {
LOGGER.warn("Caught exception: {} while performing deferred version swap", e.toString());
deferredVersionSwapStats.recordDeferredVersionSwapErrorSensor();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.linkedin.venice.controller.server.AdminSparkServer;
import com.linkedin.venice.controller.server.VeniceControllerGrpcServiceImpl;
import com.linkedin.venice.controller.server.VeniceControllerRequestHandler;
import com.linkedin.venice.controller.stats.DeferredVersionSwapStats;
import com.linkedin.venice.controller.stats.TopicCleanupServiceStats;
import com.linkedin.venice.controller.supersetschema.SupersetSchemaGenerator;
import com.linkedin.venice.controller.systemstore.SystemStoreRepairService;
Expand Down Expand Up @@ -72,6 +73,8 @@ public class VeniceController {
private final Optional<StoreGraveyardCleanupService> storeGraveyardCleanupService;
private final Optional<SystemStoreRepairService> systemStoreRepairService;

private Optional<DeferredVersionSwapService> deferredVersionSwapService;

private VeniceControllerRequestHandler secureRequestHandler;
private VeniceControllerRequestHandler unsecureRequestHandler;
private ThreadPoolExecutor grpcExecutor = null;
Expand Down Expand Up @@ -162,6 +165,7 @@ public VeniceController(VeniceControllerContext ctx) {
this.unusedValueSchemaCleanupService = createUnusedValueSchemaCleanupService();
this.storeGraveyardCleanupService = createStoreGraveyardCleanupService();
this.systemStoreRepairService = createSystemStoreRepairService();
this.deferredVersionSwapService = createDeferredVersionSwapService();
if (multiClusterConfigs.isGrpcServerEnabled()) {
initializeGrpcServer();
}
Expand Down Expand Up @@ -275,6 +279,18 @@ private Optional<UnusedValueSchemaCleanupService> createUnusedValueSchemaCleanup
return Optional.empty();
}

/**
 * Builds the deferred version swap service when the controller configs describe a parent controller.
 *
 * @return the service wrapped in an {@link Optional}, or an empty {@link Optional} for a non-parent controller
 */
private Optional<DeferredVersionSwapService> createDeferredVersionSwapService() {
  if (!multiClusterConfigs.isParent()) {
    return Optional.empty();
  }

  // The admin is cast to its parent implementation, which is the type the service works against
  VeniceParentHelixAdmin parentAdmin = (VeniceParentHelixAdmin) controllerService.getVeniceHelixAdmin();
  DeferredVersionSwapStats swapStats = new DeferredVersionSwapStats(metricsRepository);
  DeferredVersionSwapService swapService =
      new DeferredVersionSwapService(parentAdmin, multiClusterConfigs, swapStats);
  return Optional.of(swapService);
}

// package-private for testing
private void initializeGrpcServer() {
LOGGER.info("Initializing gRPC server as it is enabled for the controller...");
Expand Down Expand Up @@ -353,6 +369,7 @@ public void start() {
unusedValueSchemaCleanupService.ifPresent(AbstractVeniceService::start);
systemStoreRepairService.ifPresent(AbstractVeniceService::start);
disabledPartitionEnablerService.ifPresent(AbstractVeniceService::start);
deferredVersionSwapService.ifPresent(AbstractVeniceService::start);
// register with service discovery at the end
asyncRetryingServiceDiscoveryAnnouncer.register();
if (adminGrpcServer != null) {
Expand Down Expand Up @@ -417,6 +434,7 @@ public void stop() {
unusedValueSchemaCleanupService.ifPresent(Utils::closeQuietlyWithErrorLogged);
storeBackupVersionCleanupService.ifPresent(Utils::closeQuietlyWithErrorLogged);
disabledPartitionEnablerService.ifPresent(Utils::closeQuietlyWithErrorLogged);
deferredVersionSwapService.ifPresent(Utils::closeQuietlyWithErrorLogged);
if (adminGrpcServer != null) {
adminGrpcServer.stop();
}
Expand Down
Loading
Loading