Skip to content

Commit

Permalink
Metrics based custom scheduler plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
jp-sivaprasad committed Jan 27, 2025
1 parent 901da6d commit 4e47db2
Show file tree
Hide file tree
Showing 27 changed files with 996 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import static java.util.Objects.requireNonNull;

class PluginClassLoader
public class PluginClassLoader
extends URLClassLoader
{
private static final ClassLoader PLATFORM_CLASS_LOADER = findPlatformClassLoader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import static java.nio.file.Files.walkFileTree;

// This is a hack for development and does not support nested classes.
final class PluginDiscovery
public final class PluginDiscovery
{
private static final String CLASS_FILE_SUFFIX = ".class";

Expand Down
17 changes: 17 additions & 0 deletions presto-router-custom-scheduler/README.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Presto Router Custom Scheduler Plugin
This package implements third party custom schedulers to be used by Presto router

## Adding the custom scheduler plugin to presto router
The custom plugin jar that is built is placed in the presto-router/plugin/custom-scheduler directory,
to be picked up by the presto-router
The scheduler name has to be set to CUSTOM_PLUGIN_SCHEDULER in the router-config.json
"scheduler": "CUSTOM_PLUGIN_SCHEDULER"
The name of the custom scheduler factory needs to be set in the property file router-scheduler.properties in presto-router/etc/router-config

## Main Classes:
RouterSchedulerPlugin - Custom Scheduler Plugin class to be loaded by the Router plugin manager.
This class implements the interface com.facebook.presto.spi.RouterPlugin.
MetricsBasedSchedulerFactory - Factory for creating specific custom scheduler
This class implements the interface com.facebook.presto.spi.SchedulerFactory
MetricsBasedScheduler - Custom scheduler implementing the scheduling logic for clusters. More similar classes can be added implementing specific custom scheduling logic.
This class implements the interface com.facebook.presto.spi.router.Scheduler
90 changes: 90 additions & 0 deletions presto-router-custom-scheduler/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.facebook.presto.router</groupId>
<artifactId>scheduler</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>scheduler</name>
<description>SPI plugin project for presto router custom scheduler</description>

<dependencies>
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<version>0.286</version>
</dependency>
<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>log</artifactId>
<version>0.215</version>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>http-client</artifactId>
<version>0.194</version>
</dependency>

<!-- test dependencies-->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>7.10.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.9.1</version> <!-- Use the latest stable version -->
<scope>test</scope>
</dependency>
</dependencies>

<!-- Build configurations -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>com.facebook.presto.router.scheduler.RouterSchedulerPlugin</mainClass>
</manifest>
</archive>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.facebook.presto.router.scheduler;

public class CustomSchedulerConstants {
public static final String METRICS_BASED = "metricsBased";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.router.scheduler;

import com.facebook.airlift.log.Logger;

import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.stream.Collectors;

import com.facebook.presto.spi.router.ClusterInfo;
import com.facebook.presto.spi.router.Scheduler;

//Metrics based scheduler uses coordinator and/or worker metrics for scheduling decisions
public class MetricsBasedScheduler
implements Scheduler {
private static final Logger log = Logger.get(MetricsBasedScheduler.class);
Map<URI, ClusterInfo> clusterInfos;

//Min heap based priority queue. Cluster with lowest number of queries will be on top of the queue
PriorityQueue<ClusterQueryInfo> clusterQueue = new PriorityQueue<ClusterQueryInfo>((x, y) -> (int) (x.totalQueries - y.totalQueries));

private List<URI> candidates;

private class ClusterQueryInfo {
URI clusterUri;
long runningQueries, queuedQueries, totalQueries;

ClusterQueryInfo(URI clusterUri, long runningQueries, long queuedQueries) {
this.clusterUri = clusterUri;
this.runningQueries = runningQueries;
this.queuedQueries = queuedQueries;
totalQueries = (long) (runningQueries + queuedQueries);
log.info("Cluster URI : " + clusterUri + ", runningQueries : " + runningQueries + ", queuedQueries : " + queuedQueries + ", totalQueries : " + totalQueries);
}
}

@Override
public Optional<URI> getDestination(String user, String query) {
try {
if (clusterInfos != null && clusterInfos.size() > 0) {
clusterQueue.clear();
clusterQueue.addAll(clusterInfos.keySet().stream()
.map(uri -> new ClusterQueryInfo(uri, clusterInfos.get(uri).getRunningQueries(), clusterInfos.get(uri).getQueuedQueries()))
.collect(Collectors.toList()));
return Optional.of(clusterQueue.poll().clusterUri);
}
return Optional.empty();
} catch (IllegalArgumentException e) {
log.warn(e, "Error getting destination for user " + user);
return Optional.empty();
}
}

@Override
public void setCandidates(List<URI> candidates) {
this.candidates = candidates;
}

@Override
public void setClusterInfos(Map<URI, ClusterInfo> clusterInfos) {
this.clusterInfos = clusterInfos;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.facebook.presto.router.scheduler;

import java.util.Map;

import com.facebook.presto.spi.router.Scheduler;
import com.facebook.presto.spi.router.SchedulerFactory;

public class MetricsBasedSchedulerFactory implements SchedulerFactory {

@Override
public String getName() {
return CustomSchedulerConstants.METRICS_BASED;
}

@Override
public Scheduler create(Map<String, String> config) {
return new MetricsBasedScheduler();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.facebook.presto.router.scheduler;

import com.facebook.presto.spi.RouterPlugin;
import com.facebook.presto.spi.router.SchedulerFactory;

import java.util.ArrayList;
import java.util.List;

public class RouterSchedulerPlugin implements RouterPlugin {

@Override
public Iterable<SchedulerFactory> getSchedulerFactories()
{
List<SchedulerFactory> schedulerFactories = new ArrayList<>();
schedulerFactories.add(new MetricsBasedSchedulerFactory());
return schedulerFactories;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.facebook.presto.router.scheduler.RouterSchedulerPlugin
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.router.scheduler;

import com.facebook.presto.spi.router.ClusterInfo;
import com.facebook.presto.spi.router.Scheduler;
import org.testng.annotations.Test;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import static org.testng.Assert.assertEquals;

public class TestCustomScheduler {
Map<URI, ClusterInfo> clusterInfos = new HashMap<>();

static class MockRemoteClusterInfo implements ClusterInfo {
long runningQueries, queuedQueries;

MockRemoteClusterInfo(long runningQueries, long queuedQueries) {
this.runningQueries = runningQueries;
this.queuedQueries = queuedQueries;
}

@Override
public long getRunningQueries() {
return runningQueries;
}

@Override
public long getQueuedQueries() {
return queuedQueries;
}
}

@Test
public void testMetricsBasedScheduler()
throws Exception {
Scheduler scheduler = new MetricsBasedScheduler();

URI uri1 = new URI("192.168.0.1");
URI uri2 = new URI("192.168.0.2");
URI uri3 = new URI("192.168.0.3");

clusterInfos.put(uri1, new MockRemoteClusterInfo(10, 10));
clusterInfos.put(uri2, new MockRemoteClusterInfo(20, 20));
clusterInfos.put(uri3, new MockRemoteClusterInfo(30, 30));
scheduler.setClusterInfos(clusterInfos);
URI target = scheduler.getDestination("test", null).orElse(new URI("invalid"));
assertEquals(target.getPath(), "192.168.0.1");

clusterInfos.put(uri1, new MockRemoteClusterInfo(20, 20));
clusterInfos.put(uri2, new MockRemoteClusterInfo(10, 10));
clusterInfos.put(uri3, new MockRemoteClusterInfo(30, 30));
scheduler.setClusterInfos(clusterInfos);
target = scheduler.getDestination("test", null).orElse(new URI("invalid"));
assertEquals(target.getPath(), "192.168.0.2");

clusterInfos.put(uri1, new MockRemoteClusterInfo(20, 20));
clusterInfos.put(uri2, new MockRemoteClusterInfo(30, 30));
clusterInfos.put(uri3, new MockRemoteClusterInfo(10, 10));
scheduler.setClusterInfos(clusterInfos);
target = scheduler.getDestination("test", null).orElse(new URI("invalid"));
assertEquals(target.getPath(), "192.168.0.3");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
router-scheduler.name=metricsBased
42 changes: 38 additions & 4 deletions presto-router/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,50 @@
<artifactId>jmxutils</artifactId>
</dependency>

<!-- test dependencies-->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
<groupId>io.airlift.resolver</groupId>
<artifactId>resolver</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-api</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-main</artifactId>
<exclusions>
<exclusion>
<groupId>software.amazon.ion</groupId>
<artifactId>ion-java</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-api</artifactId>
</dependency>

<!-- test dependencies-->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>

Expand Down
Loading

0 comments on commit 4e47db2

Please sign in to comment.