From 745c65b7ec96f7bf6413569a6222fadfdc9e9e6e Mon Sep 17 00:00:00 2001 From: jp-sivaprasad Date: Mon, 27 Jan 2025 04:36:12 -0800 Subject: [PATCH] Metrics based custom scheduler plugin --- .../presto/server/PluginClassLoader.java | 2 +- .../presto/server/PluginDiscovery.java | 2 +- presto-router-custom-scheduler/README.txt | 17 ++ presto-router-custom-scheduler/pom.xml | 86 ++++++ .../scheduler/CustomSchedulerConstants.java | 23 ++ .../scheduler/MetricsBasedScheduler.java | 86 ++++++ .../MetricsBasedSchedulerFactory.java | 35 +++ .../scheduler/RouterSchedulerPlugin.java | 32 +++ .../com.facebook.presto.spi.RouterPlugin | 1 + .../src/modernizer/violations.xml | 45 +++ .../presto/scheduler/TestCustomScheduler.java | 87 ++++++ .../router-config/router-scheduler.properties | 1 + presto-router/pom.xml | 42 ++- .../facebook/presto/router/PrestoRouter.java | 6 +- .../facebook/presto/router/RouterModule.java | 14 + .../presto/router/RouterPluginManager.java | 258 ++++++++++++++++++ .../presto/router/cluster/ClusterManager.java | 75 ++++- .../router/cluster/RemoteClusterInfo.java | 8 +- .../presto/router/cluster/RequestInfo.java | 7 + .../scheduler/CustomPluginScheduler.java | 60 ++++ .../presto/router/scheduler/Scheduler.java | 21 +- .../router/scheduler/SchedulerFactory.java | 6 +- .../router/scheduler/SchedulerManager.java | 87 ++++++ .../router/scheduler/SchedulerType.java | 1 + .../com/facebook/presto/spi/RouterPlugin.java | 26 ++ .../presto/spi/router/ClusterInfo.java | 42 +++ .../facebook/presto/spi/router/Scheduler.java | 38 +++ .../presto/spi/router/SchedulerFactory.java | 23 ++ 28 files changed, 1110 insertions(+), 21 deletions(-) create mode 100755 presto-router-custom-scheduler/README.txt create mode 100644 presto-router-custom-scheduler/pom.xml create mode 100644 presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/CustomSchedulerConstants.java create mode 100644 presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/MetricsBasedScheduler.java create mode 100644 presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/MetricsBasedSchedulerFactory.java create mode 100644 presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/RouterSchedulerPlugin.java create mode 100644 presto-router-custom-scheduler/src/main/resources/META-INF/services/com.facebook.presto.spi.RouterPlugin create mode 100644 presto-router-custom-scheduler/src/modernizer/violations.xml create mode 100644 presto-router-custom-scheduler/src/test/java/com/ibm/presto/scheduler/TestCustomScheduler.java create mode 100644 presto-router/etc/router-config/router-scheduler.properties create mode 100644 presto-router/src/main/java/com/facebook/presto/router/RouterPluginManager.java create mode 100644 presto-router/src/main/java/com/facebook/presto/router/scheduler/CustomPluginScheduler.java create mode 100644 presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerManager.java create mode 100644 presto-spi/src/main/java/com/facebook/presto/spi/RouterPlugin.java create mode 100644 presto-spi/src/main/java/com/facebook/presto/spi/router/ClusterInfo.java create mode 100644 presto-spi/src/main/java/com/facebook/presto/spi/router/Scheduler.java create mode 100644 presto-spi/src/main/java/com/facebook/presto/spi/router/SchedulerFactory.java diff --git a/presto-main/src/main/java/com/facebook/presto/server/PluginClassLoader.java b/presto-main/src/main/java/com/facebook/presto/server/PluginClassLoader.java index bce2dedf8b65a..7c3f11aac5b13 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/PluginClassLoader.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PluginClassLoader.java @@ -26,7 +26,7 @@ import static java.util.Objects.requireNonNull; -class PluginClassLoader +public class PluginClassLoader extends URLClassLoader { private static final ClassLoader PLATFORM_CLASS_LOADER = findPlatformClassLoader(); diff --git a/presto-main/src/main/java/com/facebook/presto/server/PluginDiscovery.java b/presto-main/src/main/java/com/facebook/presto/server/PluginDiscovery.java index 6c0d1f4fe9ef1..347eb34703b85 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/PluginDiscovery.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PluginDiscovery.java @@ -38,7 +38,7 @@ import static java.nio.file.Files.walkFileTree; // This is a hack for development and does not support nested classes. -final class PluginDiscovery +public final class PluginDiscovery { private static final String CLASS_FILE_SUFFIX = ".class"; diff --git a/presto-router-custom-scheduler/README.txt b/presto-router-custom-scheduler/README.txt new file mode 100755 index 0000000000000..a536426d0a8bb --- /dev/null +++ b/presto-router-custom-scheduler/README.txt @@ -0,0 +1,17 @@ +# Presto Router Custom Scheduler Plugin +This package implements third party custom schedulers to be used by Presto router + +## Adding the custom scheduler plugin to presto router +The custom plugin jar that is built is placed in the presto-router/plugin/custom-scheduler directory, +to be picked up by the presto-router +The scheduler name has to be set to CUSTOM_PLUGIN_SCHEDULER in the router-config.json + "scheduler": "CUSTOM_PLUGIN_SCHEDULER" +The name of the custom scheduler factory needs to be set in the property file router-scheduler.properties in presto-router/etc/router-config + +## Main Classes: +RouterSchedulerPlugin - Custom Scheduler Plugin class to be loaded by the Router plugin manager. + This class implements the interface com.facebook.presto.spi.RouterPlugin. +MetricsBasedSchedulerFactory - Factory for creating specific custom scheduler + This class implements the interface com.facebook.presto.spi.SchedulerFactory +MetricsBasedScheduler - Custom scheduler implementing the scheduling logic for clusters. More similar classes can be added implementing specific custom scheduling logic. + This class implements the interface com.facebook.presto.spi.router.Scheduler \ No newline at end of file diff --git a/presto-router-custom-scheduler/pom.xml b/presto-router-custom-scheduler/pom.xml new file mode 100644 index 0000000000000..df5884ad3411c --- /dev/null +++ b/presto-router-custom-scheduler/pom.xml @@ -0,0 +1,86 @@ + + + 4.0.0 + + + com.facebook.presto + presto-root + 0.291-SNAPSHOT + + + com.facebook.presto.router + scheduler + 0.291-SNAPSHOT + jar + scheduler + SPI plugin project for presto router custom scheduler + + + + com.facebook.presto + presto-spi + + + + com.facebook.airlift + log + 0.215 + + + + + org.testng + testng + 7.10.2 + test + + + org.junit.jupiter + junit-jupiter-engine + 5.9.1 + test + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 11 + 11 + + + + + org.apache.maven.plugins + maven-assembly-plugin + 3.3.0 + + + make-assembly + package + + single + + + + jar-with-dependencies + + + + true + com.facebook.presto.router.scheduler.RouterSchedulerPlugin + + + + + + + + + + \ No newline at end of file diff --git a/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/CustomSchedulerConstants.java b/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/CustomSchedulerConstants.java new file mode 100644 index 0000000000000..fe309dce13eec --- /dev/null +++ b/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/CustomSchedulerConstants.java @@ -0,0 +1,23 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.router.scheduler; + +public class CustomSchedulerConstants +{ + private CustomSchedulerConstants() + { + } + + public static final String METRICS_BASED = "metricsBased"; +} diff --git a/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/MetricsBasedScheduler.java b/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/MetricsBasedScheduler.java new file mode 100644 index 0000000000000..1d0f7538c1ef8 --- /dev/null +++ b/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/MetricsBasedScheduler.java @@ -0,0 +1,86 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.router.scheduler; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.spi.router.ClusterInfo; +import com.facebook.presto.spi.router.Scheduler; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.PriorityQueue; +import java.util.stream.Collectors; + +//Metrics based scheduler uses coordinator and/or worker metrics for scheduling decisions +public class MetricsBasedScheduler + implements Scheduler +{ + private static final Logger log = Logger.get(MetricsBasedScheduler.class); + Map clusterInfos; + + //Min heap based priority queue. Cluster with lowest number of queries will be on top of the queue + PriorityQueue clusterQueue = new PriorityQueue((x, y) -> (int) (x.totalQueries - y.totalQueries)); + + private List candidates; + + private class ClusterQueryInfo + { + URI clusterUri; + long runningQueries; + long queuedQueries; + long totalQueries; + + ClusterQueryInfo(URI clusterUri, long runningQueries, long queuedQueries) + { + this.clusterUri = clusterUri; + this.runningQueries = runningQueries; + this.queuedQueries = queuedQueries; + totalQueries = (long) (runningQueries + queuedQueries); + log.info("Cluster URI : " + clusterUri + ", runningQueries : " + runningQueries + ", queuedQueries : " + queuedQueries + ", totalQueries : " + totalQueries); + } + } + + @Override + public Optional getDestination(String user, String query) + { + try { + if (clusterInfos != null && clusterInfos.size() > 0) { + clusterQueue.clear(); + clusterQueue.addAll(clusterInfos.keySet().stream() + .map(uri -> new ClusterQueryInfo(uri, clusterInfos.get(uri).getRunningQueries(), clusterInfos.get(uri).getQueuedQueries())) + .collect(Collectors.toList())); + return Optional.of(clusterQueue.poll().clusterUri); + } + return Optional.empty(); + } + catch (IllegalArgumentException e) { + log.warn(e, "Error getting destination for user " + user); + return Optional.empty(); + } + } + + @Override + public void setCandidates(List candidates) + { + this.candidates = candidates; + } + + @Override + public void setClusterInfos(Map clusterInfos) + { + this.clusterInfos = clusterInfos; + } +} diff --git a/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/MetricsBasedSchedulerFactory.java b/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/MetricsBasedSchedulerFactory.java new file mode 100644 index 0000000000000..314fd827b8a80 --- /dev/null +++ b/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/MetricsBasedSchedulerFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.router.scheduler; + +import com.facebook.presto.spi.router.Scheduler; +import com.facebook.presto.spi.router.SchedulerFactory; + +import java.util.Map; + +public class MetricsBasedSchedulerFactory + implements SchedulerFactory +{ + @Override + public String getName() + { + return CustomSchedulerConstants.METRICS_BASED; + } + + @Override + public Scheduler create(Map config) + { + return new MetricsBasedScheduler(); + } +} diff --git a/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/RouterSchedulerPlugin.java b/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/RouterSchedulerPlugin.java new file mode 100644 index 0000000000000..a0ec17f9825df --- /dev/null +++ b/presto-router-custom-scheduler/src/main/java/com/facebook/presto/router/scheduler/RouterSchedulerPlugin.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.router.scheduler; + +import com.facebook.presto.spi.RouterPlugin; +import com.facebook.presto.spi.router.SchedulerFactory; + +import java.util.ArrayList; +import java.util.List; + +public class RouterSchedulerPlugin + implements RouterPlugin +{ + @Override + public Iterable getSchedulerFactories() + { + List schedulerFactories = new ArrayList<>(); + schedulerFactories.add(new MetricsBasedSchedulerFactory()); + return schedulerFactories; + } +} diff --git a/presto-router-custom-scheduler/src/main/resources/META-INF/services/com.facebook.presto.spi.RouterPlugin b/presto-router-custom-scheduler/src/main/resources/META-INF/services/com.facebook.presto.spi.RouterPlugin new file mode 100644 index 0000000000000..22e103f612a0c --- /dev/null +++ b/presto-router-custom-scheduler/src/main/resources/META-INF/services/com.facebook.presto.spi.RouterPlugin @@ -0,0 +1 @@ +com.facebook.presto.router.scheduler.RouterSchedulerPlugin \ No newline at end of file diff --git a/presto-router-custom-scheduler/src/modernizer/violations.xml b/presto-router-custom-scheduler/src/modernizer/violations.xml new file mode 100644 index 0000000000000..bca7ebcf3900a --- /dev/null +++ b/presto-router-custom-scheduler/src/modernizer/violations.xml @@ -0,0 +1,45 @@ + + + + java/lang/Class.newInstance:()Ljava/lang/Object; + 1.1 + Prefer Class.getConstructor().newInstance() + + + + java/lang/String.toLowerCase:()Ljava/lang/String; + 1.1 + Prefer String.toLowerCase(java.util.Locale) + + + + com/google/common/primitives/Ints.checkedCast:(J)I + 1.8 + Prefer Math.toIntExact(long) + + + + org/testng/Assert.assertEquals:(Ljava/lang/Iterable;Ljava/lang/Iterable;)V + 1.8 + Use com.facebook.presto.testing.assertions.Assert.assertEquals due to TestNG #543 + + + + org/testng/Assert.assertEquals:(Ljava/lang/Iterable;Ljava/lang/Iterable;Ljava/lang/String;)V + 1.8 + Use com.facebook.presto.testing.assertions.Assert.assertEquals due to TestNG #543 + + + + java/util/TimeZone.getTimeZone:(Ljava/lang/String;)Ljava/util/TimeZone; + 1.8 + Avoid TimeZone.getTimeZone as it returns GMT for a zone not supported by the JVM. Use TimeZone.getTimeZone(ZoneId.of(..)) instead, or TimeZone.getTimeZone(..., false). + + + + org/joda/time/DateTimeZone.toTimeZone:()Ljava/util/TimeZone; + 1.8 + Avoid DateTimeZone.toTimeZone as it returns GMT for a zone not supported by the JVM. Use TimeZone.getTimeZone(ZoneId.of(dtz.getId())) instead. + + + diff --git a/presto-router-custom-scheduler/src/test/java/com/ibm/presto/scheduler/TestCustomScheduler.java b/presto-router-custom-scheduler/src/test/java/com/ibm/presto/scheduler/TestCustomScheduler.java new file mode 100644 index 0000000000000..1cf73ec65da6d --- /dev/null +++ b/presto-router-custom-scheduler/src/test/java/com/ibm/presto/scheduler/TestCustomScheduler.java @@ -0,0 +1,87 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.router.scheduler; + +import com.facebook.presto.spi.router.ClusterInfo; +import com.facebook.presto.spi.router.Scheduler; +import org.testng.annotations.Test; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import static org.testng.Assert.assertEquals; + +public class TestCustomScheduler +{ + Map clusterInfos = new HashMap<>(); + + static class MockRemoteClusterInfo + implements ClusterInfo + { + long runningQueries; + long queuedQueries; + + MockRemoteClusterInfo(long runningQueries, long queuedQueries) + { + this.runningQueries = runningQueries; + this.queuedQueries = queuedQueries; + } + + @Override + public long getRunningQueries() + { + return runningQueries; + } + + @Override + public long getQueuedQueries() + { + return queuedQueries; + } + } + + @Test + public void testMetricsBasedScheduler() + throws Exception + { + Scheduler scheduler = new MetricsBasedScheduler(); + + URI uri1 = new URI("192.168.0.1"); + URI uri2 = new URI("192.168.0.2"); + URI uri3 = new URI("192.168.0.3"); + + clusterInfos.put(uri1, new MockRemoteClusterInfo(10, 10)); + clusterInfos.put(uri2, new MockRemoteClusterInfo(20, 20)); + clusterInfos.put(uri3, new MockRemoteClusterInfo(30, 30)); + scheduler.setClusterInfos(clusterInfos); + URI target = scheduler.getDestination("test", null).orElse(new URI("invalid")); + assertEquals(target.getPath(), "192.168.0.1"); + + clusterInfos.put(uri1, new MockRemoteClusterInfo(20, 20)); + clusterInfos.put(uri2, new MockRemoteClusterInfo(10, 10)); + clusterInfos.put(uri3, new MockRemoteClusterInfo(30, 30)); + scheduler.setClusterInfos(clusterInfos); + target = scheduler.getDestination("test", null).orElse(new URI("invalid")); + assertEquals(target.getPath(), "192.168.0.2"); + + clusterInfos.put(uri1, new MockRemoteClusterInfo(20, 20)); + clusterInfos.put(uri2, new MockRemoteClusterInfo(30, 30)); + clusterInfos.put(uri3, new MockRemoteClusterInfo(10, 10)); + scheduler.setClusterInfos(clusterInfos); + target = scheduler.getDestination("test", null).orElse(new URI("invalid")); + assertEquals(target.getPath(), "192.168.0.3"); + } +} diff --git a/presto-router/etc/router-config/router-scheduler.properties b/presto-router/etc/router-config/router-scheduler.properties new file mode 100644 index 0000000000000..dceb8d2251157 --- /dev/null +++ b/presto-router/etc/router-config/router-scheduler.properties @@ -0,0 +1 @@ +router-scheduler.name=metricsBased \ No newline at end of file diff --git a/presto-router/pom.xml b/presto-router/pom.xml index f70cdedd79e78..4cecb9e7c7dad 100755 --- a/presto-router/pom.xml +++ b/presto-router/pom.xml @@ -162,16 +162,50 @@ jmxutils - - org.testng - testng - test + io.airlift.resolver + resolver + + + com.google.inject + guice + + + org.slf4j + slf4j-api + + + org.slf4j + jcl-over-slf4j + + + + + + com.facebook.drift + drift-api com.facebook.presto presto-main + + + software.amazon.ion + ion-java + + + + + + org.sonatype.aether + aether-api + + + + + org.testng + testng test diff --git a/presto-router/src/main/java/com/facebook/presto/router/PrestoRouter.java b/presto-router/src/main/java/com/facebook/presto/router/PrestoRouter.java index 19c30ec1869a4..62d25ece5607d 100755 --- a/presto-router/src/main/java/com/facebook/presto/router/PrestoRouter.java +++ b/presto-router/src/main/java/com/facebook/presto/router/PrestoRouter.java @@ -24,7 +24,9 @@ import com.facebook.airlift.log.Logger; import com.facebook.airlift.node.NodeModule; import com.facebook.airlift.tracetoken.TraceTokenModule; +import com.facebook.presto.router.scheduler.SchedulerManager; import com.google.common.collect.ImmutableList; +import com.google.inject.Injector; import com.google.inject.Module; import org.weakref.jmx.guice.MBeanModule; @@ -53,7 +55,9 @@ public static void start(Module... extraModules) Logger log = Logger.get(RouterModule.class); try { - app.initialize(); + Injector injector = app.initialize(); + injector.getInstance(RouterPluginManager.class).loadPlugins(); + injector.getInstance(SchedulerManager.class).loadScheduler(); log.info("======== SERVER STARTED ========"); } catch (Throwable t) { diff --git a/presto-router/src/main/java/com/facebook/presto/router/RouterModule.java b/presto-router/src/main/java/com/facebook/presto/router/RouterModule.java index ffc9bb35f60ea..7f09879febfc0 100755 --- a/presto-router/src/main/java/com/facebook/presto/router/RouterModule.java +++ b/presto-router/src/main/java/com/facebook/presto/router/RouterModule.java @@ -14,6 +14,7 @@ package com.facebook.presto.router; import com.facebook.airlift.configuration.AbstractConfigurationAwareModule; +import com.facebook.presto.client.NodeVersion; import com.facebook.presto.router.cluster.ClusterManager; import com.facebook.presto.router.cluster.ClusterStatusResource; import com.facebook.presto.router.cluster.ClusterStatusTracker; @@ -24,6 +25,9 @@ import com.facebook.presto.router.predictor.ForQueryMemoryPredictor; import com.facebook.presto.router.predictor.PredictorManager; import com.facebook.presto.router.predictor.RemoteQueryFactory; +import com.facebook.presto.router.scheduler.SchedulerManager; +import com.facebook.presto.server.PluginManagerConfig; +import com.facebook.presto.server.ServerConfig; import com.google.inject.Binder; import com.google.inject.Scopes; import io.airlift.units.Duration; @@ -52,20 +56,30 @@ public class RouterModule @Override protected void setup(Binder binder) { + ServerConfig serverConfig = buildConfigObject(ServerConfig.class); + httpServerBinder(binder).bindResource(UI_PATH, ROUTER_UI).withWelcomeFile(INDEX_HTML); configBinder(binder).bindConfig(RouterConfig.class); + binder.bind(SchedulerManager.class).in(Scopes.SINGLETON); binder.bind(ClusterManager.class).in(Scopes.SINGLETON); binder.bind(RemoteInfoFactory.class).in(Scopes.SINGLETON); bindHttpClient(binder, QUERY_TRACKER, ForQueryInfoTracker.class, IDLE_TIMEOUT_SECOND, REQUEST_TIMEOUT_SECOND); bindHttpClient(binder, QUERY_TRACKER, ForClusterInfoTracker.class, IDLE_TIMEOUT_SECOND, REQUEST_TIMEOUT_SECOND); + // Determine the NodeVersion + NodeVersion nodeVersion = new NodeVersion(serverConfig.getPrestoVersion()); + binder.bind(NodeVersion.class).toInstance(nodeVersion); + binder.bind(ClusterStatusTracker.class).in(Scopes.SINGLETON); binder.bind(PredictorManager.class).in(Scopes.SINGLETON); binder.bind(RemoteQueryFactory.class).in(Scopes.SINGLETON); + binder.bind(RouterPluginManager.class).in(Scopes.SINGLETON); + configBinder(binder).bindConfig(PluginManagerConfig.class); + bindHttpClient(binder, QUERY_PREDICTOR, ForQueryCpuPredictor.class, IDLE_TIMEOUT_SECOND, PREDICTOR_REQUEST_TIMEOUT_SECOND); bindHttpClient(binder, QUERY_PREDICTOR, ForQueryMemoryPredictor.class, IDLE_TIMEOUT_SECOND, PREDICTOR_REQUEST_TIMEOUT_SECOND); diff --git a/presto-router/src/main/java/com/facebook/presto/router/RouterPluginManager.java b/presto-router/src/main/java/com/facebook/presto/router/RouterPluginManager.java new file mode 100644 index 0000000000000..c70c37e149bee --- /dev/null +++ b/presto-router/src/main/java/com/facebook/presto/router/RouterPluginManager.java @@ -0,0 +1,258 @@ +package com.facebook.presto.router; +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.router.scheduler.SchedulerManager; +import com.facebook.presto.server.PluginClassLoader; +import com.facebook.presto.server.PluginManagerConfig; +import com.facebook.presto.spi.Plugin; +import com.facebook.presto.spi.RouterPlugin; +import com.facebook.presto.spi.classloader.ThreadContextClassLoader; +import com.facebook.presto.spi.router.SchedulerFactory; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Ordering; +import io.airlift.resolver.ArtifactResolver; +import io.airlift.resolver.DefaultArtifact; +import org.sonatype.aether.artifact.Artifact; + +import javax.annotation.concurrent.ThreadSafe; +import javax.inject.Inject; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.facebook.presto.server.PluginDiscovery.discoverPlugins; +import static com.facebook.presto.server.PluginDiscovery.writePluginServices; +import static java.util.Objects.requireNonNull; + +@ThreadSafe +public class RouterPluginManager +{ + // When generating code the AfterBurner module loads classes with *some* classloader. + // When the AfterBurner module is configured not to use the value classloader + // (e.g., AfterBurner().setUseValueClassLoader(false)) AppClassLoader is used for loading those + // classes. Otherwise, the PluginClassLoader is used, which is the default behavior. + // Therefore, in the former case Afterburner won't be able to load the connector classes + // as AppClassLoader doesn't see them, and in the latter case the PluginClassLoader won't be + // able to load the AfterBurner classes themselves. So, our solution is to use the PluginClassLoader + // and whitelist the AfterBurner classes here, so that the PluginClassLoader can load the + // AfterBurner classes. + private static final ImmutableList SPI_PACKAGES = ImmutableList.builder() + .add("com.facebook.presto.spi.") + .add("com.fasterxml.jackson.annotation.") + .add("com.fasterxml.jackson.module.afterburner.") + .add("io.airlift.slice.") + .add("io.airlift.units.") + .add("org.openjdk.jol.") + .add("com.facebook.presto.common") + .add("com.facebook.drift.annotations.") + .add("com.facebook.drift.TException") + .add("com.facebook.drift.TApplicationException") + .build(); + + private static final Logger log = Logger.get(RouterPluginManager.class); + + private final SchedulerManager schedulerManager; + private final File installedPluginsDir; + private final List plugins; + private final ArtifactResolver resolver; + private final AtomicBoolean pluginsLoading = new AtomicBoolean(); + private final AtomicBoolean pluginsLoaded = new AtomicBoolean(); + private static final String PLUGIN_SERVICES_FILE = "META-INF/services/" + Plugin.class.getName(); + private static final String SERVICES_FILE = "META-INF/services/" + Plugin.class.getName(); + + @Inject + public RouterPluginManager( + PluginManagerConfig config, + SchedulerManager schedulerManager) + { + requireNonNull(config, "config is null"); + + installedPluginsDir = config.getInstalledPluginsDir(); + if (config.getPlugins() == null) { + this.plugins = ImmutableList.of(); + } + else { + this.plugins = ImmutableList.copyOf(config.getPlugins()); + } + this.resolver = new ArtifactResolver(config.getMavenLocalRepository(), config.getMavenRemoteRepository()); + this.schedulerManager = requireNonNull(schedulerManager, "schedulerManager is null"); + } + + public void loadPlugins() + throws Exception + { + if (!pluginsLoading.compareAndSet(false, true)) { + return; + } + + for (File file : listFiles(installedPluginsDir)) { + if (file.isDirectory()) { + loadPlugin(file.getAbsolutePath()); + } + } + + for (String plugin : plugins) { + loadPlugin(plugin); + } + + pluginsLoaded.set(true); + } + + private void loadPlugin(String plugin) + throws Exception + { + log.info("-- Loading plugin %s --", plugin); + PluginClassLoader pluginClassLoader = buildClassLoader(plugin); + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(pluginClassLoader)) { + loadPlugin(pluginClassLoader, RouterPlugin.class); + loadPlugin(pluginClassLoader, Plugin.class); + } + log.info("-- Finished loading plugin %s --", plugin); + } + + private void loadPlugin(PluginClassLoader pluginClassLoader, Class clazz) + { + ServiceLoader serviceLoader = ServiceLoader.load(clazz, pluginClassLoader); + List plugins = ImmutableList.copyOf(serviceLoader); + + if (plugins.isEmpty()) { + log.warn("No service providers of type %s", clazz.getName()); + } + + for (Object plugin : plugins) { + log.info("Installing %s", plugin.getClass().getName()); + if (plugin instanceof Plugin) { + installPlugin((Plugin) plugin); + } + if (plugin instanceof RouterPlugin) { + installRouterPlugin((RouterPlugin) plugin); + } + else { + log.warn("Unknown plugin type: %s", plugin.getClass().getName()); + } + } + } + + public void installPlugin(Plugin plugin) + { + return; + } + + public void installRouterPlugin(RouterPlugin plugin) + { + for (SchedulerFactory schedulerFactory : plugin.getSchedulerFactories()) { + log.info("Registering router scheduler %s", schedulerFactory.getName()); + schedulerManager.addSchedulerFactory(schedulerFactory); + } + } + + private PluginClassLoader buildClassLoader(String plugin) + throws Exception + { + File file = new File(plugin); + if (file.isFile() && (file.getName().equals("pom.xml") || file.getName().endsWith(".pom"))) { + return buildClassLoaderFromPom(file); + } + if (file.isDirectory()) { + return buildClassLoaderFromDirectory(file); + } + return buildClassLoaderFromCoordinates(plugin); + } + + private PluginClassLoader buildClassLoaderFromPom(File pomFile) + throws Exception + { + List artifacts = resolver.resolvePom(pomFile); + PluginClassLoader classLoader = createClassLoader(artifacts, pomFile.getPath()); + + Artifact artifact = artifacts.get(0); + Set plugins = discoverPlugins(artifact, classLoader, SERVICES_FILE, Plugin.class.getName()); + if (!plugins.isEmpty()) { + writePluginServices(plugins, artifact.getFile(), SERVICES_FILE); + } + + return classLoader; + } + + private PluginClassLoader buildClassLoaderFromDirectory(File dir) + throws Exception + { + log.debug("Classpath for %s:", dir.getName()); + List urls = new ArrayList<>(); + for (File file : listFiles(dir)) { + log.debug(" %s", file); + urls.add(file.toURI().toURL()); + } + return createClassLoader(urls); + } + + private PluginClassLoader buildClassLoaderFromCoordinates(String coordinates) + throws Exception + { + Artifact rootArtifact = new DefaultArtifact(coordinates); + List artifacts = resolver.resolveArtifacts(rootArtifact); + return createClassLoader(artifacts, rootArtifact.toString()); + } + + private PluginClassLoader createClassLoader(List artifacts, String name) + throws IOException + { + log.debug("Classpath for %s:", name); + List urls = new ArrayList<>(); + for (Artifact artifact : sortedArtifacts(artifacts)) { + if (artifact.getFile() == null) { + throw new RuntimeException("Could not resolve artifact: " + artifact); + } + File file = artifact.getFile().getCanonicalFile(); + log.debug(" %s", file); + urls.add(file.toURI().toURL()); + } + return createClassLoader(urls); + } + + private PluginClassLoader createClassLoader(List urls) + { + ClassLoader parent = getClass().getClassLoader(); + return new PluginClassLoader(urls, parent, SPI_PACKAGES); + } + + private static List listFiles(File installedPluginsDir) + { + if (installedPluginsDir != null && installedPluginsDir.isDirectory()) { + File[] files = installedPluginsDir.listFiles(); + if (files != null) { + Arrays.sort(files); + return ImmutableList.copyOf(files); + } + } + return ImmutableList.of(); + } + + private static List sortedArtifacts(List artifacts) + { + List list = new ArrayList<>(artifacts); + Collections.sort(list, Ordering.natural().nullsLast().onResultOf(Artifact::getFile)); + return list; + } +} diff --git a/presto-router/src/main/java/com/facebook/presto/router/cluster/ClusterManager.java b/presto-router/src/main/java/com/facebook/presto/router/cluster/ClusterManager.java index 42fcd7d03b68b..0fa8b8e75dfe0 100755 --- a/presto-router/src/main/java/com/facebook/presto/router/cluster/ClusterManager.java +++ b/presto-router/src/main/java/com/facebook/presto/router/cluster/ClusterManager.java @@ -13,14 +13,17 @@ */ package com.facebook.presto.router.cluster; +import com.facebook.airlift.log.Logger; import com.facebook.presto.router.RouterConfig; import com.facebook.presto.router.scheduler.Scheduler; import com.facebook.presto.router.scheduler.SchedulerFactory; +import com.facebook.presto.router.scheduler.SchedulerManager; import com.facebook.presto.router.scheduler.SchedulerType; import com.facebook.presto.router.spec.GroupSpec; import com.facebook.presto.router.spec.RouterSpec; import com.facebook.presto.router.spec.SelectorRuleSpec; import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.router.ClusterInfo; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -31,33 +34,61 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import static com.facebook.presto.router.RouterUtil.parseRouterConfig; +import static com.facebook.presto.router.scheduler.SchedulerType.CUSTOM_PLUGIN_SCHEDULER; import static com.facebook.presto.router.scheduler.SchedulerType.WEIGHTED_RANDOM_CHOICE; import static com.facebook.presto.spi.StandardErrorCode.CONFIGURATION_INVALID; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toMap; public class ClusterManager { - private final Map groups; - private final List groupSelectors; - private final SchedulerType schedulerType; - private final Scheduler scheduler; - private final HashMap> serverWeights = new HashMap<>(); + private final RouterConfig routerConfig; + private Map groups; + private List groupSelectors; + private SchedulerType schedulerType; + private Scheduler scheduler; + private HashMap> serverWeights = new HashMap<>(); + private final AtomicLong lastConfigUpdate = new AtomicLong(); + private final RemoteInfoFactory remoteInfoFactory; + private final SchedulerManager schedulerManager; + private final Logger log = Logger.get(ClusterManager.class); + + // Cluster status + private final ConcurrentHashMap remoteClusterInfos = new ConcurrentHashMap<>(); + private final ConcurrentHashMap remoteQueryInfos = new ConcurrentHashMap<>(); @Inject - public ClusterManager(RouterConfig config) + public ClusterManager(RouterConfig config, RemoteInfoFactory remoteInfoFactory, + SchedulerManager schedulerManager) { - RouterSpec routerSpec = parseRouterConfig(config) - .orElseThrow(() -> new PrestoException(CONFIGURATION_INVALID, "Failed to load router config")); + this.routerConfig = config; + this.schedulerManager = schedulerManager; + this.remoteInfoFactory = requireNonNull(remoteInfoFactory, "remoteInfoFactory is null"); + initializeRouterConfigSpec(this.routerConfig); + List allClusters = getAllClusters(); + allClusters.forEach(uri -> { + log.info("Attaching cluster %s to the router", uri.getHost()); + remoteClusterInfos.put(uri, remoteInfoFactory.createRemoteClusterInfo(uri)); + remoteQueryInfos.put(uri, remoteInfoFactory.createRemoteQueryInfo(uri)); + log.info("Successfully attached cluster %s to the router.", uri.getHost()); + }); + } + private void initializeRouterConfigSpec(RouterConfig routerConfig) + { + RouterSpec routerSpec = parseRouterConfig(routerConfig) + .orElseThrow(() -> new PrestoException(CONFIGURATION_INVALID, "Failed to load router config")); this.groups = ImmutableMap.copyOf(routerSpec.getGroups().stream().collect(toMap(GroupSpec::getName, group -> group))); this.groupSelectors = ImmutableList.copyOf(routerSpec.getSelectors()); this.schedulerType = routerSpec.getSchedulerType(); - this.scheduler = new SchedulerFactory(routerSpec.getSchedulerType()).create(); - + this.scheduler = new SchedulerFactory(schedulerType, schedulerManager).create(); this.initializeServerWeights(); } @@ -77,11 +108,25 @@ public Optional getDestination(RequestInfo requestInfo) checkArgument(groups.containsKey(target.get())); GroupSpec groupSpec = groups.get(target.get()); + scheduler.setCandidates(groupSpec.getMembers()); if (schedulerType == WEIGHTED_RANDOM_CHOICE) { scheduler.setWeights(serverWeights.get(groupSpec.getName())); } + else if (schedulerType == CUSTOM_PLUGIN_SCHEDULER) { + try { + //Set remote cluster infos in the custom plugin scheduler + Map healthyRemoteClusterInfos = remoteClusterInfos.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + scheduler.setClusterInfos(healthyRemoteClusterInfos); + return scheduler.getDestination(requestInfo.getUser(), requestInfo.getQuery()); + } + catch (Exception e) { + log.error("Custom Plugin Scheduler failed to schedule the query!"); + return Optional.empty(); + } + } return scheduler.getDestination(requestInfo.getUser()); } @@ -105,4 +150,14 @@ private void initializeServerWeights() } }); } + + public ConcurrentHashMap getRemoteClusterInfos() + { + return remoteClusterInfos; + } + + public ConcurrentHashMap getRemoteQueryInfos() + { + return remoteQueryInfos; + } } diff --git a/presto-router/src/main/java/com/facebook/presto/router/cluster/RemoteClusterInfo.java b/presto-router/src/main/java/com/facebook/presto/router/cluster/RemoteClusterInfo.java index ca1dc2a1fb440..956a63926f5af 100755 --- a/presto-router/src/main/java/com/facebook/presto/router/cluster/RemoteClusterInfo.java +++ b/presto-router/src/main/java/com/facebook/presto/router/cluster/RemoteClusterInfo.java @@ -15,6 +15,7 @@ import com.facebook.airlift.http.client.HttpClient; import com.facebook.airlift.log.Logger; +import com.facebook.presto.spi.router.ClusterInfo; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -23,7 +24,7 @@ import java.util.concurrent.atomic.AtomicLong; public class RemoteClusterInfo - extends RemoteState + extends RemoteState implements ClusterInfo { private static final ObjectMapper mapper = new ObjectMapper(); private static final Logger log = Logger.get(RemoteClusterInfo.class); @@ -55,26 +56,31 @@ public void handleResponse(JsonNode response) runningDrivers.set(fields.get(RUNNING_DRIVERS)); } + @Override public long getRunningQueries() { return runningQueries.get(); } + @Override public long getBlockedQueries() { return blockedQueries.get(); } + @Override public long getQueuedQueries() { return queuedQueries.get(); } + @Override public long getActiveWorkers() { return activeWorkers.get(); } + @Override public long getRunningDrivers() { return runningDrivers.get(); diff --git a/presto-router/src/main/java/com/facebook/presto/router/cluster/RequestInfo.java b/presto-router/src/main/java/com/facebook/presto/router/cluster/RequestInfo.java index f9f4cdcb4142f..775e77a89c085 100755 --- a/presto-router/src/main/java/com/facebook/presto/router/cluster/RequestInfo.java +++ b/presto-router/src/main/java/com/facebook/presto/router/cluster/RequestInfo.java @@ -35,12 +35,14 @@ public class RequestInfo private final String user; private final Optional source; private final List clientTags; + private final String query; public RequestInfo(HttpServletRequest servletRequest, String query) { this.user = parseHeader(servletRequest, PRESTO_USER); this.source = Optional.ofNullable(parseHeader(servletRequest, PRESTO_SOURCE)); this.clientTags = requireNonNull(parseClientTags(servletRequest), "clientTags is null"); + this.query = query; } public String getUser() @@ -53,6 +55,11 @@ public Optional getSource() return source; } + public String getQuery() + { + return query; + } + public List getClientTags() { return clientTags; diff --git a/presto-router/src/main/java/com/facebook/presto/router/scheduler/CustomPluginScheduler.java b/presto-router/src/main/java/com/facebook/presto/router/scheduler/CustomPluginScheduler.java new file mode 100644 index 0000000000000..114379eddcbac --- /dev/null +++ b/presto-router/src/main/java/com/facebook/presto/router/scheduler/CustomPluginScheduler.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.router.scheduler; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.spi.router.ClusterInfo; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class CustomPluginScheduler + implements Scheduler +{ + private SchedulerManager schedulerManager; + + private static final Logger log = Logger.get(CustomPluginScheduler.class); + + public CustomPluginScheduler(SchedulerManager schedulerManager) + { + this.schedulerManager = schedulerManager; + schedulerManager.setRequired(); + } + + @Override + public Optional getDestination(String user, String query) + { + try { + return schedulerManager.getScheduler().getDestination(user, query); + } + catch (Exception e) { + log.warn(e, "Error getting destination"); + return Optional.empty(); + } + } + + @Override + public void setCandidates(List candidates) + { + schedulerManager.getScheduler().setCandidates(candidates); + } + + @Override + public void setClusterInfos(Map clusterInfo) + { + schedulerManager.getScheduler().setClusterInfos(clusterInfo); + } +} diff --git a/presto-router/src/main/java/com/facebook/presto/router/scheduler/Scheduler.java b/presto-router/src/main/java/com/facebook/presto/router/scheduler/Scheduler.java index 2d4a2f1b53745..031c6cb23a97a 100644 --- a/presto-router/src/main/java/com/facebook/presto/router/scheduler/Scheduler.java +++ b/presto-router/src/main/java/com/facebook/presto/router/scheduler/Scheduler.java @@ -14,10 +14,12 @@ package com.facebook.presto.router.scheduler; import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.router.ClusterInfo; import java.net.URI; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; @@ -28,8 +30,18 @@ public interface Scheduler * Schedules a request from a user to a concrete candidate. Returns the * URI of this candidate. */ - Optional getDestination(String user); - + default Optional getDestination(String user) + { + return Optional.empty(); + }; + /** + * Schedules a request from a user with specific query string to a concrete candidate. Returns the + * URI of this candidate. + */ + default Optional getDestination(String user, String query) + { + return Optional.empty(); + }; /** * Sets the candidates with the list of URIs for scheduling. */ @@ -42,4 +54,9 @@ default void setWeights(HashMap weights) { throw new PrestoException(NOT_SUPPORTED, "This scheduler does not support setting weights"); } + + /** + * Sets the scheduler with cluster info for clusters. + */ + default void setClusterInfos(Map clusterInfos) {} } diff --git a/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerFactory.java b/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerFactory.java index cb7d081f0caf6..d194e3dbbf47c 100644 --- a/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerFactory.java +++ b/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerFactory.java @@ -21,10 +21,12 @@ public class SchedulerFactory { private final SchedulerType schedulerType; + private final SchedulerManager schedulerManager; - public SchedulerFactory(SchedulerType schedulerType) + public SchedulerFactory(SchedulerType schedulerType, SchedulerManager schedulerManager) { this.schedulerType = requireNonNull(schedulerType, "schedulerType is null"); + this.schedulerManager = requireNonNull(schedulerManager, "schedulerManager is null"); } public Scheduler create() @@ -38,6 +40,8 @@ public Scheduler create() return new UserHashScheduler(); case ROUND_ROBIN: return new RoundRobinScheduler(); + case CUSTOM_PLUGIN_SCHEDULER: + return new CustomPluginScheduler(schedulerManager); } throw new PrestoException(NOT_SUPPORTED, "Unsupported router scheduler type " + schedulerType); } diff --git a/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerManager.java b/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerManager.java new file mode 100644 index 0000000000000..a734920767e6e --- /dev/null +++ b/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerManager.java @@ -0,0 +1,87 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.router.scheduler; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.spi.router.Scheduler; +import com.facebook.presto.spi.router.SchedulerFactory; +import com.google.common.collect.ImmutableMap; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static com.facebook.presto.util.PropertiesUtil.loadProperties; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Strings.isNullOrEmpty; +import static java.util.Objects.requireNonNull; + +public class SchedulerManager +{ + private static final Logger log = Logger.get(SchedulerManager.class); + + private static final File CONFIG_FILE = new File("etc/router-config/router-scheduler.properties"); + private static final String NAME_PROPERTY = "router-scheduler.name"; + + private final AtomicBoolean required = new AtomicBoolean(); + private final Map factories = new ConcurrentHashMap<>(); + private final AtomicReference scheduler = new AtomicReference<>(); + private final AtomicReference schedulerName = new AtomicReference<>(""); + + public void setRequired() + { + required.set(true); + } + + public void addSchedulerFactory(SchedulerFactory factory) + { + checkArgument(factories.putIfAbsent(factory.getName(), factory) == null, + "Presto Router Scheduler '%s' is already registered", factory.getName()); + } + + public void loadScheduler() + throws Exception + { + if (!required.get()) { + return; + } + + File configFileLocation = CONFIG_FILE.getAbsoluteFile(); + Map properties = new HashMap<>(loadProperties(configFileLocation)); + + String name = properties.remove(NAME_PROPERTY); + checkArgument(!isNullOrEmpty(name), + "Router scheduler configuration %s does not contain %s", configFileLocation, NAME_PROPERTY); + + if (!schedulerName.get().equalsIgnoreCase(name)) { + log.info("-- Loading Presto Router Scheduler --"); + SchedulerFactory factory = factories.get(name); + checkState(factory != null, "Presto Router Scheduler %s is not registered", name); + Scheduler scheduler = factory.create(ImmutableMap.copyOf(properties)); + this.scheduler.set(requireNonNull(scheduler, "scheduler is null")); + this.schedulerName.set(name); + log.info("-- Loaded Presto Router Scheduler %s --", name); + } + } + + public Scheduler getScheduler() + { + checkState(scheduler.get() != null, "scheduler was not loaded"); + return scheduler.get(); + } +} diff --git a/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerType.java b/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerType.java index cef91386ebb66..431ba865b7d32 100644 --- a/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerType.java +++ b/presto-router/src/main/java/com/facebook/presto/router/scheduler/SchedulerType.java @@ -19,4 +19,5 @@ public enum SchedulerType ROUND_ROBIN, USER_HASH, WEIGHTED_RANDOM_CHOICE, + CUSTOM_PLUGIN_SCHEDULER } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/RouterPlugin.java b/presto-spi/src/main/java/com/facebook/presto/spi/RouterPlugin.java new file mode 100644 index 0000000000000..9c2d5abfc70c8 --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/RouterPlugin.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi; + +import com.facebook.presto.spi.router.SchedulerFactory; + +import static java.util.Collections.emptyList; + +public interface RouterPlugin +{ + default Iterable getSchedulerFactories() + { + return emptyList(); + } +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/router/ClusterInfo.java b/presto-spi/src/main/java/com/facebook/presto/spi/router/ClusterInfo.java new file mode 100644 index 0000000000000..8b79fbb57478a --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/router/ClusterInfo.java @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.router; + +public interface ClusterInfo +{ + default long getRunningQueries() + { + return 0; + } + + default long getBlockedQueries() + { + return 0; + } + + default long getQueuedQueries() + { + return 0; + } + + default long getActiveWorkers() + { + return 0; + } + + default long getRunningDrivers() + { + return 0; + } +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/router/Scheduler.java b/presto-spi/src/main/java/com/facebook/presto/spi/router/Scheduler.java new file mode 100644 index 0000000000000..489e624239d11 --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/router/Scheduler.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.router; + +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public interface Scheduler +{ + /** + * Schedules a query from a user to a concrete candidate. Returns the + * URI of this candidate. + */ + Optional getDestination(String user, String query); + + /** + * Sets the candidates with the list of URIs for scheduling. + */ + void setCandidates(List candidates); + + /** + * Sets remote cluster infos for each cluster. + */ + default void setClusterInfos(Map clusterInfos) {} +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/router/SchedulerFactory.java b/presto-spi/src/main/java/com/facebook/presto/spi/router/SchedulerFactory.java new file mode 100644 index 0000000000000..16c48dbb5fe6c --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/router/SchedulerFactory.java @@ -0,0 +1,23 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.router; + +import java.util.Map; + +public interface SchedulerFactory +{ + String getName(); + + Scheduler create(Map config); +}