diff --git a/presto-main/src/main/java/com/facebook/presto/server/PluginClassLoader.java b/presto-main/src/main/java/com/facebook/presto/server/PluginClassLoader.java
index bce2dedf8b65a..7c3f11aac5b13 100644
--- a/presto-main/src/main/java/com/facebook/presto/server/PluginClassLoader.java
+++ b/presto-main/src/main/java/com/facebook/presto/server/PluginClassLoader.java
@@ -26,7 +26,7 @@
import static java.util.Objects.requireNonNull;
-class PluginClassLoader
+public class PluginClassLoader
extends URLClassLoader
{
private static final ClassLoader PLATFORM_CLASS_LOADER = findPlatformClassLoader();
diff --git a/presto-main/src/main/java/com/facebook/presto/server/PluginDiscovery.java b/presto-main/src/main/java/com/facebook/presto/server/PluginDiscovery.java
index 6c0d1f4fe9ef1..347eb34703b85 100644
--- a/presto-main/src/main/java/com/facebook/presto/server/PluginDiscovery.java
+++ b/presto-main/src/main/java/com/facebook/presto/server/PluginDiscovery.java
@@ -38,7 +38,7 @@
import static java.nio.file.Files.walkFileTree;
// This is a hack for development and does not support nested classes.
-final class PluginDiscovery
+public final class PluginDiscovery
{
private static final String CLASS_FILE_SUFFIX = ".class";
diff --git a/presto-router-custom-scheduler/README.txt b/presto-router-custom-scheduler/README.txt
new file mode 100755
index 0000000000000..a536426d0a8bb
--- /dev/null
+++ b/presto-router-custom-scheduler/README.txt
@@ -0,0 +1,17 @@
+# Presto Router Custom Scheduler Plugin
+This package implements third party custom schedulers to be used by Presto router
+
+## Adding the custom scheduler plugin to presto router
+The custom plugin jar that is built is placed in the presto-router/plugin/custom-scheduler directory,
+to be picked up by the presto-router
+The scheduler name has to be set to CUSTOM_PLUGIN_SCHEDULER in the router-config.json
+ "scheduler": "CUSTOM_PLUGIN_SCHEDULER"
+The name of the custom scheduler factory needs to be set in the property file router-scheduler.properties in presto-router/etc/router-config
+
+## Main Classes:
+RouterSchedulerPlugin - Custom Scheduler Plugin class to be loaded by the Router plugin manager.
+ This class implements the interface com.facebook.presto.spi.RouterPlugin.
+MetricsBasedSchedulerFactory - Factory for creating specific custom scheduler
+ This class implements the interface com.facebook.presto.spi.SchedulerFactory
+MetricsBasedScheduler - Custom scheduler implementing the scheduling logic for clusters. More similar classes can be added implementing specific custom scheduling logic.
+ This class implements the interface com.facebook.presto.spi.router.Scheduler
\ No newline at end of file
diff --git a/presto-router-custom-scheduler/pom.xml b/presto-router-custom-scheduler/pom.xml
new file mode 100644
index 0000000000000..df5884ad3411c
--- /dev/null
+++ b/presto-router-custom-scheduler/pom.xml
@@ -0,0 +1,86 @@
+
+
+ 4.0.0
+
+
+ com.facebook.presto
+ presto-root
+ 0.291-SNAPSHOT
+
+
+ com.facebook.presto.router
+ scheduler
+ 0.291-SNAPSHOT
+ jar
+ scheduler
+ SPI plugin project for presto router custom scheduler
+
+
+
+ com.facebook.presto
+ presto-spi
+
+
+
+ com.facebook.airlift
+ log
+ 0.215
+
+
+
+
+ org.testng
+ testng
+ 7.10.2
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ 5.9.1
+ test
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.1
+
+ 11
+ 11
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ 3.3.0
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+ jar-with-dependencies
+
+
+
+ true
+ com.facebook.presto.router.scheduler.RouterSchedulerPlugin
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/CustomSchedulerConstants.java b/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/CustomSchedulerConstants.java
new file mode 100644
index 0000000000000..fe309dce13eec
--- /dev/null
+++ b/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/CustomSchedulerConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.router.scheduler;
+
+public class CustomSchedulerConstants
+{
+ private CustomSchedulerConstants()
+ {
+ }
+
+ public static final String METRICS_BASED = "metricsBased";
+}
diff --git a/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/MetricsBasedScheduler.java b/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/MetricsBasedScheduler.java
new file mode 100644
index 0000000000000..1d0f7538c1ef8
--- /dev/null
+++ b/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/MetricsBasedScheduler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.router.scheduler;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.spi.router.ClusterInfo;
+import com.facebook.presto.spi.router.Scheduler;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+
+//Metrics based scheduler uses coordinator and/or worker metrics for scheduling decisions
+public class MetricsBasedScheduler
+ implements Scheduler
+{
+ private static final Logger log = Logger.get(MetricsBasedScheduler.class);
+ Map clusterInfos;
+
+ //Min heap based priority queue. Cluster with lowest number of queries will be on top of the queue
+ PriorityQueue clusterQueue = new PriorityQueue((x, y) -> (int) (x.totalQueries - y.totalQueries));
+
+ private List candidates;
+
+ private class ClusterQueryInfo
+ {
+ URI clusterUri;
+ long runningQueries;
+ long queuedQueries;
+ long totalQueries;
+
+ ClusterQueryInfo(URI clusterUri, long runningQueries, long queuedQueries)
+ {
+ this.clusterUri = clusterUri;
+ this.runningQueries = runningQueries;
+ this.queuedQueries = queuedQueries;
+ totalQueries = (long) (runningQueries + queuedQueries);
+ log.info("Cluster URI : " + clusterUri + ", runningQueries : " + runningQueries + ", queuedQueries : " + queuedQueries + ", totalQueries : " + totalQueries);
+ }
+ }
+
+ @Override
+ public Optional getDestination(String user, String query)
+ {
+ try {
+ if (clusterInfos != null && clusterInfos.size() > 0) {
+ clusterQueue.clear();
+ clusterQueue.addAll(clusterInfos.keySet().stream()
+ .map(uri -> new ClusterQueryInfo(uri, clusterInfos.get(uri).getRunningQueries(), clusterInfos.get(uri).getQueuedQueries()))
+ .collect(Collectors.toList()));
+ return Optional.of(clusterQueue.poll().clusterUri);
+ }
+ return Optional.empty();
+ }
+ catch (IllegalArgumentException e) {
+ log.warn(e, "Error getting destination for user " + user);
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public void setCandidates(List candidates)
+ {
+ this.candidates = candidates;
+ }
+
+ @Override
+ public void setClusterInfos(Map clusterInfos)
+ {
+ this.clusterInfos = clusterInfos;
+ }
+}
diff --git a/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/MetricsBasedSchedulerFactory.java b/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/MetricsBasedSchedulerFactory.java
new file mode 100644
index 0000000000000..314fd827b8a80
--- /dev/null
+++ b/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/MetricsBasedSchedulerFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.router.scheduler;
+
+import com.facebook.presto.spi.router.Scheduler;
+import com.facebook.presto.spi.router.SchedulerFactory;
+
+import java.util.Map;
+
+public class MetricsBasedSchedulerFactory
+ implements SchedulerFactory
+{
+ @Override
+ public String getName()
+ {
+ return CustomSchedulerConstants.METRICS_BASED;
+ }
+
+ @Override
+ public Scheduler create(Map config)
+ {
+ return new MetricsBasedScheduler();
+ }
+}
diff --git a/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/RouterSchedulerPlugin.java b/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/RouterSchedulerPlugin.java
new file mode 100644
index 0000000000000..a0ec17f9825df
--- /dev/null
+++ b/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/RouterSchedulerPlugin.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.router.scheduler;
+
+import com.facebook.presto.spi.RouterPlugin;
+import com.facebook.presto.spi.router.SchedulerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class RouterSchedulerPlugin
+ implements RouterPlugin
+{
+ @Override
+ public Iterable getSchedulerFactories()
+ {
+ List schedulerFactories = new ArrayList<>();
+ schedulerFactories.add(new MetricsBasedSchedulerFactory());
+ return schedulerFactories;
+ }
+}
diff --git a/presto-router-custom-scheduler/src/main/resources/META-INF/services/com.facebook.presto.spi.RouterPlugin b/presto-router-custom-scheduler/src/main/resources/META-INF/services/com.facebook.presto.spi.RouterPlugin
new file mode 100644
index 0000000000000..22e103f612a0c
--- /dev/null
+++ b/presto-router-custom-scheduler/src/main/resources/META-INF/services/com.facebook.presto.spi.RouterPlugin
@@ -0,0 +1 @@
+com.facebook.presto.router.scheduler.RouterSchedulerPlugin
\ No newline at end of file
diff --git a/presto-router-custom-scheduler/src/modernizer/violations.xml b/presto-router-custom-scheduler/src/modernizer/violations.xml
new file mode 100644
index 0000000000000..bca7ebcf3900a
--- /dev/null
+++ b/presto-router-custom-scheduler/src/modernizer/violations.xml
@@ -0,0 +1,45 @@
+
+
+
+ java/lang/Class.newInstance:()Ljava/lang/Object;
+ 1.1
+ Prefer Class.getConstructor().newInstance()
+
+
+
+ java/lang/String.toLowerCase:()Ljava/lang/String;
+ 1.1
+ Prefer String.toLowerCase(java.util.Locale)
+
+
+
+ com/google/common/primitives/Ints.checkedCast:(J)I
+ 1.8
+ Prefer Math.toIntExact(long)
+
+
+
+ org/testng/Assert.assertEquals:(Ljava/lang/Iterable;Ljava/lang/Iterable;)V
+ 1.8
+ Use com.facebook.presto.testing.assertions.Assert.assertEquals due to TestNG #543
+
+
+
+ org/testng/Assert.assertEquals:(Ljava/lang/Iterable;Ljava/lang/Iterable;Ljava/lang/String;)V
+ 1.8
+ Use com.facebook.presto.testing.assertions.Assert.assertEquals due to TestNG #543
+
+
+
+ java/util/TimeZone.getTimeZone:(Ljava/lang/String;)Ljava/util/TimeZone;
+ 1.8
+ Avoid TimeZone.getTimeZone as it returns GMT for a zone not supported by the JVM. Use TimeZone.getTimeZone(ZoneId.of(..)) instead, or TimeZone.getTimeZone(..., false).
+
+
+
+ org/joda/time/DateTimeZone.toTimeZone:()Ljava/util/TimeZone;
+ 1.8
+ Avoid DateTimeZone.toTimeZone as it returns GMT for a zone not supported by the JVM. Use TimeZone.getTimeZone(ZoneId.of(dtz.getId())) instead.
+
+
+
diff --git a/presto-router-custom-scheduler/src/test/java/com/ibm/presto/scheduler/TestCustomScheduler.java b/presto-router-custom-scheduler/src/test/java/com/ibm/presto/scheduler/TestCustomScheduler.java
new file mode 100644
index 0000000000000..1cf73ec65da6d
--- /dev/null
+++ b/presto-router-custom-scheduler/src/test/java/com/ibm/presto/scheduler/TestCustomScheduler.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.router.scheduler;
+
+import com.facebook.presto.spi.router.ClusterInfo;
+import com.facebook.presto.spi.router.Scheduler;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+public class TestCustomScheduler
+{
+ Map clusterInfos = new HashMap<>();
+
+ static class MockRemoteClusterInfo
+ implements ClusterInfo
+ {
+ long runningQueries;
+ long queuedQueries;
+
+ MockRemoteClusterInfo(long runningQueries, long queuedQueries)
+ {
+ this.runningQueries = runningQueries;
+ this.queuedQueries = queuedQueries;
+ }
+
+ @Override
+ public long getRunningQueries()
+ {
+ return runningQueries;
+ }
+
+ @Override
+ public long getQueuedQueries()
+ {
+ return queuedQueries;
+ }
+ }
+
+ @Test
+ public void testMetricsBasedScheduler()
+ throws Exception
+ {
+ Scheduler scheduler = new MetricsBasedScheduler();
+
+ URI uri1 = new URI("192.168.0.1");
+ URI uri2 = new URI("192.168.0.2");
+ URI uri3 = new URI("192.168.0.3");
+
+ clusterInfos.put(uri1, new MockRemoteClusterInfo(10, 10));
+ clusterInfos.put(uri2, new MockRemoteClusterInfo(20, 20));
+ clusterInfos.put(uri3, new MockRemoteClusterInfo(30, 30));
+ scheduler.setClusterInfos(clusterInfos);
+ URI target = scheduler.getDestination("test", null).orElse(new URI("invalid"));
+ assertEquals(target.getPath(), "192.168.0.1");
+
+ clusterInfos.put(uri1, new MockRemoteClusterInfo(20, 20));
+ clusterInfos.put(uri2, new MockRemoteClusterInfo(10, 10));
+ clusterInfos.put(uri3, new MockRemoteClusterInfo(30, 30));
+ scheduler.setClusterInfos(clusterInfos);
+ target = scheduler.getDestination("test", null).orElse(new URI("invalid"));
+ assertEquals(target.getPath(), "192.168.0.2");
+
+ clusterInfos.put(uri1, new MockRemoteClusterInfo(20, 20));
+ clusterInfos.put(uri2, new MockRemoteClusterInfo(30, 30));
+ clusterInfos.put(uri3, new MockRemoteClusterInfo(10, 10));
+ scheduler.setClusterInfos(clusterInfos);
+ target = scheduler.getDestination("test", null).orElse(new URI("invalid"));
+ assertEquals(target.getPath(), "192.168.0.3");
+ }
+}
diff --git a/presto-router/etc/router-config/router-scheduler.properties b/presto-router/etc/router-config/router-scheduler.properties
new file mode 100644
index 0000000000000..dceb8d2251157
--- /dev/null
+++ b/presto-router/etc/router-config/router-scheduler.properties
@@ -0,0 +1 @@
+router-scheduler.name=metricsBased
\ No newline at end of file
diff --git a/presto-router/pom.xml b/presto-router/pom.xml
index f70cdedd79e78..4cecb9e7c7dad 100755
--- a/presto-router/pom.xml
+++ b/presto-router/pom.xml
@@ -162,16 +162,50 @@
jmxutils
-
- org.testng
- testng
- test
+ io.airlift.resolver
+ resolver
+
+
+ com.google.inject
+ guice
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.slf4j
+ jcl-over-slf4j
+
+
+
+
+
+ com.facebook.drift
+ drift-api
com.facebook.presto
presto-main
+
+
+ software.amazon.ion
+ ion-java
+
+
+
+
+
+ org.sonatype.aether
+ aether-api
+
+
+
+
+ org.testng
+ testng
test
diff --git a/presto-router/src/main/java/com/facebook/presto/router/PrestoRouter.java b/presto-router/src/main/java/com/facebook/presto/router/PrestoRouter.java
index 19c30ec1869a4..62d25ece5607d 100755
--- a/presto-router/src/main/java/com/facebook/presto/router/PrestoRouter.java
+++ b/presto-router/src/main/java/com/facebook/presto/router/PrestoRouter.java
@@ -24,7 +24,9 @@
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.node.NodeModule;
import com.facebook.airlift.tracetoken.TraceTokenModule;
+import com.facebook.presto.router.scheduler.SchedulerManager;
import com.google.common.collect.ImmutableList;
+import com.google.inject.Injector;
import com.google.inject.Module;
import org.weakref.jmx.guice.MBeanModule;
@@ -53,7 +55,9 @@ public static void start(Module... extraModules)
Logger log = Logger.get(RouterModule.class);
try {
- app.initialize();
+ Injector injector = app.initialize();
+ injector.getInstance(RouterPluginManager.class).loadPlugins();
+ injector.getInstance(SchedulerManager.class).loadScheduler();
log.info("======== SERVER STARTED ========");
}
catch (Throwable t) {
diff --git a/presto-router/src/main/java/com/facebook/presto/router/RouterModule.java b/presto-router/src/main/java/com/facebook/presto/router/RouterModule.java
index ffc9bb35f60ea..7f09879febfc0 100755
--- a/presto-router/src/main/java/com/facebook/presto/router/RouterModule.java
+++ b/presto-router/src/main/java/com/facebook/presto/router/RouterModule.java
@@ -14,6 +14,7 @@
package com.facebook.presto.router;
import com.facebook.airlift.configuration.AbstractConfigurationAwareModule;
+import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.router.cluster.ClusterManager;
import com.facebook.presto.router.cluster.ClusterStatusResource;
import com.facebook.presto.router.cluster.ClusterStatusTracker;
@@ -24,6 +25,9 @@
import com.facebook.presto.router.predictor.ForQueryMemoryPredictor;
import com.facebook.presto.router.predictor.PredictorManager;
import com.facebook.presto.router.predictor.RemoteQueryFactory;
+import com.facebook.presto.router.scheduler.SchedulerManager;
+import com.facebook.presto.server.PluginManagerConfig;
+import com.facebook.presto.server.ServerConfig;
import com.google.inject.Binder;
import com.google.inject.Scopes;
import io.airlift.units.Duration;
@@ -52,20 +56,30 @@ public class RouterModule
@Override
protected void setup(Binder binder)
{
+ ServerConfig serverConfig = buildConfigObject(ServerConfig.class);
+
httpServerBinder(binder).bindResource(UI_PATH, ROUTER_UI).withWelcomeFile(INDEX_HTML);
configBinder(binder).bindConfig(RouterConfig.class);
+ binder.bind(SchedulerManager.class).in(Scopes.SINGLETON);
binder.bind(ClusterManager.class).in(Scopes.SINGLETON);
binder.bind(RemoteInfoFactory.class).in(Scopes.SINGLETON);
bindHttpClient(binder, QUERY_TRACKER, ForQueryInfoTracker.class, IDLE_TIMEOUT_SECOND, REQUEST_TIMEOUT_SECOND);
bindHttpClient(binder, QUERY_TRACKER, ForClusterInfoTracker.class, IDLE_TIMEOUT_SECOND, REQUEST_TIMEOUT_SECOND);
+ // Determine the NodeVersion
+ NodeVersion nodeVersion = new NodeVersion(serverConfig.getPrestoVersion());
+ binder.bind(NodeVersion.class).toInstance(nodeVersion);
+
binder.bind(ClusterStatusTracker.class).in(Scopes.SINGLETON);
binder.bind(PredictorManager.class).in(Scopes.SINGLETON);
binder.bind(RemoteQueryFactory.class).in(Scopes.SINGLETON);
+ binder.bind(RouterPluginManager.class).in(Scopes.SINGLETON);
+ configBinder(binder).bindConfig(PluginManagerConfig.class);
+
bindHttpClient(binder, QUERY_PREDICTOR, ForQueryCpuPredictor.class, IDLE_TIMEOUT_SECOND, PREDICTOR_REQUEST_TIMEOUT_SECOND);
bindHttpClient(binder, QUERY_PREDICTOR, ForQueryMemoryPredictor.class, IDLE_TIMEOUT_SECOND, PREDICTOR_REQUEST_TIMEOUT_SECOND);
diff --git a/presto-router/src/main/java/com/facebook/presto/router/RouterPluginManager.java b/presto-router/src/main/java/com/facebook/presto/router/RouterPluginManager.java
new file mode 100644
index 0000000000000..c70c37e149bee
--- /dev/null
+++ b/presto-router/src/main/java/com/facebook/presto/router/RouterPluginManager.java
@@ -0,0 +1,258 @@
+package com.facebook.presto.router;
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.router.scheduler.SchedulerManager;
+import com.facebook.presto.server.PluginClassLoader;
+import com.facebook.presto.server.PluginManagerConfig;
+import com.facebook.presto.spi.Plugin;
+import com.facebook.presto.spi.RouterPlugin;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.router.SchedulerFactory;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Ordering;
+import io.airlift.resolver.ArtifactResolver;
+import io.airlift.resolver.DefaultArtifact;
+import org.sonatype.aether.artifact.Artifact;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.facebook.presto.server.PluginDiscovery.discoverPlugins;
+import static com.facebook.presto.server.PluginDiscovery.writePluginServices;
+import static java.util.Objects.requireNonNull;
+
+@ThreadSafe
+public class RouterPluginManager
+{
+ // When generating code the AfterBurner module loads classes with *some* classloader.
+ // When the AfterBurner module is configured not to use the value classloader
+ // (e.g., AfterBurner().setUseValueClassLoader(false)) AppClassLoader is used for loading those
+ // classes. Otherwise, the PluginClassLoader is used, which is the default behavior.
+ // Therefore, in the former case Afterburner won't be able to load the connector classes
+ // as AppClassLoader doesn't see them, and in the latter case the PluginClassLoader won't be
+ // able to load the AfterBurner classes themselves. So, our solution is to use the PluginClassLoader
+ // and whitelist the AfterBurner classes here, so that the PluginClassLoader can load the
+ // AfterBurner classes.
+ private static final ImmutableList SPI_PACKAGES = ImmutableList.builder()
+ .add("com.facebook.presto.spi.")
+ .add("com.fasterxml.jackson.annotation.")
+ .add("com.fasterxml.jackson.module.afterburner.")
+ .add("io.airlift.slice.")
+ .add("io.airlift.units.")
+ .add("org.openjdk.jol.")
+ .add("com.facebook.presto.common")
+ .add("com.facebook.drift.annotations.")
+ .add("com.facebook.drift.TException")
+ .add("com.facebook.drift.TApplicationException")
+ .build();
+
+ private static final Logger log = Logger.get(RouterPluginManager.class);
+
+ private final SchedulerManager schedulerManager;
+ private final File installedPluginsDir;
+ private final List plugins;
+ private final ArtifactResolver resolver;
+ private final AtomicBoolean pluginsLoading = new AtomicBoolean();
+ private final AtomicBoolean pluginsLoaded = new AtomicBoolean();
+ private static final String PLUGIN_SERVICES_FILE = "META-INF/services/" + Plugin.class.getName();
+ private static final String SERVICES_FILE = "META-INF/services/" + Plugin.class.getName();
+
+ @Inject
+ public RouterPluginManager(
+ PluginManagerConfig config,
+ SchedulerManager schedulerManager)
+ {
+ requireNonNull(config, "config is null");
+
+ installedPluginsDir = config.getInstalledPluginsDir();
+ if (config.getPlugins() == null) {
+ this.plugins = ImmutableList.of();
+ }
+ else {
+ this.plugins = ImmutableList.copyOf(config.getPlugins());
+ }
+ this.resolver = new ArtifactResolver(config.getMavenLocalRepository(), config.getMavenRemoteRepository());
+ this.schedulerManager = requireNonNull(schedulerManager, "schedulerManager is null");
+ }
+
+ public void loadPlugins()
+ throws Exception
+ {
+ if (!pluginsLoading.compareAndSet(false, true)) {
+ return;
+ }
+
+ for (File file : listFiles(installedPluginsDir)) {
+ if (file.isDirectory()) {
+ loadPlugin(file.getAbsolutePath());
+ }
+ }
+
+ for (String plugin : plugins) {
+ loadPlugin(plugin);
+ }
+
+ pluginsLoaded.set(true);
+ }
+
+ private void loadPlugin(String plugin)
+ throws Exception
+ {
+ log.info("-- Loading plugin %s --", plugin);
+ PluginClassLoader pluginClassLoader = buildClassLoader(plugin);
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(pluginClassLoader)) {
+ loadPlugin(pluginClassLoader, RouterPlugin.class);
+ loadPlugin(pluginClassLoader, Plugin.class);
+ }
+ log.info("-- Finished loading plugin %s --", plugin);
+ }
+
+ private void loadPlugin(PluginClassLoader pluginClassLoader, Class> clazz)
+ {
+ ServiceLoader> serviceLoader = ServiceLoader.load(clazz, pluginClassLoader);
+ List> plugins = ImmutableList.copyOf(serviceLoader);
+
+ if (plugins.isEmpty()) {
+ log.warn("No service providers of type %s", clazz.getName());
+ }
+
+ for (Object plugin : plugins) {
+ log.info("Installing %s", plugin.getClass().getName());
+ if (plugin instanceof Plugin) {
+ installPlugin((Plugin) plugin);
+ }
+ if (plugin instanceof RouterPlugin) {
+ installRouterPlugin((RouterPlugin) plugin);
+ }
+ else {
+ log.warn("Unknown plugin type: %s", plugin.getClass().getName());
+ }
+ }
+ }
+
+ public void installPlugin(Plugin plugin)
+ {
+ return;
+ }
+
+ public void installRouterPlugin(RouterPlugin plugin)
+ {
+ for (SchedulerFactory schedulerFactory : plugin.getSchedulerFactories()) {
+ log.info("Registering router scheduler %s", schedulerFactory.getName());
+ schedulerManager.addSchedulerFactory(schedulerFactory);
+ }
+ }
+
+ private PluginClassLoader buildClassLoader(String plugin)
+ throws Exception
+ {
+ File file = new File(plugin);
+ if (file.isFile() && (file.getName().equals("pom.xml") || file.getName().endsWith(".pom"))) {
+ return buildClassLoaderFromPom(file);
+ }
+ if (file.isDirectory()) {
+ return buildClassLoaderFromDirectory(file);
+ }
+ return buildClassLoaderFromCoordinates(plugin);
+ }
+
+ private PluginClassLoader buildClassLoaderFromPom(File pomFile)
+ throws Exception
+ {
+ List artifacts = resolver.resolvePom(pomFile);
+ PluginClassLoader classLoader = createClassLoader(artifacts, pomFile.getPath());
+
+ Artifact artifact = artifacts.get(0);
+ Set plugins = discoverPlugins(artifact, classLoader, SERVICES_FILE, Plugin.class.getName());
+ if (!plugins.isEmpty()) {
+ writePluginServices(plugins, artifact.getFile(), SERVICES_FILE);
+ }
+
+ return classLoader;
+ }
+
+ private PluginClassLoader buildClassLoaderFromDirectory(File dir)
+ throws Exception
+ {
+ log.debug("Classpath for %s:", dir.getName());
+ List urls = new ArrayList<>();
+ for (File file : listFiles(dir)) {
+ log.debug(" %s", file);
+ urls.add(file.toURI().toURL());
+ }
+ return createClassLoader(urls);
+ }
+
+ private PluginClassLoader buildClassLoaderFromCoordinates(String coordinates)
+ throws Exception
+ {
+ Artifact rootArtifact = new DefaultArtifact(coordinates);
+ List artifacts = resolver.resolveArtifacts(rootArtifact);
+ return createClassLoader(artifacts, rootArtifact.toString());
+ }
+
+ private PluginClassLoader createClassLoader(List artifacts, String name)
+ throws IOException
+ {
+ log.debug("Classpath for %s:", name);
+ List urls = new ArrayList<>();
+ for (Artifact artifact : sortedArtifacts(artifacts)) {
+ if (artifact.getFile() == null) {
+ throw new RuntimeException("Could not resolve artifact: " + artifact);
+ }
+ File file = artifact.getFile().getCanonicalFile();
+ log.debug(" %s", file);
+ urls.add(file.toURI().toURL());
+ }
+ return createClassLoader(urls);
+ }
+
+ private PluginClassLoader createClassLoader(List urls)
+ {
+ ClassLoader parent = getClass().getClassLoader();
+ return new PluginClassLoader(urls, parent, SPI_PACKAGES);
+ }
+
+ private static List listFiles(File installedPluginsDir)
+ {
+ if (installedPluginsDir != null && installedPluginsDir.isDirectory()) {
+ File[] files = installedPluginsDir.listFiles();
+ if (files != null) {
+ Arrays.sort(files);
+ return ImmutableList.copyOf(files);
+ }
+ }
+ return ImmutableList.of();
+ }
+
+ private static List sortedArtifacts(List artifacts)
+ {
+ List list = new ArrayList<>(artifacts);
+ Collections.sort(list, Ordering.natural().nullsLast().onResultOf(Artifact::getFile));
+ return list;
+ }
+}
diff --git a/presto-router/src/main/java/com/facebook/presto/router/cluster/ClusterManager.java b/presto-router/src/main/java/com/facebook/presto/router/cluster/ClusterManager.java
index 42fcd7d03b68b..0fa8b8e75dfe0 100755
--- a/presto-router/src/main/java/com/facebook/presto/router/cluster/ClusterManager.java
+++ b/presto-router/src/main/java/com/facebook/presto/router/cluster/ClusterManager.java
@@ -13,14 +13,17 @@
*/
package com.facebook.presto.router.cluster;
+import com.facebook.airlift.log.Logger;
import com.facebook.presto.router.RouterConfig;
import com.facebook.presto.router.scheduler.Scheduler;
import com.facebook.presto.router.scheduler.SchedulerFactory;
+import com.facebook.presto.router.scheduler.SchedulerManager;
import com.facebook.presto.router.scheduler.SchedulerType;
import com.facebook.presto.router.spec.GroupSpec;
import com.facebook.presto.router.spec.RouterSpec;
import com.facebook.presto.router.spec.SelectorRuleSpec;
import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.router.ClusterInfo;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -31,33 +34,61 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import static com.facebook.presto.router.RouterUtil.parseRouterConfig;
+import static com.facebook.presto.router.scheduler.SchedulerType.CUSTOM_PLUGIN_SCHEDULER;
import static com.facebook.presto.router.scheduler.SchedulerType.WEIGHTED_RANDOM_CHOICE;
import static com.facebook.presto.spi.StandardErrorCode.CONFIGURATION_INVALID;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
+import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toMap;
public class ClusterManager
{
- private final Map groups;
- private final List groupSelectors;
- private final SchedulerType schedulerType;
- private final Scheduler scheduler;
- private final HashMap> serverWeights = new HashMap<>();
+ private final RouterConfig routerConfig;
+ private Map groups;
+ private List groupSelectors;
+ private SchedulerType schedulerType;
+ private Scheduler scheduler;
+ private HashMap> serverWeights = new HashMap<>();
+ private final AtomicLong lastConfigUpdate = new AtomicLong();
+ private final RemoteInfoFactory remoteInfoFactory;
+ private final SchedulerManager schedulerManager;
+ private final Logger log = Logger.get(ClusterManager.class);
+
+ // Cluster status
+ private final ConcurrentHashMap remoteClusterInfos = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap remoteQueryInfos = new ConcurrentHashMap<>();
@Inject
- public ClusterManager(RouterConfig config)
+ public ClusterManager(RouterConfig config, RemoteInfoFactory remoteInfoFactory,
+ SchedulerManager schedulerManager)
{
- RouterSpec routerSpec = parseRouterConfig(config)
- .orElseThrow(() -> new PrestoException(CONFIGURATION_INVALID, "Failed to load router config"));
+ this.routerConfig = config;
+ this.schedulerManager = schedulerManager;
+ this.remoteInfoFactory = requireNonNull(remoteInfoFactory, "remoteInfoFactory is null");
+ initializeRouterConfigSpec(this.routerConfig);
+ List allClusters = getAllClusters();
+ allClusters.forEach(uri -> {
+ log.info("Attaching cluster %s to the router", uri.getHost());
+ remoteClusterInfos.put(uri, remoteInfoFactory.createRemoteClusterInfo(uri));
+ remoteQueryInfos.put(uri, remoteInfoFactory.createRemoteQueryInfo(uri));
+ log.info("Successfully attached cluster %s to the router.", uri.getHost());
+ });
+ }
+ private void initializeRouterConfigSpec(RouterConfig routerConfig)
+ {
+ RouterSpec routerSpec = parseRouterConfig(routerConfig)
+ .orElseThrow(() -> new PrestoException(CONFIGURATION_INVALID, "Failed to load router config"));
this.groups = ImmutableMap.copyOf(routerSpec.getGroups().stream().collect(toMap(GroupSpec::getName, group -> group)));
this.groupSelectors = ImmutableList.copyOf(routerSpec.getSelectors());
this.schedulerType = routerSpec.getSchedulerType();
- this.scheduler = new SchedulerFactory(routerSpec.getSchedulerType()).create();
-
+ this.scheduler = new SchedulerFactory(schedulerType, schedulerManager).create();
this.initializeServerWeights();
}
@@ -77,11 +108,25 @@ public Optional getDestination(RequestInfo requestInfo)
checkArgument(groups.containsKey(target.get()));
GroupSpec groupSpec = groups.get(target.get());
+
scheduler.setCandidates(groupSpec.getMembers());
if (schedulerType == WEIGHTED_RANDOM_CHOICE) {
scheduler.setWeights(serverWeights.get(groupSpec.getName()));
}
+ else if (schedulerType == CUSTOM_PLUGIN_SCHEDULER) {
+ try {
+ //Set remote cluster infos in the custom plugin scheduler
+ Map healthyRemoteClusterInfos = remoteClusterInfos.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ scheduler.setClusterInfos(healthyRemoteClusterInfos);
+ return scheduler.getDestination(requestInfo.getUser(), requestInfo.getQuery());
+ }
+ catch (Exception e) {
+ log.error("Custom Plugin Scheduler failed to schedule the query!");
+ return Optional.empty();
+ }
+ }
return scheduler.getDestination(requestInfo.getUser());
}
@@ -105,4 +150,14 @@ private void initializeServerWeights()
}
});
}
+
+ public ConcurrentHashMap getRemoteClusterInfos()
+ {
+ return remoteClusterInfos;
+ }
+
+ public ConcurrentHashMap getRemoteQueryInfos()
+ {
+ return remoteQueryInfos;
+ }
}
diff --git a/presto-router/src/main/java/com/facebook/presto/router/cluster/RemoteClusterInfo.java b/presto-router/src/main/java/com/facebook/presto/router/cluster/RemoteClusterInfo.java
index ca1dc2a1fb440..956a63926f5af 100755
--- a/presto-router/src/main/java/com/facebook/presto/router/cluster/RemoteClusterInfo.java
+++ b/presto-router/src/main/java/com/facebook/presto/router/cluster/RemoteClusterInfo.java
@@ -15,6 +15,7 @@
import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.log.Logger;
+import com.facebook.presto.spi.router.ClusterInfo;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -23,7 +24,7 @@
import java.util.concurrent.atomic.AtomicLong;
public class RemoteClusterInfo
- extends RemoteState
+ extends RemoteState implements ClusterInfo
{
private static final ObjectMapper mapper = new ObjectMapper();
private static final Logger log = Logger.get(RemoteClusterInfo.class);
@@ -55,26 +56,31 @@ public void handleResponse(JsonNode response)
runningDrivers.set(fields.get(RUNNING_DRIVERS));
}
+ @Override
public long getRunningQueries()
{
return runningQueries.get();
}
+ @Override
public long getBlockedQueries()
{
return blockedQueries.get();
}
+ @Override
public long getQueuedQueries()
{
return queuedQueries.get();
}
+ @Override
public long getActiveWorkers()
{
return activeWorkers.get();
}
+ @Override
public long getRunningDrivers()
{
return runningDrivers.get();
diff --git a/presto-router/src/main/java/com/facebook/presto/router/cluster/RequestInfo.java b/presto-router/src/main/java/com/facebook/presto/router/cluster/RequestInfo.java
index f9f4cdcb4142f..775e77a89c085 100755
--- a/presto-router/src/main/java/com/facebook/presto/router/cluster/RequestInfo.java
+++ b/presto-router/src/main/java/com/facebook/presto/router/cluster/RequestInfo.java
@@ -35,12 +35,14 @@ public class RequestInfo
private final String user;
private final Optional source;
private final List clientTags;
+ private final String query;
public RequestInfo(HttpServletRequest servletRequest, String query)
{
this.user = parseHeader(servletRequest, PRESTO_USER);
this.source = Optional.ofNullable(parseHeader(servletRequest, PRESTO_SOURCE));
this.clientTags = requireNonNull(parseClientTags(servletRequest), "clientTags is null");
+ this.query = query;
}
public String getUser()
@@ -53,6 +55,11 @@ public Optional getSource()
return source;
}
+ public String getQuery()
+ {
+ return query;
+ }
+
public List getClientTags()
{
return clientTags;
diff --git a/presto-router/src/main/java/com/facebook/presto/router/scheduler/CustomPluginScheduler.java b/presto-router/src/main/java/com/facebook/presto/router/scheduler/CustomPluginScheduler.java
new file mode 100644
index 0000000000000..114379eddcbac
--- /dev/null
+++ b/presto-router/src/main/java/com/facebook/presto/router/scheduler/CustomPluginScheduler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.router.scheduler;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.spi.router.ClusterInfo;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class CustomPluginScheduler
+ implements Scheduler
+{
+ private SchedulerManager schedulerManager;
+
+ private static final Logger log = Logger.get(CustomPluginScheduler.class);
+
+ public CustomPluginScheduler(SchedulerManager schedulerManager)
+ {
+ this.schedulerManager = schedulerManager;
+ schedulerManager.setRequired();
+ }
+
+ @Override
+ public Optional getDestination(String user, String query)
+ {
+ try {
+ return schedulerManager.getScheduler().getDestination(user, query);
+ }
+ catch (Exception e) {
+ log.warn(e, "Error getting destination");
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public void setCandidates(List candidates)
+ {
+ schedulerManager.getScheduler().setCandidates(candidates);
+ }
+
+ @Override
+ public void setClusterInfos(Map clusterInfo)
+ {
+ schedulerManager.getScheduler().setClusterInfos(clusterInfo);
+ }
+}
diff --git a/presto-router/src/main/java/com/facebook/presto/router/scheduler/Scheduler.java b/presto-router/src/main/java/com/facebook/presto/router/scheduler/Scheduler.java
index 2d4a2f1b53745..031c6cb23a97a 100644
--- a/presto-router/src/main/java/com/facebook/presto/router/scheduler/Scheduler.java
+++ b/presto-router/src/main/java/com/facebook/presto/router/scheduler/Scheduler.java
@@ -14,10 +14,12 @@
package com.facebook.presto.router.scheduler;
import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.router.ClusterInfo;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
@@ -28,8 +30,18 @@ public interface Scheduler
* Schedules a request from a user to a concrete candidate. Returns the
* URI of this candidate.
*/
- Optional getDestination(String user);
-
+ default Optional getDestination(String user)
+ {
+ return Optional.empty();
+ };
+ /**
+ * Schedules a request from a user with specific query string to a concrete candidate. Returns the
+ * URI of this candidate.
+ */
+ default Optional getDestination(String user, String query)
+ {
+ return Optional.empty();
+ };
/**
* Sets the candidates with the list of URIs for scheduling.
*/
@@ -42,4 +54,9 @@ default void setWeights(HashMap weights)
{
throw new PrestoException(NOT_SUPPORTED, "This scheduler does not support setting weights");
}
+
+ /**
+ * Sets the scheduler with cluster info for clusters.
+ */
+ default void setClusterInfos(Map clusterInfos) {}
}
diff --git a/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerFactory.java b/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerFactory.java
index cb7d081f0caf6..d194e3dbbf47c 100644
--- a/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerFactory.java
+++ b/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerFactory.java
@@ -21,10 +21,12 @@
public class SchedulerFactory
{
private final SchedulerType schedulerType;
+ private final SchedulerManager schedulerManager;
- public SchedulerFactory(SchedulerType schedulerType)
+ public SchedulerFactory(SchedulerType schedulerType, SchedulerManager schedulerManager)
{
this.schedulerType = requireNonNull(schedulerType, "schedulerType is null");
+ this.schedulerManager = requireNonNull(schedulerManager, "schedulerManager is null");
}
public Scheduler create()
@@ -38,6 +40,8 @@ public Scheduler create()
return new UserHashScheduler();
case ROUND_ROBIN:
return new RoundRobinScheduler();
+ case CUSTOM_PLUGIN_SCHEDULER:
+ return new CustomPluginScheduler(schedulerManager);
}
throw new PrestoException(NOT_SUPPORTED, "Unsupported router scheduler type " + schedulerType);
}
diff --git a/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerManager.java b/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerManager.java
new file mode 100644
index 0000000000000..a734920767e6e
--- /dev/null
+++ b/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerManager.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.router.scheduler;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.spi.router.Scheduler;
+import com.facebook.presto.spi.router.SchedulerFactory;
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.facebook.presto.util.PropertiesUtil.loadProperties;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.Objects.requireNonNull;
+
+public class SchedulerManager
+{
+ private static final Logger log = Logger.get(SchedulerManager.class);
+
+ private static final File CONFIG_FILE = new File("etc/router-config/router-scheduler.properties");
+ private static final String NAME_PROPERTY = "router-scheduler.name";
+
+ private final AtomicBoolean required = new AtomicBoolean();
+ private final Map factories = new ConcurrentHashMap<>();
+ private final AtomicReference scheduler = new AtomicReference<>();
+ private final AtomicReference schedulerName = new AtomicReference<>("");
+
+ public void setRequired()
+ {
+ required.set(true);
+ }
+
+ public void addSchedulerFactory(SchedulerFactory factory)
+ {
+ checkArgument(factories.putIfAbsent(factory.getName(), factory) == null,
+ "Presto Router Scheduler '%s' is already registered", factory.getName());
+ }
+
+ public void loadScheduler()
+ throws Exception
+ {
+ if (!required.get()) {
+ return;
+ }
+
+ File configFileLocation = CONFIG_FILE.getAbsoluteFile();
+ Map properties = new HashMap<>(loadProperties(configFileLocation));
+
+ String name = properties.remove(NAME_PROPERTY);
+ checkArgument(!isNullOrEmpty(name),
+ "Router scheduler configuration %s does not contain %s", configFileLocation, NAME_PROPERTY);
+
+ if (!schedulerName.get().equalsIgnoreCase(name)) {
+ log.info("-- Loading Presto Router Scheduler --");
+ SchedulerFactory factory = factories.get(name);
+ checkState(factory != null, "Presto Router Scheduler %s is not registered", name);
+ Scheduler scheduler = factory.create(ImmutableMap.copyOf(properties));
+ this.scheduler.set(requireNonNull(scheduler, "scheduler is null"));
+ this.schedulerName.set(name);
+ log.info("-- Loaded Presto Router Scheduler %s --", name);
+ }
+ }
+
+ public Scheduler getScheduler()
+ {
+ checkState(scheduler.get() != null, "scheduler was not loaded");
+ return scheduler.get();
+ }
+}
diff --git a/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerType.java b/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerType.java
index cef91386ebb66..431ba865b7d32 100644
--- a/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerType.java
+++ b/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerType.java
@@ -19,4 +19,5 @@ public enum SchedulerType
ROUND_ROBIN,
USER_HASH,
WEIGHTED_RANDOM_CHOICE,
+ CUSTOM_PLUGIN_SCHEDULER
}
diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/RouterPlugin.java b/presto-spi/src/main/java/com/facebook/presto/spi/RouterPlugin.java
new file mode 100644
index 0000000000000..9c2d5abfc70c8
--- /dev/null
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/RouterPlugin.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.spi;
+
+import com.facebook.presto.spi.router.SchedulerFactory;
+
+import static java.util.Collections.emptyList;
+
+public interface RouterPlugin
+{
+ default Iterable getSchedulerFactories()
+ {
+ return emptyList();
+ }
+}
diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/router/ClusterInfo.java b/presto-spi/src/main/java/com/facebook/presto/spi/router/ClusterInfo.java
new file mode 100644
index 0000000000000..8b79fbb57478a
--- /dev/null
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/router/ClusterInfo.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.spi.router;
+
+public interface ClusterInfo
+{
+ default long getRunningQueries()
+ {
+ return 0;
+ }
+
+ default long getBlockedQueries()
+ {
+ return 0;
+ }
+
+ default long getQueuedQueries()
+ {
+ return 0;
+ }
+
+ default long getActiveWorkers()
+ {
+ return 0;
+ }
+
+ default long getRunningDrivers()
+ {
+ return 0;
+ }
+}
diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/router/Scheduler.java b/presto-spi/src/main/java/com/facebook/presto/spi/router/Scheduler.java
new file mode 100644
index 0000000000000..489e624239d11
--- /dev/null
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/router/Scheduler.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.spi.router;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public interface Scheduler
+{
+ /**
+ * Schedules a query from a user to a concrete candidate. Returns the
+ * URI of this candidate.
+ */
+ Optional getDestination(String user, String query);
+
+ /**
+ * Sets the candidates with the list of URIs for scheduling.
+ */
+ void setCandidates(List candidates);
+
+ /**
+ * Sets remote cluster infos for each cluster.
+ */
+ default void setClusterInfos(Map clusterInfos) {}
+}
diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/router/SchedulerFactory.java b/presto-spi/src/main/java/com/facebook/presto/spi/router/SchedulerFactory.java
new file mode 100644
index 0000000000000..16c48dbb5fe6c
--- /dev/null
+++ b/presto-spi/src/main/java/com/facebook/presto/spi/router/SchedulerFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.spi.router;
+
+import java.util.Map;
+
+public interface SchedulerFactory
+{
+ String getName();
+
+ Scheduler create(Map config);
+}