Skip to content

Commit

Permalink
Validate isMigrating flag during store deletion that happens as part …
Browse files Browse the repository at this point in the history
…of abortMigration
  • Loading branch information
Ran Wang committed Jan 24, 2025
1 parent 49dc236 commit 908d262
Show file tree
Hide file tree
Showing 14 changed files with 309 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2081,9 +2081,10 @@ public static void abortMigration(
throw new VeniceException("Source cluster and destination cluster cannot be the same!");
}
boolean terminate = false;

ControllerClient srcControllerClient = new ControllerClient(srcClusterName, veniceUrl, sslFactory);
ControllerClient destControllerClient = new ControllerClient(destClusterName, veniceUrl, sslFactory);
ControllerClient srcControllerClient =
ControllerClient.constructClusterControllerClient(srcClusterName, veniceUrl, sslFactory);
ControllerClient destControllerClient =
ControllerClient.constructClusterControllerClient(destClusterName, veniceUrl, sslFactory);
checkPreconditionForStoreMigration(srcControllerClient, destControllerClient);

// Check arguments
Expand Down Expand Up @@ -2116,9 +2117,10 @@ public static void abortMigration(
// Reset original store, storeConfig, and cluster discovery
if (promptsOverride.length > 1) {
terminate = !promptsOverride[1];

} else {
terminate = !userGivesPermission(
"Next step is to reset store migration flag, storeConfig and cluster"
"Next step is to reset store migration flag, storeConfig and cluster "
+ "discovery mapping. Do you want to proceed?");
}
if (terminate) {
Expand Down Expand Up @@ -2157,7 +2159,7 @@ public static void abortMigration(
"Deleting cloned store " + storeName + " in " + destControllerClient.getLeaderControllerUrl() + " ...");
destControllerClient
.updateStore(storeName, new UpdateStoreQueryParams().setEnableReads(false).setEnableWrites(false));
TrackableControllerResponse deleteResponse = destControllerClient.deleteStore(storeName);
TrackableControllerResponse deleteResponse = destControllerClient.deleteStore(storeName, true);
printObject(deleteResponse);
if (deleteResponse.isError()) {
System.err.println("ERROR: failed to delete store " + storeName + " in the dest cluster " + destClusterName);
Expand All @@ -2169,7 +2171,7 @@ public static void abortMigration(
}
}

private static boolean userGivesPermission(String prompt) {
static boolean userGivesPermission(String prompt) {
Console console = System.console();
String response = console.readLine(prompt + " (y/n): ").toLowerCase();
while (!response.equals("y") && !response.equals("n")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

import static com.linkedin.venice.Arg.SERVER_KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.linkedin.venice.admin.protocol.response.AdminResponseRecord;
Expand All @@ -12,9 +15,12 @@
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse;
import com.linkedin.venice.controllerapi.MultiReplicaResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.StoreMigrationResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.TrackableControllerResponse;
import com.linkedin.venice.controllerapi.UpdateClusterConfigQueryParams;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceException;
Expand All @@ -40,6 +46,8 @@
import java.util.function.Consumer;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.ParseException;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -149,6 +157,88 @@ public void testIsClonedStoreOnline() {
Assert.assertFalse(AdminTool.isClonedStoreOnline(srcControllerClient, destControllerClient, storeName));
}

@Test
public void testAbortMigration() {
String storeName = "testAbortMigrationStore";
String srcCluster = "testCluster1";
String dstCluster = "testCluster2";

StoreResponse storeResponse = new StoreResponse();
StoreInfo srcStoreInfo = createStore(storeName, true);
srcStoreInfo.setStoreMetaSystemStoreEnabled(true);
storeResponse.setStore(srcStoreInfo);

try (MockedStatic<ControllerClient> controllerClientMockedStatic = Mockito.mockStatic(ControllerClient.class)) {
try (MockedStatic<AdminTool> adminToolMockedStatic =
Mockito.mockStatic(AdminTool.class, Mockito.CALLS_REAL_METHODS)) {
ControllerClient srcControllerClient = mock(ControllerClient.class);
ControllerClient destControllerClient = mock(ControllerClient.class);
Mockito.when(srcControllerClient.getStore(storeName)).thenReturn(storeResponse);

StoreMigrationResponse storeMigrationResponse = new StoreMigrationResponse();
storeMigrationResponse.isStoreMigrationAllowed();
Mockito.when(srcControllerClient.isStoreMigrationAllowed()).thenReturn(storeMigrationResponse);
Mockito.when(destControllerClient.isStoreMigrationAllowed()).thenReturn(storeMigrationResponse);

D2ServiceDiscoveryResponse discoveryResponse = new D2ServiceDiscoveryResponse();
discoveryResponse.setCluster(srcCluster);
Mockito.when(srcControllerClient.discoverCluster(storeName)).thenReturn(discoveryResponse);

StoreMigrationResponse abortMigrationResponse = new StoreMigrationResponse();
abortMigrationResponse.setSrcClusterName(srcCluster);
abortMigrationResponse.setCluster(srcCluster);
abortMigrationResponse.setName(storeName);

Mockito.when(srcControllerClient.abortMigration(storeName, dstCluster)).thenReturn(abortMigrationResponse);
Mockito.when(destControllerClient.getStore(storeName)).thenReturn(storeResponse);
Mockito.when(destControllerClient.deleteStore(storeName, true)).thenReturn(new TrackableControllerResponse());

controllerClientMockedStatic
.when(() -> ControllerClient.constructClusterControllerClient(eq(srcCluster), any(), any())) // make two
// clients
.thenReturn(srcControllerClient);
controllerClientMockedStatic
.when(() -> ControllerClient.constructClusterControllerClient(eq(dstCluster), any(), any()))
.thenReturn(destControllerClient);

adminToolMockedStatic.when(() -> AdminTool.userGivesPermission("Do you still want to proceed"))
.thenReturn(false);

AdminTool.abortMigration("http://localhost:7036", storeName, srcCluster, dstCluster, false, new boolean[0]);
Mockito.verify(srcControllerClient, times(0)).discoverCluster(storeName);
Mockito.verify(srcControllerClient, times(0)).abortMigration(storeName, dstCluster); // verify dest client is
// called with true flag
// and store name as
// deletestore method
Mockito.verify(destControllerClient, times(0)).deleteStore(storeName, true); // verify dest client is called
// with true flag and store name as
// deletestore method

srcStoreInfo.setMigrating(true);
storeResponse.setStore(srcStoreInfo);
Mockito.when(srcControllerClient.getStore(storeName)).thenReturn(storeResponse);
Mockito.when(destControllerClient.getStore(storeName)).thenReturn(storeResponse);

String promptAbortMigration = "Next step is to reset store migration flag, storeConfig and cluster "
+ "discovery mapping. Do you want to proceed?";
adminToolMockedStatic.when(() -> AdminTool.userGivesPermission(promptAbortMigration)).thenReturn(true);
String promptDeleteStore = "Next step is to delete the cloned store in dest cluster testCluster2. "
+ "testAbortMigrationStore in testCluster2 will be deleted irreversibly. "
+ "Please verify there is no reads/writes to the cloned store. " + "Do you want to proceed?";
adminToolMockedStatic.when(() -> AdminTool.userGivesPermission(promptDeleteStore)).thenReturn(true);

AdminTool.abortMigration("http://localhost:7036", storeName, srcCluster, dstCluster, false, new boolean[0]);
Mockito.verify(srcControllerClient, times(1)).abortMigration(storeName, dstCluster); // verify dest client is
// called with true flag
// and store name as
// deletestore method
Mockito.verify(destControllerClient, times(1)).deleteStore(storeName, true); // verify dest client is called
// with true flag and store name as
// deletestore method
}
}
}

private StoreInfo createStore(String storeName, boolean hasOnlineVersion) {
StoreInfo storeInfo = new StoreInfo();
if (hasOnlineVersion) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mock-maker-inline
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class ControllerApiConstants {
public static final String SOURCE_GRID_FABRIC = "source_grid_fabric";
public static final String BATCH_JOB_HEARTBEAT_ENABLED = "batch_job_heartbeat_enabled";

public static final String IS_ABORT_MIGRATION_CLEANUP = "is_abort_migration_cleanup";
public static final String NAME = "store_name";
public static final String STORE_PARTITION = "store_partition";
public static final String STORE_VERSION = "store_version";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.HEARTBEAT_TIMESTAMP;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.INCLUDE_SYSTEM_STORES;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.INCREMENTAL_PUSH_VERSION;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.IS_ABORT_MIGRATION_CLEANUP;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.IS_SYSTEM_STORE;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.IS_WRITE_COMPUTE_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.KAFKA_TOPIC_LOG_COMPACTION_ENABLED;
Expand Down Expand Up @@ -601,7 +602,11 @@ public StoreMigrationResponse abortMigration(String storeName, String destCluste
}

public TrackableControllerResponse deleteStore(String storeName) {
QueryParams params = newParams().add(NAME, storeName);
return deleteStore(storeName, false);
}

public TrackableControllerResponse deleteStore(String storeName, boolean isAbortMigratingStore) {
QueryParams params = newParams().add(NAME, storeName).add(IS_ABORT_MIGRATION_CLEANUP, isAbortMigratingStore);
return request(ControllerRoute.DELETE_STORE, params, TrackableControllerResponse.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.TestUtils;
Expand Down Expand Up @@ -234,6 +235,43 @@ public void testExternalViewDataChangeDeadLock() throws InterruptedException {
}
}

@Test(timeOut = TOTAL_TIMEOUT_FOR_LONG_TEST_MS)
public void testAbortMigrationStoreDeletion() {
String storeName = Utils.getUniqueString("test_abort_migration_cleanup_store");
try {
veniceAdmin.createStore(clusterName, storeName, storeOwner, KEY_SCHEMA, VALUE_SCHEMA);
veniceAdmin.updateStore(
clusterName,
storeName,
new UpdateStoreQueryParams().setStoreMigration(false).setEnableReads(false).setEnableWrites(false));

PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName));
veniceAdmin.getTopicManager().createTopic(rtTopic, 1, 1, true);

Assert.assertTrue(veniceAdmin.getTopicManager().containsTopic(rtTopic));
boolean abort = true;
veniceAdmin.deleteStore(clusterName, storeName, abort, Store.IGNORE_VERSION, false);
Assert.assertTrue(veniceAdmin.getTopicManager().containsTopic(rtTopic));
Assert.assertNotNull(veniceAdmin.getStore(clusterName, storeName));

String newStoreName = Utils.getUniqueString("test_cleanup_store");
veniceAdmin.createStore(clusterName, newStoreName, storeOwner, KEY_SCHEMA, VALUE_SCHEMA);
veniceAdmin.updateStore(
clusterName,
newStoreName,
new UpdateStoreQueryParams().setStoreMigration(false).setEnableReads(false).setEnableWrites(false));
PubSubTopic newRtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(newStoreName));
veniceAdmin.getTopicManager().createTopic(newRtTopic, 1, 1, true);
abort = false;
Assert.assertTrue(veniceAdmin.getTopicManager().containsTopic(newRtTopic));
veniceAdmin.deleteStore(clusterName, newStoreName, abort, Store.IGNORE_VERSION, false);
Assert.assertNull(veniceAdmin.getStore(clusterName, newStoreName));

} finally {
veniceAdmin.deleteStore(clusterName, storeName, false, Store.IGNORE_VERSION, false);
}
}

@Test
public void testIdempotentStoreDeletion() {
String storeName = Utils.getUniqueString("test_delete_store");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,28 @@ public void controllerClientCanDeleteStore() {
}
}

@Test(timeOut = TEST_TIMEOUT)
public void controllerClientCanNotDeleteStore() {
String storeName = Utils.getUniqueString("test-store-not-delete");
assertCommand(parentControllerClient.createNewStore(storeName, "owner", "\"string\"", "\"string\""));
VersionCreationResponse versionCreationResponse =
parentControllerClient.emptyPush(storeName, Utils.getUniqueString(storeName), 1024);
Assert.assertFalse(versionCreationResponse.isError(), versionCreationResponse.getError());
try {
parentControllerClient.enableStoreReads(storeName, false);
parentControllerClient.enableStoreWrites(storeName, false);

TrackableControllerResponse response = parentControllerClient.deleteStore(storeName, true);
Assert.assertTrue(response.isError(), response.getError());
Assert.assertEquals(response.getErrorType(), ErrorType.INVALID_CONFIG);
StoreResponse storeResponse = parentControllerClient.getStore(storeName);
Assert.assertFalse(storeResponse.isError(), storeResponse.getError());
Assert.assertEquals(storeName, storeResponse.getStore().getName());
} finally {
deleteStore(storeName);
}
}

@Test(timeOut = TEST_TIMEOUT)
public void controllerClientCanGetExecutionOfDeleteStore() {
String clusterName = venice.getClusterNames()[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,12 @@ void createStore(
* Delete the entire store including both metadata and real user's data. Before deleting a store, we should disable
* the store manually to ensure there is no reading/writing request hitting this tore.
*/
void deleteStore(String clusterName, String storeName, int largestUsedVersionNumber, boolean waitOnRTTopicDeletion);
void deleteStore(
String clusterName,
String storeName,
boolean isAbortMigrationCleanup,
int largestUsedVersionNumber,
boolean waitOnRTTopicDeletion);

/**
* This method behaves differently in {@link VeniceHelixAdmin} and {@link VeniceParentHelixAdmin}.
Expand Down
Loading

0 comments on commit 908d262

Please sign in to comment.