From 908d262d5a14e65a8f7e8be14be8cf74f9d89148 Mon Sep 17 00:00:00 2001 From: Ran Wang Date: Tue, 14 Jan 2025 12:30:52 -0800 Subject: [PATCH] Validate isMigrating flag during store deletion that happens as part of abortMigration --- .../java/com/linkedin/venice/AdminTool.java | 14 +-- .../com/linkedin/venice/TestAdminTool.java | 90 +++++++++++++++++++ .../org.mockito.plugins.MockMaker | 1 + .../controllerapi/ControllerApiConstants.java | 1 + .../controllerapi/ControllerClient.java | 7 +- ...niceHelixAdminWithIsolatedEnvironment.java | 38 ++++++++ .../server/TestAdminSparkServer.java | 22 +++++ .../com/linkedin/venice/controller/Admin.java | 7 +- .../venice/controller/VeniceHelixAdmin.java | 40 +++++++-- .../controller/VeniceParentHelixAdmin.java | 20 +++++ .../controller/server/StoresRoutes.java | 47 +++++----- .../TestVeniceParentHelixAdmin.java | 21 ++++- .../TestVeniceParentHelixAdminWithAcl.java | 2 +- .../controller/server/StoreRoutesTest.java | 37 ++++++++ 14 files changed, 309 insertions(+), 38 deletions(-) create mode 100644 clients/venice-admin-tool/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java index 30f5a88cf54..fd1aa8ca962 100644 --- a/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java +++ b/clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java @@ -2081,9 +2081,10 @@ public static void abortMigration( throw new VeniceException("Source cluster and destination cluster cannot be the same!"); } boolean terminate = false; - - ControllerClient srcControllerClient = new ControllerClient(srcClusterName, veniceUrl, sslFactory); - ControllerClient destControllerClient = new ControllerClient(destClusterName, veniceUrl, sslFactory); + ControllerClient srcControllerClient = + ControllerClient.constructClusterControllerClient(srcClusterName, veniceUrl, sslFactory); + ControllerClient destControllerClient = + ControllerClient.constructClusterControllerClient(destClusterName, veniceUrl, sslFactory); checkPreconditionForStoreMigration(srcControllerClient, destControllerClient); // Check arguments @@ -2116,9 +2117,10 @@ public static void abortMigration( // Reset original store, storeConfig, and cluster discovery if (promptsOverride.length > 1) { terminate = !promptsOverride[1]; + } else { terminate = !userGivesPermission( - "Next step is to reset store migration flag, storeConfig and cluster" + "Next step is to reset store migration flag, storeConfig and cluster " + "discovery mapping. Do you want to proceed?"); } if (terminate) { @@ -2157,7 +2159,7 @@ public static void abortMigration( "Deleting cloned store " + storeName + " in " + destControllerClient.getLeaderControllerUrl() + " ..."); destControllerClient .updateStore(storeName, new UpdateStoreQueryParams().setEnableReads(false).setEnableWrites(false)); - TrackableControllerResponse deleteResponse = destControllerClient.deleteStore(storeName); + TrackableControllerResponse deleteResponse = destControllerClient.deleteStore(storeName, true); printObject(deleteResponse); if (deleteResponse.isError()) { System.err.println("ERROR: failed to delete store " + storeName + " in the dest cluster " + destClusterName); @@ -2169,7 +2171,7 @@ public static void abortMigration( } } - private static boolean userGivesPermission(String prompt) { + static boolean userGivesPermission(String prompt) { Console console = System.console(); String response = console.readLine(prompt + " (y/n): ").toLowerCase(); while (!response.equals("y") && !response.equals("n")) { diff --git a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java index fef519f4cb1..7509b2a0af9 100644 --- a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java +++ b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminTool.java @@ -2,8 +2,11 @@ import static com.linkedin.venice.Arg.SERVER_KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import com.fasterxml.jackson.core.JsonProcessingException; import com.linkedin.venice.admin.protocol.response.AdminResponseRecord; @@ -12,9 +15,12 @@ import com.linkedin.venice.client.store.transport.TransportClientResponse; import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.controllerapi.ControllerClient; +import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse; import com.linkedin.venice.controllerapi.MultiReplicaResponse; import com.linkedin.venice.controllerapi.SchemaResponse; +import com.linkedin.venice.controllerapi.StoreMigrationResponse; import com.linkedin.venice.controllerapi.StoreResponse; +import com.linkedin.venice.controllerapi.TrackableControllerResponse; import com.linkedin.venice.controllerapi.UpdateClusterConfigQueryParams; import com.linkedin.venice.controllerapi.UpdateStoreQueryParams; import com.linkedin.venice.exceptions.VeniceException; @@ -40,6 +46,8 @@ import java.util.function.Consumer; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.ParseException; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -149,6 +157,88 @@ public void testIsClonedStoreOnline() { Assert.assertFalse(AdminTool.isClonedStoreOnline(srcControllerClient, destControllerClient, storeName)); } + @Test + public void testAbortMigration() { + String storeName = "testAbortMigrationStore"; + String srcCluster = "testCluster1"; + String dstCluster = "testCluster2"; + + StoreResponse storeResponse = new StoreResponse(); + StoreInfo srcStoreInfo = createStore(storeName, true); + srcStoreInfo.setStoreMetaSystemStoreEnabled(true); + storeResponse.setStore(srcStoreInfo); + + try (MockedStatic controllerClientMockedStatic = Mockito.mockStatic(ControllerClient.class)) { + try (MockedStatic adminToolMockedStatic = + Mockito.mockStatic(AdminTool.class, Mockito.CALLS_REAL_METHODS)) { + ControllerClient srcControllerClient = mock(ControllerClient.class); + ControllerClient destControllerClient = mock(ControllerClient.class); + Mockito.when(srcControllerClient.getStore(storeName)).thenReturn(storeResponse); + + StoreMigrationResponse storeMigrationResponse = new StoreMigrationResponse(); + storeMigrationResponse.isStoreMigrationAllowed(); + Mockito.when(srcControllerClient.isStoreMigrationAllowed()).thenReturn(storeMigrationResponse); + Mockito.when(destControllerClient.isStoreMigrationAllowed()).thenReturn(storeMigrationResponse); + + D2ServiceDiscoveryResponse discoveryResponse = new D2ServiceDiscoveryResponse(); + discoveryResponse.setCluster(srcCluster); + Mockito.when(srcControllerClient.discoverCluster(storeName)).thenReturn(discoveryResponse); + + StoreMigrationResponse abortMigrationResponse = new StoreMigrationResponse(); + abortMigrationResponse.setSrcClusterName(srcCluster); + abortMigrationResponse.setCluster(srcCluster); + abortMigrationResponse.setName(storeName); + + Mockito.when(srcControllerClient.abortMigration(storeName, dstCluster)).thenReturn(abortMigrationResponse); + Mockito.when(destControllerClient.getStore(storeName)).thenReturn(storeResponse); + Mockito.when(destControllerClient.deleteStore(storeName, true)).thenReturn(new TrackableControllerResponse()); + + controllerClientMockedStatic + .when(() -> ControllerClient.constructClusterControllerClient(eq(srcCluster), any(), any())) // make two + // clients + .thenReturn(srcControllerClient); + controllerClientMockedStatic + .when(() -> ControllerClient.constructClusterControllerClient(eq(dstCluster), any(), any())) + .thenReturn(destControllerClient); + + adminToolMockedStatic.when(() -> AdminTool.userGivesPermission("Do you still want to proceed")) + .thenReturn(false); + + AdminTool.abortMigration("http://localhost:7036", storeName, srcCluster, dstCluster, false, new boolean[0]); + Mockito.verify(srcControllerClient, times(0)).discoverCluster(storeName); + Mockito.verify(srcControllerClient, times(0)).abortMigration(storeName, dstCluster); // verify dest client is + // called with true flag + // and store name as + // deletestore method + Mockito.verify(destControllerClient, times(0)).deleteStore(storeName, true); // verify dest client is called + // with true flag and store name as + // deletestore method + + srcStoreInfo.setMigrating(true); + storeResponse.setStore(srcStoreInfo); + Mockito.when(srcControllerClient.getStore(storeName)).thenReturn(storeResponse); + Mockito.when(destControllerClient.getStore(storeName)).thenReturn(storeResponse); + + String promptAbortMigration = "Next step is to reset store migration flag, storeConfig and cluster " + + "discovery mapping. Do you want to proceed?"; + adminToolMockedStatic.when(() -> AdminTool.userGivesPermission(promptAbortMigration)).thenReturn(true); + String promptDeleteStore = "Next step is to delete the cloned store in dest cluster testCluster2. " + + "testAbortMigrationStore in testCluster2 will be deleted irreversibly. " + + "Please verify there is no reads/writes to the cloned store. " + "Do you want to proceed?"; + adminToolMockedStatic.when(() -> AdminTool.userGivesPermission(promptDeleteStore)).thenReturn(true); + + AdminTool.abortMigration("http://localhost:7036", storeName, srcCluster, dstCluster, false, new boolean[0]); + Mockito.verify(srcControllerClient, times(1)).abortMigration(storeName, dstCluster); // verify dest client is + // called with true flag + // and store name as + // deletestore method + Mockito.verify(destControllerClient, times(1)).deleteStore(storeName, true); // verify dest client is called + // with true flag and store name as + // deletestore method + } + } + } + private StoreInfo createStore(String storeName, boolean hasOnlineVersion) { StoreInfo storeInfo = new StoreInfo(); if (hasOnlineVersion) { diff --git a/clients/venice-admin-tool/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/clients/venice-admin-tool/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 00000000000..ca6ee9cea8e --- /dev/null +++ b/clients/venice-admin-tool/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java index a755a73d9ed..f3a4f515534 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java @@ -7,6 +7,7 @@ public class ControllerApiConstants { public static final String SOURCE_GRID_FABRIC = "source_grid_fabric"; public static final String BATCH_JOB_HEARTBEAT_ENABLED = "batch_job_heartbeat_enabled"; + public static final String IS_ABORT_MIGRATION_CLEANUP = "is_abort_migration_cleanup"; public static final String NAME = "store_name"; public static final String STORE_PARTITION = "store_partition"; public static final String STORE_VERSION = "store_version"; diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java index 61ecaba454a..87308e8c9f9 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerClient.java @@ -20,6 +20,7 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.HEARTBEAT_TIMESTAMP; import static com.linkedin.venice.controllerapi.ControllerApiConstants.INCLUDE_SYSTEM_STORES; import static com.linkedin.venice.controllerapi.ControllerApiConstants.INCREMENTAL_PUSH_VERSION; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.IS_ABORT_MIGRATION_CLEANUP; import static com.linkedin.venice.controllerapi.ControllerApiConstants.IS_SYSTEM_STORE; import static com.linkedin.venice.controllerapi.ControllerApiConstants.IS_WRITE_COMPUTE_ENABLED; import static com.linkedin.venice.controllerapi.ControllerApiConstants.KAFKA_TOPIC_LOG_COMPACTION_ENABLED; @@ -601,7 +602,11 @@ public StoreMigrationResponse abortMigration(String storeName, String destCluste } public TrackableControllerResponse deleteStore(String storeName) { - QueryParams params = newParams().add(NAME, storeName); + return deleteStore(storeName, false); + } + + public TrackableControllerResponse deleteStore(String storeName, boolean isAbortMigratingStore) { + QueryParams params = newParams().add(NAME, storeName).add(IS_ABORT_MIGRATION_CLEANUP, isAbortMigratingStore); return request(ControllerRoute.DELETE_STORE, params, TrackableControllerResponse.class); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithIsolatedEnvironment.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithIsolatedEnvironment.java index 63cbfe22e50..2c7e8065734 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithIsolatedEnvironment.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithIsolatedEnvironment.java @@ -12,6 +12,7 @@ import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionImpl; +import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.utils.PropertyBuilder; import com.linkedin.venice.utils.TestUtils; @@ -234,6 +235,43 @@ public void testExternalViewDataChangeDeadLock() throws InterruptedException { } } + @Test(timeOut = TOTAL_TIMEOUT_FOR_LONG_TEST_MS) + public void testAbortMigrationStoreDeletion() { + String storeName = Utils.getUniqueString("test_abort_migration_cleanup_store"); + try { + veniceAdmin.createStore(clusterName, storeName, storeOwner, KEY_SCHEMA, VALUE_SCHEMA); + veniceAdmin.updateStore( + clusterName, + storeName, + new UpdateStoreQueryParams().setStoreMigration(false).setEnableReads(false).setEnableWrites(false)); + + PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); + veniceAdmin.getTopicManager().createTopic(rtTopic, 1, 1, true); + + Assert.assertTrue(veniceAdmin.getTopicManager().containsTopic(rtTopic)); + boolean abort = true; + veniceAdmin.deleteStore(clusterName, storeName, abort, Store.IGNORE_VERSION, false); + Assert.assertTrue(veniceAdmin.getTopicManager().containsTopic(rtTopic)); + Assert.assertNotNull(veniceAdmin.getStore(clusterName, storeName)); + + String newStoreName = Utils.getUniqueString("test_cleanup_store"); + veniceAdmin.createStore(clusterName, newStoreName, storeOwner, KEY_SCHEMA, VALUE_SCHEMA); + veniceAdmin.updateStore( + clusterName, + newStoreName, + new UpdateStoreQueryParams().setStoreMigration(false).setEnableReads(false).setEnableWrites(false)); + PubSubTopic newRtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(newStoreName)); + veniceAdmin.getTopicManager().createTopic(newRtTopic, 1, 1, true); + abort = false; + Assert.assertTrue(veniceAdmin.getTopicManager().containsTopic(newRtTopic)); + veniceAdmin.deleteStore(clusterName, newStoreName, abort, Store.IGNORE_VERSION, false); + Assert.assertNull(veniceAdmin.getStore(clusterName, newStoreName)); + + } finally { + veniceAdmin.deleteStore(clusterName, storeName, false, Store.IGNORE_VERSION, false); + } + } + @Test public void testIdempotentStoreDeletion() { String storeName = Utils.getUniqueString("test_delete_store"); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/server/TestAdminSparkServer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/server/TestAdminSparkServer.java index 9256626b392..c72b380558a 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/server/TestAdminSparkServer.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/controller/server/TestAdminSparkServer.java @@ -896,6 +896,28 @@ public void controllerClientCanDeleteStore() { } } + @Test(timeOut = TEST_TIMEOUT) + public void controllerClientCanNotDeleteStore() { + String storeName = Utils.getUniqueString("test-store-not-delete"); + assertCommand(parentControllerClient.createNewStore(storeName, "owner", "\"string\"", "\"string\"")); + VersionCreationResponse versionCreationResponse = + parentControllerClient.emptyPush(storeName, Utils.getUniqueString(storeName), 1024); + Assert.assertFalse(versionCreationResponse.isError(), versionCreationResponse.getError()); + try { + parentControllerClient.enableStoreReads(storeName, false); + parentControllerClient.enableStoreWrites(storeName, false); + + TrackableControllerResponse response = parentControllerClient.deleteStore(storeName, true); + Assert.assertTrue(response.isError(), response.getError()); + Assert.assertEquals(response.getErrorType(), ErrorType.INVALID_CONFIG); + StoreResponse storeResponse = parentControllerClient.getStore(storeName); + Assert.assertFalse(storeResponse.isError(), storeResponse.getError()); + Assert.assertEquals(storeName, storeResponse.getStore().getName()); + } finally { + deleteStore(storeName); + } + } + @Test(timeOut = TEST_TIMEOUT) public void controllerClientCanGetExecutionOfDeleteStore() { String clusterName = venice.getClusterNames()[0]; diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java index 2358c09762b..9c2c78b5d76 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java @@ -180,7 +180,12 @@ void createStore( * Delete the entire store including both metadata and real user's data. Before deleting a store, we should disable * the store manually to ensure there is no reading/writing request hitting this tore. */ - void deleteStore(String clusterName, String storeName, int largestUsedVersionNumber, boolean waitOnRTTopicDeletion); + void deleteStore( + String clusterName, + String storeName, + boolean isAbortMigrationCleanup, + int largestUsedVersionNumber, + boolean waitOnRTTopicDeletion); /** * This method behaves differently in {@link VeniceHelixAdmin} and {@link VeniceParentHelixAdmin}. diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 66329256d61..0d46582b7c5 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -1042,9 +1042,24 @@ private void configureNewStore(Store newStore, VeniceControllerClusterConfig con public void deleteStore( String clusterName, String storeName, + boolean isAbortMigrationCleanup, int largestUsedVersionNumber, boolean waitOnRTTopicDeletion) { - deleteStore(clusterName, storeName, largestUsedVersionNumber, waitOnRTTopicDeletion, false); + deleteStore( + clusterName, + storeName, + largestUsedVersionNumber, + waitOnRTTopicDeletion, + false, + isAbortMigrationCleanup); + } + + public void deleteStore( + String clusterName, + String storeName, + int largestUsedVersionNumber, + boolean waitOnRTTopicDeletion) { + deleteStore(clusterName, storeName, largestUsedVersionNumber, waitOnRTTopicDeletion, false, false); } private void deleteStore( @@ -1052,7 +1067,8 @@ private void deleteStore( String storeName, int largestUsedVersionNumber, boolean waitOnRTTopicDeletion, - boolean isForcedDelete) { + boolean isForcedDelete, + boolean isAbortMigrationCleanup) { checkControllerLeadershipFor(clusterName); LOGGER.info("Start deleting store: {} in cluster {}", storeName, clusterName); HelixVeniceClusterResources resources = getHelixVeniceClusterResources(clusterName); @@ -1067,8 +1083,16 @@ private void deleteStore( storeRepository.updateStore(store); } catch (VeniceNoStoreException e) { // It's possible for a store to partially exist due to partial delete/creation failures. - LOGGER - .warn("Store object is missing for store: " + storeName + " will proceed with the rest of store deletion"); + LOGGER.warn("Store object is missing for store: {} will proceed with the rest of store deletion", storeName); + } + if (isAbortMigrationCleanup && store != null && !store.isMigrating()) { + LOGGER.warn( + "Deletion of store: {} in cluster: {} was issued as part of abort migration resource cleanup, " + + "but the store's migrating flag is false. Please ensure the store's migrating flag is set to true in the " + + "destination cluster before issuing the delete command to prevent accidental deletion of shared resources.", + storeName, + clusterName); + return; } if (storeConfig != null) { setStoreConfigDeletingFlag(storeConfig, clusterName, storeName, store); @@ -1084,7 +1108,7 @@ private void deleteStore( // Truncate all the version topics, this is a prerequisite to delete the RT topic truncateOldTopics(clusterName, store, true); - if (!store.isMigrating()) { + if (!store.isMigrating() && !isAbortMigrationCleanup) { // for RT topic block on deletion so that next create store does not see the lingering RT topic which could // have different partition count PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store)); @@ -1152,7 +1176,7 @@ private void setStoreConfigDeletingFlag(StoreConfig storeConfig, String clusterN clusterName, currentlyDiscoveredClusterName); } else if (store != null && store.isMigrating()) { - // Cluster discovery is correct but store migration flag has not been set. + // Cluster discovery is correct but store migration flag has been set. // This is most likely a direct deletion command from admin-tool sent to the wrong cluster. // i.e. instead of using the proper --end-migration command, a --delete-store command was issued AND sent to the // wrong cluster @@ -7707,7 +7731,7 @@ public void wipeCluster(String clusterName, String fabric, Optional stor deleteOneStoreVersion(clusterName, storeName.get(), versionNum.get(), true); } else { setStoreReadWriteability(clusterName, storeName.get(), false); - deleteStore(clusterName, storeName.get(), Store.IGNORE_VERSION, false, true); + deleteStore(clusterName, storeName.get(), Store.IGNORE_VERSION, false, true, false); } } else { try (AutoCloseableLock ignore = resources.getClusterLockManager().createClusterWriteLock()) { @@ -7718,7 +7742,7 @@ public void wipeCluster(String clusterName, String fabric, Optional stor continue; } setStoreReadWriteability(clusterName, store.getName(), false); - deleteStore(clusterName, store.getName(), Store.IGNORE_VERSION, false, true); + deleteStore(clusterName, store.getName(), Store.IGNORE_VERSION, false, true, false); } } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 74365eb9b50..0b6ace0c371 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -959,8 +959,28 @@ private void setupResourceForBatchJobHeartbeatStore(String batchJobHeartbeatStor public void deleteStore( String clusterName, String storeName, + boolean isAbortMigrationCleanup, int largestUsedVersionNumber, boolean waitOnRTTopicDeletion) { + if (isAbortMigrationCleanup) { + HelixVeniceClusterResources resources = getHelixVeniceClusterResources(clusterName); + try (AutoCloseableLock ignore = resources.getClusterLockManager().createStoreReadLock(storeName)) { + ReadWriteStoreRepository repository = resources.getStoreMetadataRepository(); + Store store = repository.getStore(storeName); + if (store != null && !store.isMigrating()) { + LOGGER.warn( + "Deletion of store: {} in cluster: {} was issued as part of abort migration resource cleanup, " + + "but the store's migrating flag is false. Please ensure the store's migrating flag is set to true in the " + + "destination cluster before issuing the deleteStore to prevent accidental deletion of shared resources.", + storeName, + clusterName); + throw new VeniceException( + "Store " + storeName + "'s migrating flag is false. Not safe to delete a store " + + "that is assumed to be migrating without the migrating flag setup as true.", + ErrorType.INVALID_CONFIG); + } + } + } acquireAdminMessageLock(clusterName, storeName); try { LOGGER.info("Deleting store: {} from cluster: {}", storeName, clusterName); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoresRoutes.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoresRoutes.java index 9b45e1adec6..ccce9bcc912 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoresRoutes.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/server/StoresRoutes.java @@ -8,6 +8,7 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.FABRIC_B; import static com.linkedin.venice.controllerapi.ControllerApiConstants.HEARTBEAT_TIMESTAMP; import static com.linkedin.venice.controllerapi.ControllerApiConstants.INCLUDE_SYSTEM_STORES; +import static com.linkedin.venice.controllerapi.ControllerApiConstants.IS_ABORT_MIGRATION_CLEANUP; import static com.linkedin.venice.controllerapi.ControllerApiConstants.NAME; import static com.linkedin.venice.controllerapi.ControllerApiConstants.OPERATION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.OWNER; @@ -510,29 +511,35 @@ public Route deleteStore(Admin admin) { return new VeniceRouteHandler(TrackableControllerResponse.class) { @Override public void internalHandle(Request request, TrackableControllerResponse veniceResponse) { - // Only allow allowlist users to run this command - if (!checkIsAllowListUser(request, veniceResponse, () -> isAllowListUser(request))) { - return; - } - AdminSparkServer.validateParams(request, DELETE_STORE.getParams(), admin); - String clusterName = request.queryParams(CLUSTER); - String storeName = request.queryParams(NAME); + try { + // Only allow allowlist users to run this command + if (!checkIsAllowListUser(request, veniceResponse, () -> isAllowListUser(request))) { + return; + } + AdminSparkServer.validateParams(request, DELETE_STORE.getParams(), admin); + String clusterName = request.queryParams(CLUSTER); + String storeName = request.queryParams(NAME); + boolean abortMigratingStore = + Utils.parseBooleanFromString(request.queryParams(IS_ABORT_MIGRATION_CLEANUP), IS_ABORT_MIGRATION_CLEANUP); + veniceResponse.setCluster(clusterName); + veniceResponse.setName(storeName); - veniceResponse.setCluster(clusterName); - veniceResponse.setName(storeName); + Optional adminCommandExecutionTracker = + admin.getAdminCommandExecutionTracker(clusterName); - Optional adminCommandExecutionTracker = - admin.getAdminCommandExecutionTracker(clusterName); - if (adminCommandExecutionTracker.isPresent()) { - // Lock the tracker to get the execution id for the last admin command. - // If will not make our performance worse, because we lock the whole cluster while handling the admin - // operation in parent admin. - synchronized (adminCommandExecutionTracker) { - admin.deleteStore(clusterName, storeName, Store.IGNORE_VERSION, false); - veniceResponse.setExecutionId(adminCommandExecutionTracker.get().getLastExecutionId()); + if (adminCommandExecutionTracker.isPresent()) { + // Lock the tracker to get the execution id for the last admin command. + // It will not make our performance worse, because we lock the whole cluster while handling the admin + // operation in parent admin. + synchronized (adminCommandExecutionTracker) { + admin.deleteStore(clusterName, storeName, abortMigratingStore, Store.IGNORE_VERSION, false); + veniceResponse.setExecutionId(adminCommandExecutionTracker.get().getLastExecutionId()); + } + } else { + admin.deleteStore(clusterName, storeName, abortMigratingStore, Store.IGNORE_VERSION, false); } - } else { - admin.deleteStore(clusterName, storeName, Store.IGNORE_VERSION, false); + } catch (Throwable e) { + veniceResponse.setError(e); } } }; diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java index e683b7b86e3..d26d15bbaef 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdmin.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.compression.CompressionStrategy; @@ -2164,6 +2165,24 @@ public void testUpdateStoreWithBadPartitionerConfigs() { verify(veniceWriter, times(0)).put(any(), any(), anyInt()); } + @Test + public void testAbortMigrationDeleteStore() { + String storeName = "test-testAbortMigrationCreateStore"; + String owner = "unitTest"; + Store store = TestUtils.createTestStore(storeName, owner, System.currentTimeMillis()); + + doReturn(store).when(internalAdmin).getStore(eq(clusterName), eq(storeName)); + doReturn(store).when(internalAdmin).checkPreConditionForDeletion(eq(clusterName), eq(storeName)); + assertTrue(!store.isMigrating()); + parentAdmin.initStorageCluster(clusterName); + Exception exp = Assert + .expectThrows(VeniceException.class, () -> parentAdmin.deleteStore(clusterName, storeName, true, 0, true)); + assertEquals( + "Store test-testAbortMigrationCreateStore's migrating flag is false. Not safe to delete a store " + + "that is assumed to be migrating without the migrating flag setup as true.", + exp.getMessage()); + } + @Test public void testDeleteStore() { String storeName = "test-testReCreateStore"; @@ -2180,7 +2199,7 @@ public void testDeleteStore() { .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); parentAdmin.initStorageCluster(clusterName); - parentAdmin.deleteStore(clusterName, storeName, 0, true); + parentAdmin.deleteStore(clusterName, storeName, false, 0, true); verify(veniceWriter).put(any(), any(), anyInt()); verify(zkClient, times(1)).readData(zkMetadataNodePath, null); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdminWithAcl.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdminWithAcl.java index 3b661283094..ba63bf15e3c 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdminWithAcl.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceParentHelixAdminWithAcl.java @@ -129,7 +129,7 @@ public void testDeleteStoreWithAuthorization() { .thenReturn(AdminTopicMetadataAccessor.generateMetadataMap(1, -1, 1)); initializeParentAdmin(Optional.of(authorizerService)); parentAdmin.initStorageCluster(clusterName); - parentAdmin.deleteStore(clusterName, storeName, 0, true); + parentAdmin.deleteStore(clusterName, storeName, false, 0, true); Assert.assertEquals(1, authorizerService.clearAclCounter); AclBinding actualAB = authorizerService.describeAcls(new Resource(storeName)); Assert.assertTrue(isAclBindingSame(new AclBinding(new Resource(storeName)), actualAB)); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/StoreRoutesTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/StoreRoutesTest.java index c4341107065..3356bd2502a 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/StoreRoutesTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/server/StoreRoutesTest.java @@ -1,10 +1,12 @@ package com.linkedin.venice.controller.server; import static com.linkedin.venice.exceptions.ErrorType.INCORRECT_CONTROLLER; +import static com.linkedin.venice.exceptions.ErrorType.INVALID_CONFIG; import static com.linkedin.venice.exceptions.ErrorType.STORE_NOT_FOUND; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import com.linkedin.venice.controller.Admin; @@ -13,6 +15,8 @@ import com.linkedin.venice.controllerapi.ControllerApiConstants; import com.linkedin.venice.controllerapi.MultiStoreStatusResponse; import com.linkedin.venice.controllerapi.StoreResponse; +import com.linkedin.venice.controllerapi.TrackableControllerResponse; +import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.meta.OfflinePushStrategy; import com.linkedin.venice.meta.PersistenceType; import com.linkedin.venice.meta.ReadStrategy; @@ -89,6 +93,39 @@ public void testRollForwardToFutureVersion() throws Exception { Assert.assertEquals(multiStoreStatusResponse.getCluster(), TEST_CLUSTER); } + @Test + public void testDeleteStore() throws Exception { + Admin mockAdmin = mock(VeniceParentHelixAdmin.class); + doReturn(true).when(mockAdmin).isLeaderControllerFor(TEST_CLUSTER); + + Request request = mock(Request.class); + doReturn(TEST_CLUSTER).when(request).queryParams(eq(ControllerApiConstants.CLUSTER)); + doReturn(TEST_STORE_NAME).when(request).queryParams(eq(ControllerApiConstants.NAME)); + doReturn("true").when(request).queryParams(eq(ControllerApiConstants.IS_ABORT_MIGRATION_CLEANUP)); + + Route deleteStoreRoute = new StoresRoutes(false, Optional.empty(), pubSubTopicRepository).deleteStore(mockAdmin); + TrackableControllerResponse trackableControllerResponse = ObjectMapperFactory.getInstance() + .readValue( + deleteStoreRoute.handle(request, mock(Response.class)).toString(), + TrackableControllerResponse.class); + Assert.assertFalse(trackableControllerResponse.isError()); + Assert.assertEquals(trackableControllerResponse.getCluster(), TEST_CLUSTER); + Assert.assertEquals(trackableControllerResponse.getName(), TEST_STORE_NAME); + + String errMessage = "Store " + TEST_STORE_NAME + "'s migrating flag is false. Not safe to delete a store " + + "that is assumed to be migrating without the migrating flag setup as true."; + doThrow(new VeniceException(errMessage, INVALID_CONFIG)).when(mockAdmin) + .deleteStore(TEST_CLUSTER, TEST_STORE_NAME, true, Store.IGNORE_VERSION, false); + trackableControllerResponse = ObjectMapperFactory.getInstance() + .readValue( + deleteStoreRoute.handle(request, mock(Response.class)).toString(), + TrackableControllerResponse.class); + Assert.assertTrue(trackableControllerResponse.isError()); + Assert.assertEquals(trackableControllerResponse.getErrorType(), INVALID_CONFIG); + Assert.assertEquals(trackableControllerResponse.getCluster(), TEST_CLUSTER); + Assert.assertEquals(trackableControllerResponse.getName(), TEST_STORE_NAME); + } + @Test public void testGetFutureVersionForChildController() throws Exception { Admin mockAdmin = mock(VeniceHelixAdmin.class);