Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] log compaction #1282

Draft
wants to merge 58 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
ff3c1ae
[controller] log compaction
Oct 29, 2024
6a7903d
[controller][server] complete log compaction candidate selection help…
Oct 30, 2024
62fcf64
[controller][server][style] put log compaction criterion on newline f…
Oct 30, 2024
189ed58
[controller][server][documentation] VeniceParentHelixAdmin::getStores…
Oct 30, 2024
8101408
[controller][server][documentation] VeniceParentHelixAdmin::isCompact…
Oct 31, 2024
8160e5b
[controller][server][documentation] filter out stores where current v…
Oct 31, 2024
c28728c
[controller][server][unit test] Unit test for VeniceParentHelixAdmin:…
Nov 1, 2024
5e481cb
[controller][server][unit test] move testFilterStoresForCompaction to…
Nov 1, 2024
2a0f3f2
[controller][server][unit test][style] switch actual & expected value…
Nov 1, 2024
f6ca080
[controller][server][unit test][style] switch actual & expected value…
Nov 4, 2024
bf73b48
[controller][server][style][PR] make VeniceParentHelixAdmin::isCompac…
Nov 6, 2024
77d83c0
[controller][server][style][PR] revert wildcard import
Nov 6, 2024
af12106
[controller][server][style][PR] delete wildcard import
Nov 6, 2024
e82e659
[controller][server][PR] change Admin::getStoresForCompaction return …
Nov 6, 2024
373c208
[controller][server][PR]
Nov 7, 2024
2b11f9d
[controller]
Nov 21, 2024
1a7f015
[controller] create RepushOrchestratorProvider
Nov 21, 2024
1df10d9
[controller] create LogCompactionService & initialise instance in Ven…
Nov 21, 2024
4652dc9
[controller][PR] change ArrayList to List datatype in contracts
Nov 21, 2024
da5455c
[controller][PR] move VeniceParentHelixAdmin::getStoresForCompaction …
Nov 22, 2024
f30f705
[controller] add triggerRepush endpoint
Nov 22, 2024
24c97a1
[controller] change repush terminology to compactStore
Nov 22, 2024
3991b22
[controller] create LogCompactionService
Nov 23, 2024
43f5c40
[controller][config] add SCHEDULED_LOG_COMPACTION_THREAD_COUNT config
Nov 23, 2024
f638e40
[controller][config][style] delete SCHEDULED_LOG_COMPACTION_THREAD_CO…
Nov 23, 2024
2f3966d
[controller] Refactor VeniceHelixAdmin::getStoresForCompaction logic …
Nov 25, 2024
8655b96
[controller] add scheduled.log.compaction.interval.HR config for exec…
Nov 25, 2024
1f44e45
[controller][config] change schedule.log.compaction.interval.ms confi…
Nov 26, 2024
e0bd4c9
[controller][test] TestLogCompactionService::testScheduledExecution
Nov 26, 2024
275b4ec
[controller][config] change schedule.log.compaction.interval.ms confi…
Nov 26, 2024
cf6861b
[controller][config] create RepushOrchestrator interface
Nov 26, 2024
201631d
[controller][PR]
Nov 27, 2024
bcdb3da
[controller] return type ArrayList > List
Dec 5, 2024
2c65d3c
[controller][PR] revert wildcard import
Dec 6, 2024
9a9cc1d
[controller][PR] prepend DEFAULT to public static final field in Comp…
Dec 6, 2024
5457c79
[controller][PR] move logcompaction tests to same package
Dec 6, 2024
083af27
test commit after renaming branch from `log-compaction` to `whdeng/lo…
Dec 6, 2024
dc0835e
test commit after reverting branch name from `whdeng/log-compaction-r…
Dec 6, 2024
a9a5aeb
[controller][PR] add and update docs and logs
Dec 6, 2024
5ac6468
[controller][PR] revert wildcard imports
Dec 6, 2024
c4dc3de
[controller][PR] add VeniceControllerClusterConfig::isLogCompactionEn…
Dec 9, 2024
7a851d9
[controller][PR] add VeniceControllerMultiClusterConfig::isLogCompact…
Dec 9, 2024
b93c6fc
[controller] add LogCompactionTask::triggerSource
Dec 9, 2024
5a5dddb
[controller][PR] in ControllerClient::getStoresForCompaction, modify …
Dec 10, 2024
3f641f5
[controller][PR] add ControllerClient::triggerRepush TODO
Dec 10, 2024
98171a5
[controller] repush job response handling flow
Dec 11, 2024
8253c82
[controller] add/modify RepushJobResponse fields
Dec 12, 2024
7eec4be
[controller] catch unhandled Throwable
Dec 12, 2024
759c2ae
[controller] VeniceHelixAdmin: only initialise RepushOrchestrator & C…
Dec 12, 2024
8a9dc9f
[controller] setup test controller in TestLogCompactionService
Dec 13, 2024
87294ca
[controller] add config for time since last log compaction threshold
Dec 17, 2024
e6d2bb4
[controller] ConfigKeys::TIME_SINCE_LAST_LOG_COMPACTION_THRESHOLD_MS …
Dec 17, 2024
ec5acaa
[controller][PR] lambda function instead of Optional
Dec 18, 2024
8a321ca
[controller] added integration test for scheduled log compaction
Dec 19, 2024
c5376ca
[controller] incomplete integration test for log compaction in TestLo…
Dec 19, 2024
3a677e5
[controller] todo to extract TestHybrid::testHybridStoreLogCompaction…
Dec 19, 2024
0e205c8
[controller] pass repush orchestrator configs as VeniceProperties
Jan 27, 2025
1ddc155
[format] comment spacing
Jan 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,16 @@ public MultiStoreInfoResponse getClusterStores(String clusterName) {
return request(ControllerRoute.GET_STORES_IN_CLUSTER, params, MultiStoreInfoResponse.class);
}

/**
* This method gets a list of store names that are ready for compaction.
* @param clusterName, the name of the cluster to query for compaction eligible stores
* @return The list of store names that are ready for compaction.
*/
public MultiStoreInfoResponse getStoresForCompaction(String clusterName) {
QueryParams params = newParams().add(CLUSTER, clusterName);
WhitneyDeng marked this conversation as resolved.
Show resolved Hide resolved
return request(ControllerRoute.GET_STORES_FOR_COMPACTION, params, MultiStoreInfoResponse.class);
}

public VersionResponse getStoreLargestUsedVersion(String clusterName, String storeName) {
QueryParams params = newParams().add(CLUSTER, clusterName).add(NAME, storeName);
return request(ControllerRoute.GET_STORE_LARGEST_USED_VERSION, params, VersionResponse.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ public enum ControllerRoute {
Arrays.asList(CLUSTER, SOURCE_FABRIC, FABRIC, NAME, VERSION), AMPLIFICATION_FACTOR
), GET_STALE_STORES_IN_CLUSTER("/get_stale_stores_in_cluster", HttpMethod.GET, Collections.singletonList(CLUSTER)),
GET_STORES_IN_CLUSTER("/get_stores_in_cluster", HttpMethod.GET, Collections.singletonList(CLUSTER)),
GET_STORES_FOR_COMPACTION("/get_stores_for_compaction", HttpMethod.GET, Collections.singletonList(CLUSTER)),
GET_STORE_LARGEST_USED_VERSION("/get_store_largest_used_version", HttpMethod.GET, Arrays.asList(CLUSTER, NAME)),
LIST_STORE_PUSH_INFO("/list_store_push_info", HttpMethod.GET, Arrays.asList(CLUSTER, NAME, PARTITION_DETAIL_ENABLED)),
GET_REGION_PUSH_DETAILS(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,6 @@ private void testLatestSupersetSchemaIdUpdate(
});
}
}

}

private void testSuperSetSchemaGen(ControllerClient parentControllerClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@


public interface Admin extends AutoCloseable, Closeable {

// Wrapper to include both overall offline push status and other extra useful info
class OfflinePushStatusInfo {
private ExecutionStatus executionStatus;
Expand Down Expand Up @@ -935,6 +936,12 @@ default boolean isAdminTopicConsumptionEnabled(String clusterName) {

Map<String, StoreDataAudit> getClusterStaleStores(String clusterName);

/**
* @param clusterName
* @return the list of stores ready for compaction
*/
List<StoreInfo> getStoresForCompaction(String clusterName);

/**
* @return the largest used version number for the given store from store graveyard.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7467,6 +7467,11 @@ public Map<String, StoreDataAudit> getClusterStaleStores(String clusterName) {
throw new UnsupportedOperationException("This function has not been implemented.");
}

@Override
public ArrayList<StoreInfo> getStoresForCompaction(String clusterName) {
WhitneyDeng marked this conversation as resolved.
Show resolved Hide resolved
throw new UnsupportedOperationException("This function has no implementation.");
}

@Override
public Map<String, RegionPushDetails> listStorePushInfo(
String clusterName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ public class VeniceParentHelixAdmin implements Admin {
private static final StackTraceElement[] EMPTY_STACK_TRACE = new StackTraceElement[0];

private static final long TOPIC_DELETION_DELAY_MS = 5 * Time.MS_PER_MINUTE;
public static final int COMPACTION_THRESHOLD_HOURS = 24;

final Map<String, Boolean> asyncSetupEnabledMap;
private final VeniceHelixAdmin veniceHelixAdmin;
Expand Down Expand Up @@ -4801,6 +4802,78 @@ public Map<String, StoreDataAudit> getClusterStaleStores(String clusterName) {
return retMap;
}

/**
* This function iterates over all of Helix Parent Admin's child controllers,
* in order to obtain the stores and respective store information of each child controller,
* and then filter out the stores that are ready for compaction.
* @param clusterName, the name of the cluster to search for stores that are ready for compaction
* @return a list of StoreInfo objects of stores that are ready for compaction
*/
@Override
public ArrayList<StoreInfo> getStoresForCompaction(String clusterName) {
WhitneyDeng marked this conversation as resolved.
Show resolved Hide resolved
ArrayList<StoreInfo> compactionReadyStores = new ArrayList<>();
try {
Map<String, ControllerClient> childControllers = getVeniceHelixAdmin().getControllerClientMap(clusterName);
ArrayList<StoreInfo> storeInfoList = new ArrayList<>();

// iterate through child controllers
for (Map.Entry<String, ControllerClient> controller: childControllers.entrySet()) {

// add all store info to storeInfoList
MultiStoreInfoResponse response = controller.getValue().getClusterStores(clusterName);
storeInfoList.addAll(response.getStoreInfoList());
}

// filter out
filterStoresForCompaction(storeInfoList, compactionReadyStores);
} catch (Exception e) {
throw new VeniceException("Something went wrong trying to fetch stores for compaction.", e);
}
return compactionReadyStores;
}

// package exclusive for testing
void filterStoresForCompaction(ArrayList<StoreInfo> storeInfoList, ArrayList<StoreInfo> compactionReadyStores) {
WhitneyDeng marked this conversation as resolved.
Show resolved Hide resolved
for (StoreInfo storeInfo: storeInfoList) {
if (isCompactionReady(storeInfo)) {
compactionReadyStores.add(storeInfo);
}
}
}

// This function abstracts the criteria for a store to be ready for compaction
boolean isCompactionReady(StoreInfo storeInfo) {
boolean isHybridStore = storeInfo.getHybridStoreConfig() != null;

return isHybridStore && isLastCompactionTimeOlderThanThresholdHours(COMPACTION_THRESHOLD_HOURS, storeInfo);
}

// START isCompactionReady() helper methods: each method below encapsulates a log compaction readiness criterion
/**
* This function checks if the last compaction time is older than the threshold.
* @param compactionThresholdHours, the number of hours that the last compaction time should be older than
* @param storeInfo, the store to check the last compaction time for
* @return true if the last compaction time is older than the threshold, false otherwise
*/
private boolean isLastCompactionTimeOlderThanThresholdHours(int compactionThresholdHours, StoreInfo storeInfo) {
// get the last compaction time
int currentVersionNumber = storeInfo.getCurrentVersion();
Optional<Version> currentVersion = storeInfo.getVersion(currentVersionNumber);
if (!currentVersion.isPresent()) {
LOGGER.warn("Couldn't find current version: {} from store: {}", currentVersionNumber, storeInfo.getName());
return false; // invalid store because no current version, this store is not eligible for compaction
}

// calculate hours since last compaction
long lastCompactionTime = currentVersion.get().getCreatedTime();
WhitneyDeng marked this conversation as resolved.
Show resolved Hide resolved
long currentTime = System.currentTimeMillis();
long millisecondsSinceLastCompaction = currentTime - lastCompactionTime;
long hoursSinceLastCompaction = TimeUnit.MILLISECONDS.toHours(millisecondsSinceLastCompaction);

return hoursSinceLastCompaction > compactionThresholdHours;
}
// END isCompactionReady() helper methods

/**
* @return the largest used version number for the given store from the store graveyard.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static com.linkedin.venice.controllerapi.ControllerRoute.GET_STALE_STORES_IN_CLUSTER;
import static com.linkedin.venice.controllerapi.ControllerRoute.GET_STORAGE_PERSONA;
import static com.linkedin.venice.controllerapi.ControllerRoute.GET_STORAGE_PERSONA_ASSOCIATED_WITH_STORE;
import static com.linkedin.venice.controllerapi.ControllerRoute.GET_STORES_FOR_COMPACTION;
import static com.linkedin.venice.controllerapi.ControllerRoute.GET_STORES_IN_CLUSTER;
import static com.linkedin.venice.controllerapi.ControllerRoute.GET_STORE_LARGEST_USED_VERSION;
import static com.linkedin.venice.controllerapi.ControllerRoute.GET_VALUE_OR_DERIVED_SCHEMA_ID;
Expand Down Expand Up @@ -572,6 +573,9 @@ public boolean startInner() throws Exception {
httpService.get(
GET_STORES_IN_CLUSTER.getPath(),
new VeniceParentControllerRegionStateHandler(admin, storesRoutes.getStoresInCluster(admin)));
httpService.get(
GET_STORES_FOR_COMPACTION.getPath(),
new VeniceParentControllerRegionStateHandler(admin, storesRoutes.getStoresForCompaction(admin)));
httpService.get(
GET_STORE_LARGEST_USED_VERSION.getPath(),
new VeniceParentControllerRegionStateHandler(admin, storesRoutes.getStoreLargestUsedVersion(admin)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static com.linkedin.venice.controllerapi.ControllerRoute.GET_REGION_PUSH_DETAILS;
import static com.linkedin.venice.controllerapi.ControllerRoute.GET_REPUSH_INFO;
import static com.linkedin.venice.controllerapi.ControllerRoute.GET_STALE_STORES_IN_CLUSTER;
import static com.linkedin.venice.controllerapi.ControllerRoute.GET_STORES_FOR_COMPACTION;
import static com.linkedin.venice.controllerapi.ControllerRoute.GET_STORES_IN_CLUSTER;
import static com.linkedin.venice.controllerapi.ControllerRoute.LIST_STORES;
import static com.linkedin.venice.controllerapi.ControllerRoute.LIST_STORE_PUSH_INFO;
Expand Down Expand Up @@ -937,6 +938,22 @@ public void internalHandle(Request request, MultiStoreInfoResponse veniceRespons
};
}

/**
* @see Admin#getStoresForCompaction(String)
*/
public Route getStoresForCompaction(Admin admin) {
return new VeniceRouteHandler<MultiStoreInfoResponse>(MultiStoreInfoResponse.class) {
@Override
public void internalHandle(Request request, MultiStoreInfoResponse veniceResponse) {
AdminSparkServer.validateParams(request, GET_STORES_FOR_COMPACTION.getParams(), admin);
String cluster = request.queryParams(CLUSTER);
ArrayList<StoreInfo> response = admin.getStoresForCompaction(cluster);
veniceResponse.setStoreInfoList(response);
veniceResponse.setCluster(cluster);
}
};
}

/**
* @see Admin#getLargestUsedVersionFromStoreGraveyard(String, String)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.linkedin.venice.helix.HelixReadWriteStoreRepository;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.HybridStoreConfigImpl;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.PersistenceType;
Expand Down Expand Up @@ -2835,4 +2836,65 @@ public void testGetFinalReturnStatus() {
finalStatus = VeniceParentHelixAdmin.getFinalReturnStatus(statuses, childRegions, 0, new StringBuilder());
assertEquals(finalStatus, ExecutionStatus.DVC_INGESTION_ERROR_OTHER);
}

@Test
public void testFilterStoresForCompaction() {
VeniceParentHelixAdmin mockAdmin = mock(VeniceParentHelixAdmin.class);
ArrayList<StoreInfo> storeInfoList = new ArrayList<>();
ArrayList<StoreInfo> compactionReadyStores = new ArrayList<>();

// Mock StoreInfo instances
StoreInfo store1 = mock(StoreInfo.class);
StoreInfo store2 = mock(StoreInfo.class);
StoreInfo store3 = mock(StoreInfo.class);

// Mock HybridStoreConfig for the first two StoreInfo instances
HybridStoreConfig hybridStoreConfig1 = mock(HybridStoreConfig.class);
HybridStoreConfig hybridStoreConfig2 = mock(HybridStoreConfig.class);
when(store1.getHybridStoreConfig()).thenReturn(hybridStoreConfig1);
when(store2.getHybridStoreConfig()).thenReturn(hybridStoreConfig2);
when(store3.getHybridStoreConfig()).thenReturn(null);

// Mock version numbers with random numbers
int store1VersionNumber = 1;
int store2VersionNumber = 2;
int store3VersionNumber = 3;
when(store1.getCurrentVersion()).thenReturn(store1VersionNumber);
when(store2.getCurrentVersion()).thenReturn(store2VersionNumber);
when(store3.getCurrentVersion()).thenReturn(store3VersionNumber);

// Mock Version instances
Version version1 = mock(Version.class);
Version version2 = mock(Version.class);
Version version3 = mock(Version.class);

// Return Version mocks when getVersion() is called
when(store1.getVersion(anyInt())).thenReturn(Optional.ofNullable(version1));
when(store2.getVersion(anyInt())).thenReturn(Optional.ofNullable(version2));
when(store3.getVersion(anyInt())).thenReturn(Optional.ofNullable(version3));

// Set createTime for Version mocks
long currentTime = System.currentTimeMillis();
long millisecondsPerHour = 60 * 60 * 1000;
when(version1.getCreatedTime()).thenReturn(currentTime - (25 * millisecondsPerHour)); // 25 hours ago
when(version2.getCreatedTime()).thenReturn(currentTime - (50 * millisecondsPerHour)); // 50 hours ago
when(version3.getCreatedTime()).thenReturn(currentTime - (23 * millisecondsPerHour)); // 23 hours ago
WhitneyDeng marked this conversation as resolved.
Show resolved Hide resolved

// Add StoreInfo instances to the list
storeInfoList.add(store1);
storeInfoList.add(store2);
storeInfoList.add(store3);

// Call the real method to test
doCallRealMethod().when(mockAdmin).filterStoresForCompaction(any(), any());

// Test
mockAdmin.filterStoresForCompaction(storeInfoList, compactionReadyStores);

// Assert the expected outcome
assertEquals(compactionReadyStores.size(), 2);
assertEquals(compactionReadyStores.contains(store1), true);
assertEquals(compactionReadyStores.contains(store2), true);
assertEquals(compactionReadyStores.contains(store3), false);
}
}