Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[controller] Make Helix rebalance preferences and capacity keys configurable for the controller #1475

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class ConfigConstants {
public static final long DEFAULT_PUSH_STATUS_STORE_HEARTBEAT_EXPIRATION_TIME_IN_SECONDS =
TimeUnit.MINUTES.toSeconds(10);

public static final String DEFAULT_HELIX_RESOURCE_CAPACITY_KEY = "cluster_resource_weight";
kvargha marked this conversation as resolved.
Show resolved Hide resolved

/**
* End of controller config default value
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2398,4 +2398,37 @@ private ConfigKeys() {
* Default: {@value com.linkedin.venice.meta.NameRepository#DEFAULT_MAXIMUM_ENTRY_COUNT}
*/
public static final String NAME_REPOSITORY_MAX_ENTRY_COUNT = "name.repository.max.entry.count";

/**
* Specifies the value to use for Helix's rebalance preference for evenness when using Waged.
* The default value is 1.
*/
public static final String CONTROLLER_HELIX_REBALANCE_PREFERENCE_EVENNESS =
"controller.helix.rebalance.preference.evenness";

/**
* Specifies the value to use for Helix's rebalance preference for less movement when using Waged.
* The default value is 1.
*/
public static final String CONTROLLER_HELIX_REBALANCE_PREFERENCE_LESS_MOVEMENT =
"controller.helix.rebalance.preference.less.movement";

/**
* Indicates whether to enable force baseline convergence in Helix's rebalance preference when using Waged.
* Default is false.
*/
public static final String CONTROLLER_HELIX_REBALANCE_PREFERENCE_ENABLE_FORCE_BASELINE_CONVERGE =
"controller.helix.rebalance.preference.enable.force.baseline.converge";

/**
* Specifies the capacity a controller instance can handle, determined by
* {@link ConfigKeys#CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT}. Default is 10000.
*/
public static final String CONTROLLER_HELIX_INSTANCE_CAPACITY = "controller.helix.instance.capacity";

/**
* Specifies the weight of each Helix resource. The maximum weight per instance is determined by
* {@link ConfigKeys#CONTROLLER_HELIX_INSTANCE_CAPACITY}. Default is 100.
*/
public static final String CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT = "controller.helix.default.instance.capacity";
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.ConfigConstants.DEFAULT_HELIX_RESOURCE_CAPACITY_KEY;
import static com.linkedin.venice.utils.TestUtils.assertCommand;
import static com.linkedin.venice.utils.TestUtils.shutdownExecutor;
import static com.linkedin.venice.utils.TestUtils.waitForNonDeterministicAssertion;
Expand All @@ -16,6 +17,8 @@
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.helix.SafeHelixDataAccessor;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.integration.utils.HelixAsAServiceWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
Expand All @@ -31,6 +34,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -40,6 +44,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixAdmin;
import org.apache.helix.PropertyKey;
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixManager;
Expand Down Expand Up @@ -255,6 +260,60 @@ public void testTransitionToHAASControllerAsStorageClusterLeader() {
}
}

@Test(timeOut = 60 * Time.MS_PER_SECOND)
public void testRebalancePreferenceAndCapacityKeys() {
try (VeniceClusterWrapper venice = ServiceFactory.getVeniceCluster(0, 0, 0, 1);
HelixAsAServiceWrapper helixAsAServiceWrapper = startAndWaitForHAASToBeAvailable(venice.getZk().getAddress())) {
String controllerClusterName = "venice-controllers";

int helixRebalancePreferenceEvenness = 10;
int helixRebalancePreferenceLessMovement = 2;
boolean helixRebalancePreferenceEnableForceBaselineConverge = true;
int helixInstanceCapacity = 1000;
int helixResourceCapacityWeight = 10;

Properties clusterProperties = (Properties) enableControllerAndStorageClusterHAASProperties.clone();
clusterProperties
.put(ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_EVENNESS, helixRebalancePreferenceEvenness);
clusterProperties
.put(ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_LESS_MOVEMENT, helixRebalancePreferenceLessMovement);
clusterProperties.put(
ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_ENABLE_FORCE_BASELINE_CONVERGE,
helixRebalancePreferenceEnableForceBaselineConverge);
clusterProperties.put(ConfigKeys.CONTROLLER_HELIX_INSTANCE_CAPACITY, helixInstanceCapacity);
clusterProperties.put(ConfigKeys.CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT, helixResourceCapacityWeight);

VeniceControllerWrapper controllerWrapper = venice.addVeniceController(clusterProperties);

VeniceHelixAdmin veniceHelixAdmin = controllerWrapper.getVeniceHelixAdmin();

SafeHelixManager helixManager = veniceHelixAdmin.getHelixManager();
SafeHelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
PropertyKey.Builder propertyKeyBuilder = new PropertyKey.Builder(controllerClusterName);
ClusterConfig clusterConfig = helixDataAccessor.getProperty(propertyKeyBuilder.clusterConfig());

Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> globalRebalancePreference =
clusterConfig.getGlobalRebalancePreference();
assertEquals(
(int) globalRebalancePreference.get(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS),
helixRebalancePreferenceEvenness);
assertEquals(
(int) globalRebalancePreference.get(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT),
helixRebalancePreferenceLessMovement);
assertEquals(
(int) globalRebalancePreference.get(ClusterConfig.GlobalRebalancePreferenceKey.FORCE_BASELINE_CONVERGE),
1);

Map<String, Integer> defaultInstanceCapacityMap = clusterConfig.getDefaultInstanceCapacityMap();
assertEquals((int) defaultInstanceCapacityMap.get(DEFAULT_HELIX_RESOURCE_CAPACITY_KEY), helixInstanceCapacity);

Map<String, Integer> defaultPartitionWeightMap = clusterConfig.getDefaultPartitionWeightMap();
assertEquals(
(int) defaultPartitionWeightMap.get(DEFAULT_HELIX_RESOURCE_CAPACITY_KEY),
helixResourceCapacityWeight);
}
}

private static class InitTask implements Callable<Void> {
private final HelixAdminClient client;
private final HashMap<String, String> helixClusterProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_CLOUD_INFO_PROCESSOR_PACKAGE;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_CLOUD_INFO_SOURCES;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_CLOUD_PROVIDER;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_INSTANCE_CAPACITY;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_ENABLE_FORCE_BASELINE_CONVERGE;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_EVENNESS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_REBALANCE_PREFERENCE_LESS_MOVEMENT;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_HELIX_REST_CUSTOMIZED_HEALTH_URL;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_INSTANCE_TAG_LIST;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_JETTY_CONFIG_OVERRIDE_PREFIX;
Expand Down Expand Up @@ -545,6 +550,12 @@ public class VeniceControllerClusterConfig {
private Set<PushJobCheckpoints> pushJobUserErrorCheckpoints;
private boolean isHybridStorePartitionCountUpdateEnabled;

private final int helixRebalancePreferenceEvenness;
private final int helixRebalancePreferenceLessMovement;
private final boolean helixRebalancePreferenceEnableForceBaselineConverge;
private final int helixInstanceCapacity;
private final int helixResourceCapacityWeight;

public VeniceControllerClusterConfig(VeniceProperties props) {
this.props = props;
this.clusterName = props.getString(CLUSTER_NAME);
Expand Down Expand Up @@ -996,6 +1007,13 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
this.pushJobUserErrorCheckpoints = parsePushJobUserErrorCheckpoints(props);
this.isHybridStorePartitionCountUpdateEnabled =
props.getBoolean(ConfigKeys.CONTROLLER_ENABLE_HYBRID_STORE_PARTITION_COUNT_UPDATE, false);

this.helixRebalancePreferenceEvenness = props.getInt(CONTROLLER_HELIX_REBALANCE_PREFERENCE_EVENNESS, 1);
this.helixRebalancePreferenceLessMovement = props.getInt(CONTROLLER_HELIX_REBALANCE_PREFERENCE_LESS_MOVEMENT, 1);
this.helixRebalancePreferenceEnableForceBaselineConverge =
props.getBoolean(CONTROLLER_HELIX_REBALANCE_PREFERENCE_ENABLE_FORCE_BASELINE_CONVERGE, false);
this.helixInstanceCapacity = props.getInt(CONTROLLER_HELIX_INSTANCE_CAPACITY, 10000);
this.helixResourceCapacityWeight = props.getInt(CONTROLLER_HELIX_RESOURCE_CAPACITY_WEIGHT, 100);
kvargha marked this conversation as resolved.
Show resolved Hide resolved
}

public VeniceProperties getProps() {
Expand Down Expand Up @@ -1833,4 +1851,24 @@ static Set<PushJobCheckpoints> parsePushJobUserErrorCheckpoints(VeniceProperties
public Set<PushJobCheckpoints> getPushJobUserErrorCheckpoints() {
return pushJobUserErrorCheckpoints;
}

public int getHelixRebalancePreferenceEvenness() {
return helixRebalancePreferenceEvenness;
}

public int getHelixRebalancePreferenceLessMovement() {
return helixRebalancePreferenceLessMovement;
}

public boolean isHelixRebalancePreferenceForceBaselineConvergeEnabled() {
return helixRebalancePreferenceEnableForceBaselineConverge;
}

public int getHelixInstanceCapacity() {
return helixInstanceCapacity;
}

public int getHelixResourceCapacityWeight() {
return helixResourceCapacityWeight;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -302,4 +302,24 @@ public long getServiceDiscoveryRegistrationRetryMS() {
public List<String> getControllerInstanceTagList() {
return getCommonConfig().getControllerInstanceTagList();
}

public int getHelixRebalancePreferenceEvenness() {
return getCommonConfig().getHelixRebalancePreferenceEvenness();
}

public int getHelixRebalancePreferenceLessMovement() {
return getCommonConfig().getHelixRebalancePreferenceLessMovement();
}

public boolean isHelixRebalancePreferenceForceBaselineConvergeEnabled() {
return getCommonConfig().isHelixRebalancePreferenceForceBaselineConvergeEnabled();
}

public int getHelixInstanceCapacity() {
return getCommonConfig().getHelixInstanceCapacity();
}

public int getHelixResourceCapacityWeight() {
return getCommonConfig().getHelixResourceCapacityWeight();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8777,4 +8777,8 @@ public void setPushJobDetailsStoreClient(AvroSpecificStoreClient<PushJobStatusRe
public PubSubTopicRepository getPubSubTopicRepository() {
return pubSubTopicRepository;
}

public SafeHelixManager getHelixManager() {
kvargha marked this conversation as resolved.
Show resolved Hide resolved
return helixManager;
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.ConfigConstants.DEFAULT_HELIX_RESOURCE_CAPACITY_KEY;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceRetriableException;
import com.linkedin.venice.helix.ZkClientFactory;
import com.linkedin.venice.stats.ZkClientStatusStats;
import com.linkedin.venice.utils.RetryUtils;
import io.tehuti.metrics.MetricsRepository;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
Expand All @@ -21,12 +25,10 @@
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LeaderStandbySMD;
import org.apache.helix.model.RESTConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -43,6 +45,7 @@ public class ZkHelixAdminClient implements HelixAdminClient {
private static final String CONTROLLER_HAAS_ZK_CLIENT_NAME = "controller-zk-client-for-haas-admin";

private final HelixAdmin helixAdmin;
private final ConfigAccessor helixConfigAccessor;
private final VeniceControllerClusterConfig commonConfig;
private final VeniceControllerMultiClusterConfig multiClusterConfigs;
private final String haasSuperClusterName;
Expand All @@ -65,6 +68,7 @@ public ZkHelixAdminClient(
throw new VeniceException("Failed to connect to ZK within " + ZkClient.DEFAULT_CONNECTION_TIMEOUT + " ms!");
}
helixAdmin = new ZKHelixAdmin(helixAdminZkClient);
helixConfigAccessor = new ConfigAccessor(helixAdminZkClient);
}

/**
Expand Down Expand Up @@ -98,6 +102,39 @@ public void createVeniceControllerCluster() {
// Topology and fault zone type fields are used by CRUSH alg. Helix would apply the constrains on CRUSH alg to
// choose proper instance to hold the replica.
clusterConfig.setTopologyAwareEnabled(false);
clusterConfig.setPersistBestPossibleAssignment(true);

// We want to prioritize evenness over less movement when it comes to resource assignment, because the cost
// of rebalancing for the controller is cheap as it is stateless.
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> globalRebalancePreference = new HashMap<>();
globalRebalancePreference.put(
ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS,
commonConfig.getHelixRebalancePreferenceEvenness());
globalRebalancePreference.put(
ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT,
commonConfig.getHelixRebalancePreferenceLessMovement());
// This should be turned off, so it doesn't overpower other constraint calculations
int forceBaseLineConverge = commonConfig.isHelixRebalancePreferenceForceBaselineConvergeEnabled() ? 1 : 0;
globalRebalancePreference
.put(ClusterConfig.GlobalRebalancePreferenceKey.FORCE_BASELINE_CONVERGE, forceBaseLineConverge);
clusterConfig.setGlobalRebalancePreference(globalRebalancePreference);

List<String> instanceCapacityKeys = new ArrayList<>();
instanceCapacityKeys.add(DEFAULT_HELIX_RESOURCE_CAPACITY_KEY);
clusterConfig.setInstanceCapacityKeys(instanceCapacityKeys);

// This is how much capacity a participant can take. The Helix documentation recommends setting this to a high
// value to avoid rebalance failures. The primary goal of setting this is to enable a constraint that takes the
// current top-state distribution into account when rebalancing.
Map<String, Integer> defaultInstanceCapacityMap = new HashMap<>();
defaultInstanceCapacityMap.put(DEFAULT_HELIX_RESOURCE_CAPACITY_KEY, commonConfig.getHelixInstanceCapacity());
clusterConfig.setDefaultInstanceCapacityMap(defaultInstanceCapacityMap);

// This is how much weight each resource in a cluster has
Map<String, Integer> defaultPartitionWeightMap = new HashMap<>();
defaultPartitionWeightMap
.put(DEFAULT_HELIX_RESOURCE_CAPACITY_KEY, commonConfig.getHelixResourceCapacityWeight());
clusterConfig.setDefaultPartitionWeightMap(defaultPartitionWeightMap);

updateClusterConfigs(controllerClusterName, clusterConfig);
helixAdmin.addStateModelDef(controllerClusterName, LeaderStandbySMD.name, LeaderStandbySMD.build());
Expand Down Expand Up @@ -215,21 +252,15 @@ public void addClusterToGrandCluster(String clusterName) {
*/
@Override
public void updateClusterConfigs(String clusterName, ClusterConfig clusterConfig) {
HelixConfigScope configScope =
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
Map<String, String> helixClusterProperties = new HashMap<>(clusterConfig.getRecord().getSimpleFields());
helixAdmin.setConfig(configScope, helixClusterProperties);
helixConfigAccessor.setClusterConfig(clusterName, clusterConfig);
}

/**
* @see HelixAdminClient#updateRESTConfigs(String, RESTConfig)
*/
@Override
public void updateRESTConfigs(String clusterName, RESTConfig restConfig) {
HelixConfigScope configScope =
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.REST).forCluster(clusterName).build();
Map<String, String> helixRestProperties = new HashMap<>(restConfig.getRecord().getSimpleFields());
helixAdmin.setConfig(configScope, helixRestProperties);
helixConfigAccessor.setRESTConfig(clusterName, restConfig);
}
kvargha marked this conversation as resolved.
Show resolved Hide resolved

/**
Expand Down
Loading
Loading