Skip to content

Commit

Permalink
Integration Test [In Writing]
Browse files Browse the repository at this point in the history
  • Loading branch information
kristyelee committed Dec 9, 2024
1 parent b520ab8 commit 262bbc6
Showing 1 changed file with 14 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
import com.linkedin.davinci.DaVinciBackend;
import com.linkedin.davinci.DaVinciUserApp;
import com.linkedin.davinci.StoreBackend;
import com.linkedin.davinci.client.AvroGenericDaVinciClient;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
Expand Down Expand Up @@ -83,15 +85,7 @@
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.ForkedJavaProcess;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.*;
import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
Expand Down Expand Up @@ -1286,8 +1280,6 @@ public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception {
@Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class)
public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Exception {
String storeName1 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);
String storeName2 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);
String storeName3 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);
String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath();
VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
Expand All @@ -1301,6 +1293,7 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti
MetricsRepository metricsRepository = new MetricsRepository();

// Test multiple clients sharing the same ClientConfig/MetricsRepository & base data path
StoreBackend storeBackend;
try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
d2Client,
VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
Expand Down Expand Up @@ -1345,33 +1338,8 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti
}
});
}

// Test multiple client ingesting different stores concurrently
DaVinciClient<Integer, Integer> client2 = factory.getAndStartGenericAvroClient(storeName2, clientConfig);
DaVinciClient<Integer, Integer> client3 = factory.getAndStartGenericAvroClient(storeName3, clientConfig);
CompletableFuture.allOf(client2.subscribeAll(), client3.subscribeAll()).get();
assertEquals(client2.batchGet(keyValueMap.keySet()).get(), keyValueMap);
assertEquals(client3.batchGet(keyValueMap.keySet()).get(), keyValueMap);

// TODO(jlliu): Re-enable this test-case after fixing store deletion that is flaky due to
// CLIENT_USE_SYSTEM_STORE_REPOSITORY.
// // Test read from a store that is being deleted concurrently
// try (ControllerClient controllerClient = cluster.getControllerClient()) {
// ControllerResponse response = controllerClient.disableAndDeleteStore(storeName2);
// assertFalse(response.isError(), response.getError());
// TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
// assertThrows(VeniceClientException.class, () -> client2.get(KEY_COUNT / 3).get());
// });
// }
client2.unsubscribeAll();
}

// Test bootstrap-time junk removal
cluster.useControllerClient(controllerClient -> {
ControllerResponse response = controllerClient.disableAndDeleteStore(storeName3);
assertFalse(response.isError(), response.getError());
});

// Test managed clients & data cleanup
try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
d2Client,
Expand All @@ -1381,11 +1349,19 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti
Optional.of(Collections.singleton(storeName1)))) {
assertNotEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0);

DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend();
storeBackend = daVinciBackend.getStoreOrThrow(storeName1);
List<Integer> partitions = new ArrayList<Integer>();
partitions.add(1);
partitions.add(2);
partitions.add(3);
partitions.add(4);
ComplementSet<Integer> subscription = ComplementSet.newSet(partitions);
storeBackend.subscribe(subscription);

DaVinciClient<Integer, Object> client1 = factory.getAndStartGenericAvroClient(storeName1, clientConfig);
client1.subscribeAll().get();
client1.unsubscribeAll();
// client2 was removed explicitly above via disableAndDeleteStore()
// client3 is expected to be removed by the factory during bootstrap
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> {
assertEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0);
});
Expand Down

0 comments on commit 262bbc6

Please sign in to comment.