Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add health check to route queries to healthy cluster #24449

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions presto-password-authenticators/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>5.1.0</version>
</dependency>

<dependency>
Expand Down
32 changes: 26 additions & 6 deletions presto-router/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
</properties>

<dependencies>
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-client</artifactId>
Expand Down Expand Up @@ -57,6 +62,11 @@
<artifactId>http-server</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>stats</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>jaxrs</artifactId>
Expand Down Expand Up @@ -169,12 +179,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-tpch</artifactId>
Expand All @@ -193,4 +197,20 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.gaul</groupId>
<artifactId>modernizer-maven-plugin</artifactId>
<configuration>
<ignorePackages>
<ignorePackage>com.facebook.presto.testing.assertions</ignorePackage>
</ignorePackages>
<ignoreClassNamePatterns>
<ignoreClassNamePattern>com.facebook.presto.router.MockRouterHttpServletRequest</ignoreClassNamePattern>
</ignoreClassNamePatterns>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,35 @@
package com.facebook.presto.router;

import com.facebook.airlift.configuration.AbstractConfigurationAwareModule;
import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.router.cluster.ClusterManager;
import com.facebook.presto.router.cluster.ClusterStatusResource;
import com.facebook.presto.router.cluster.ClusterStatusTracker;
import com.facebook.presto.router.cluster.ForClusterInfoTracker;
import com.facebook.presto.router.cluster.ForClusterManager;
import com.facebook.presto.router.cluster.ForQueryInfoTracker;
import com.facebook.presto.router.cluster.RemoteInfoFactory;
import com.facebook.presto.router.cluster.RemoteStateConfig;
import com.facebook.presto.router.predictor.ForQueryCpuPredictor;
import com.facebook.presto.router.predictor.ForQueryMemoryPredictor;
import com.facebook.presto.router.predictor.PredictorManager;
import com.facebook.presto.router.predictor.RemoteQueryFactory;
import com.facebook.presto.server.PluginManagerConfig;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.server.WebUiResource;
import com.google.inject.Binder;
import com.google.inject.Scopes;
import io.airlift.units.Duration;

import java.lang.annotation.Annotation;
import java.util.concurrent.ScheduledExecutorService;

import static com.facebook.airlift.concurrent.Threads.threadsNamed;
import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
import static com.facebook.airlift.http.client.HttpClientBinder.httpClientBinder;
import static com.facebook.airlift.http.server.HttpServerBinder.httpServerBinder;
import static com.facebook.airlift.jaxrs.JaxrsBinder.jaxrsBinder;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;

public class RouterModule
Expand All @@ -52,20 +61,36 @@ public class RouterModule
@Override
protected void setup(Binder binder)
{
ServerConfig serverConfig = buildConfigObject(ServerConfig.class);

httpServerBinder(binder).bindResource(UI_PATH, ROUTER_UI).withWelcomeFile(INDEX_HTML);
configBinder(binder).bindConfig(RouterConfig.class);

configBinder(binder).bindConfig(RemoteStateConfig.class);
configBinder(binder).bindConfigDefaults(RemoteStateConfig.class, config -> config.setSecondsToUnhealthy(30));

// resource for serving static content
jaxrsBinder(binder).bind(WebUiResource.class);

binder.bind(ScheduledExecutorService.class).annotatedWith(ForClusterManager.class).toInstance(newSingleThreadScheduledExecutor(threadsNamed("cluster-config")));

binder.bind(ClusterManager.class).in(Scopes.SINGLETON);
binder.bind(RemoteInfoFactory.class).in(Scopes.SINGLETON);

bindHttpClient(binder, QUERY_TRACKER, ForQueryInfoTracker.class, IDLE_TIMEOUT_SECOND, REQUEST_TIMEOUT_SECOND);
bindHttpClient(binder, QUERY_TRACKER, ForClusterInfoTracker.class, IDLE_TIMEOUT_SECOND, REQUEST_TIMEOUT_SECOND);

//Determine the NodeVersion
NodeVersion nodeVersion = new NodeVersion(serverConfig.getPrestoVersion());
binder.bind(NodeVersion.class).toInstance(nodeVersion);

binder.bind(ClusterStatusTracker.class).in(Scopes.SINGLETON);

binder.bind(PredictorManager.class).in(Scopes.SINGLETON);
binder.bind(RemoteQueryFactory.class).in(Scopes.SINGLETON);

configBinder(binder).bindConfig(PluginManagerConfig.class);

bindHttpClient(binder, QUERY_PREDICTOR, ForQueryCpuPredictor.class, IDLE_TIMEOUT_SECOND, PREDICTOR_REQUEST_TIMEOUT_SECOND);
bindHttpClient(binder, QUERY_PREDICTOR, ForQueryMemoryPredictor.class, IDLE_TIMEOUT_SECOND, PREDICTOR_REQUEST_TIMEOUT_SECOND);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package com.facebook.presto.router;

import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.router.cluster.ClusterManager;
import com.facebook.presto.router.cluster.RequestInfo;
import com.google.inject.Inject;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import javax.annotation.security.PermitAll;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
Expand All @@ -35,13 +37,13 @@
import static javax.ws.rs.core.MediaType.TEXT_PLAIN_TYPE;
import static javax.ws.rs.core.Response.Status.BAD_GATEWAY;

@Path("/")
@PermitAll
@Path("/v1")
public class RouterResource
{
private static final Logger log = Logger.get(RouterResource.class);

private final ClusterManager clusterManager;
private static final CounterStat successRedirectRequests = new CounterStat();
private static final CounterStat failedRedirectRequests = new CounterStat();

@Inject
public RouterResource(ClusterManager clusterManager)
Expand All @@ -50,23 +52,39 @@ public RouterResource(ClusterManager clusterManager)
}

@POST
@Path("/v1/statement")
@Path("statement")
@Produces(APPLICATION_JSON)
public Response routeQuery(String statement, @Context HttpServletRequest servletRequest)
{
RequestInfo requestInfo = new RequestInfo(servletRequest, statement);
URI coordinatorUri = clusterManager.getDestination(requestInfo).orElseThrow(() -> badRequest(BAD_GATEWAY, "No Presto cluster available"));
URI statementUri = uriBuilderFrom(coordinatorUri).replacePath("/v1/statement").build();
successRedirectRequests.update(1);
log.info("route query to %s", statementUri);
return Response.temporaryRedirect(statementUri).build();
}

private static WebApplicationException badRequest(Response.Status status, String message)
{
failedRedirectRequests.update(1);
throw new WebApplicationException(
Response.status(status)
.type(TEXT_PLAIN_TYPE)
.entity(message)
.build());
}

@Managed
@Nested
public CounterStat getFailedRedirectRequests()
{
return failedRedirectRequests;
}

@Managed
@Nested
public CounterStat getSuccessRedirectRequests()
{
return successRedirectRequests;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.router.cluster;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.router.RouterConfig;
import com.facebook.presto.router.scheduler.Scheduler;
import com.facebook.presto.router.scheduler.SchedulerFactory;
Expand All @@ -24,41 +25,108 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import javax.annotation.PostConstruct;
import javax.inject.Inject;

import java.io.File;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static com.facebook.presto.router.RouterUtil.parseRouterConfig;
import static com.facebook.presto.router.scheduler.SchedulerType.WEIGHTED_RANDOM_CHOICE;
import static com.facebook.presto.router.scheduler.SchedulerType.WEIGHTED_ROUND_ROBIN;
import static com.facebook.presto.spi.StandardErrorCode.CONFIGURATION_INVALID;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toMap;

public class ClusterManager
{
private final Map<String, GroupSpec> groups;
private final List<SelectorRuleSpec> groupSelectors;
private final SchedulerType schedulerType;
private final Scheduler scheduler;
private final HashMap<String, HashMap<URI, Integer>> serverWeights = new HashMap<>();
private Map<String, GroupSpec> groups;
private List<SelectorRuleSpec> groupSelectors;
private SchedulerType schedulerType;
private Scheduler scheduler;
private HashMap<String, HashMap<URI, Integer>> serverWeights = new HashMap<>();
private HashMap<URI, URI> discoveryURIs = new HashMap<>();
private final RouterConfig routerConfig;
private final ScheduledExecutorService scheduledExecutorService;
private final AtomicLong lastConfigUpdate = new AtomicLong();
private final RemoteInfoFactory remoteInfoFactory;
private final Logger log = Logger.get(ClusterManager.class);

// Cluster status
private final ConcurrentHashMap<URI, RemoteClusterInfo> remoteClusterInfos = new ConcurrentHashMap<>();
private final ConcurrentHashMap<URI, RemoteQueryInfo> remoteQueryInfos = new ConcurrentHashMap<>();

@Inject
public ClusterManager(RouterConfig config)
public ClusterManager(RouterConfig config, @ForClusterManager ScheduledExecutorService scheduledExecutorService, RemoteInfoFactory remoteInfoFactory)
{
this.routerConfig = config;
this.scheduledExecutorService = scheduledExecutorService;
RouterSpec routerSpec = parseRouterConfig(config)
.orElseThrow(() -> new PrestoException(CONFIGURATION_INVALID, "Failed to load router config"));

this.groups = ImmutableMap.copyOf(routerSpec.getGroups().stream().collect(toMap(GroupSpec::getName, group -> group)));
this.groupSelectors = ImmutableList.copyOf(routerSpec.getSelectors());
this.schedulerType = routerSpec.getSchedulerType();
this.scheduler = new SchedulerFactory(routerSpec.getSchedulerType()).create();

this.remoteInfoFactory = requireNonNull(remoteInfoFactory, "remoteInfoFactory is null");
this.initializeServerWeights();
this.initializeMembersDiscoveryURI();
List<URI> allClusters = getAllClusters();
allClusters.forEach(uri -> {
log.info("Attaching cluster %s to the router", uri.getHost());
remoteClusterInfos.put(uri, remoteInfoFactory.createRemoteClusterInfo(discoveryURIs.get(uri)));
remoteQueryInfos.put(uri, remoteInfoFactory.createRemoteQueryInfo(discoveryURIs.get(uri)));
log.info("Successfully attached cluster %s to the router. Queries will be routed to cluster after successful health check", uri.getHost());
});
}

@PostConstruct
public void startConfigReloadTask()
{
File routerConfigFile = new File(routerConfig.getConfigFile());
//ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(() -> {
long newConfigUpdateTime = routerConfigFile.lastModified();
if (lastConfigUpdate.get() != newConfigUpdateTime) {
RouterSpec routerSpec = parseRouterConfig(routerConfig)
.orElseThrow(() -> new PrestoException(CONFIGURATION_INVALID, "Failed to load router config"));
this.groups = ImmutableMap.copyOf(routerSpec.getGroups().stream().collect(toMap(GroupSpec::getName, group -> group)));
this.groupSelectors = ImmutableList.copyOf(routerSpec.getSelectors());
this.schedulerType = routerSpec.getSchedulerType();
this.scheduler = new SchedulerFactory(routerSpec.getSchedulerType()).create();
this.initializeServerWeights();
this.initializeMembersDiscoveryURI();
List<URI> allClusters = getAllClusters();
allClusters.forEach(uri -> {
if (!remoteClusterInfos.containsKey(uri)) {
log.info("Attaching cluster %s to the router", uri.getHost());
remoteClusterInfos.put(uri, remoteInfoFactory.createRemoteClusterInfo(discoveryURIs.get(uri)));
remoteQueryInfos.put(uri, remoteInfoFactory.createRemoteQueryInfo(discoveryURIs.get(uri)));
log.info("Successfully attached cluster %s to the router. Queries will be routed to cluster after successful health check", uri.getHost());
}
});
for (URI uri : remoteClusterInfos.keySet()) {
if (!allClusters.contains(uri)) {
log.info("Removing cluster %s from the router", uri.getHost());
remoteClusterInfos.remove(uri);
remoteQueryInfos.remove(uri);
discoveryURIs.remove(uri);
log.info("Successfully removed cluster %s from the router", uri.getHost());
}
}
lastConfigUpdate.set(newConfigUpdateTime);
}
}, 0L, (long) 5, TimeUnit.SECONDS);
}

public List<URI> getAllClusters()
Expand All @@ -77,11 +145,20 @@ public Optional<URI> getDestination(RequestInfo requestInfo)

checkArgument(groups.containsKey(target.get()));
GroupSpec groupSpec = groups.get(target.get());
scheduler.setCandidates(groupSpec.getMembers());
if (schedulerType == WEIGHTED_RANDOM_CHOICE) {
scheduler.setWeights(serverWeights.get(groupSpec.getName()));

List<URI> healthyClusterURIs = groupSpec.getMembers().stream()
.filter(entry -> remoteClusterInfos.get(entry).isHealthy())
.collect(Collectors.toList());

if (healthyClusterURIs.isEmpty()) {
log.info("Healthy cluster not found!");
return Optional.empty();
}

scheduler.setCandidates(healthyClusterURIs);
if (schedulerType == WEIGHTED_RANDOM_CHOICE || schedulerType == WEIGHTED_ROUND_ROBIN) {
scheduler.setWeights(serverWeights.get(groupSpec.getName()));
}
return scheduler.getDestination(requestInfo.getUser());
}

Expand All @@ -105,4 +182,25 @@ private void initializeServerWeights()
}
});
}

private void initializeMembersDiscoveryURI()
{
groups.forEach((name, groupSpec) -> {
List<URI> members = groupSpec.getMembers();
List<URI> membersDiscoveryURI = groupSpec.getMembersDiscoveryURI();
for (int i = 0; i < members.size(); i++) {
discoveryURIs.put(members.get(i), membersDiscoveryURI.get(i));
}
});
}

public ConcurrentHashMap<URI, RemoteClusterInfo> getRemoteClusterInfos()
{
return remoteClusterInfos;
}

public ConcurrentHashMap<URI, RemoteQueryInfo> getRemoteQueryInfos()
{
return remoteQueryInfos;
}
}
Loading
Loading