Skip to content

Commit

Permalink
Integrate session property SPI with Presto engine
Browse files Browse the repository at this point in the history
Co-authored-by: Abe Varghese Kodiyan <[email protected]>
Co-authored-by: Joe Abraham <[email protected]>
Co-authored-by: Deepthy Davis <[email protected]>
  • Loading branch information
4 people authored and tdcmeehan committed Oct 28, 2024
1 parent 524fbea commit ebbfdac
Show file tree
Hide file tree
Showing 18 changed files with 650 additions and 3 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@
<module>presto-singlestore</module>
<module>presto-hana</module>
<module>presto-openapi</module>
<module>presto-native-sidecar-plugin</module>
</modules>

<dependencyManagement>
Expand Down Expand Up @@ -457,6 +458,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-session-property-providers</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-bigquery</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.facebook.presto.common.type.DoubleType;
import com.facebook.presto.common.type.IntegerType;
import com.facebook.presto.common.type.MapType;
import com.facebook.presto.common.type.TinyintType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.common.type.VarcharType;
Expand Down Expand Up @@ -137,11 +138,11 @@ public static SessionPropertyManager createTestingSessionPropertyManager(
Optional.empty());
}

public void loadSessionPropertyProvider(String sessionPropertyProviderName)
public void loadSessionPropertyProvider(String sessionPropertyProviderName, Optional<TypeManager> typeManager, Optional<NodeManager> nodeManager)
{
WorkerSessionPropertyProviderFactory factory = workerSessionPropertyProviderFactories.get(sessionPropertyProviderName);
checkState(factory != null, "No factory for session property provider : " + sessionPropertyProviderName);
WorkerSessionPropertyProvider sessionPropertyProvider = factory.create(new SessionPropertyContext(functionAndTypeManager, nodeManager));
WorkerSessionPropertyProvider sessionPropertyProvider = factory.create(new SessionPropertyContext(typeManager, nodeManager));
if (workerSessionPropertyProviders.putIfAbsent(sessionPropertyProviderName, sessionPropertyProvider) != null) {
throw new IllegalArgumentException("System session property provider is already registered for property provider : " + sessionPropertyProviderName);
}
Expand All @@ -150,7 +151,14 @@ public void loadSessionPropertyProvider(String sessionPropertyProviderName)
public void loadSessionPropertyProviders()
{
for (String sessionPropertyProviderName : workerSessionPropertyProviderFactories.keySet()) {
loadSessionPropertyProvider(sessionPropertyProviderName);
loadSessionPropertyProvider(sessionPropertyProviderName, functionAndTypeManager, nodeManager);
}
}

public void addSessionPropertyProviderFactory(WorkerSessionPropertyProviderFactory factory)
{
if (workerSessionPropertyProviderFactories.putIfAbsent(factory.getName(), factory) != null) {
throw new IllegalArgumentException(format("System Session property provider factory" + factory.getName() + "is already registered"));
}
}

Expand Down Expand Up @@ -367,6 +375,9 @@ public static String serializeSessionProperty(Type type, Object value)
if (VarcharType.VARCHAR.equals(type)) {
return value.toString();
}
if (TinyintType.TINYINT.equals(type)) {
return value.toString();
}
if (type instanceof ArrayType || type instanceof MapType) {
return getJsonCodecForType(type).toJson(value);
}
Expand All @@ -393,6 +404,9 @@ private static Object deserializeSessionProperty(Type type, String value)
if (DoubleType.DOUBLE.equals(type)) {
return Double.valueOf(value);
}
if (TinyintType.TINYINT.equals(type)) {
return Byte.valueOf(value);
}
if (type instanceof ArrayType || type instanceof MapType) {
return getJsonCodecForType(type).fromJson(value);
}
Expand All @@ -416,6 +430,9 @@ private static <T> JsonCodec<T> getJsonCodecForType(Type type)
if (DoubleType.DOUBLE.equals(type)) {
return (JsonCodec<T>) JSON_CODEC_FACTORY.jsonCodec(Double.class);
}
if (TinyintType.TINYINT.equals(type)) {
return (JsonCodec<T>) JSON_CODEC_FACTORY.jsonCodec(Byte.class);
}
if (type instanceof ArrayType) {
Type elementType = ((ArrayType) type).getElementType();
return (JsonCodec<T>) JSON_CODEC_FACTORY.listJsonCodec(getJsonCodecForType(elementType));
Expand Down Expand Up @@ -445,6 +462,9 @@ private static Class<?> getMapKeyType(Type type)
if (DoubleType.DOUBLE.equals(type)) {
return Double.class;
}
if (TinyintType.TINYINT.equals(type)) {
return Byte.class;
}
throw new PrestoException(INVALID_SESSION_PROPERTY, format("Session property map key type %s is not supported", type));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.facebook.presto.spi.security.PasswordAuthenticatorFactory;
import com.facebook.presto.spi.security.SystemAccessControlFactory;
import com.facebook.presto.spi.session.SessionPropertyConfigurationManagerFactory;
import com.facebook.presto.spi.session.WorkerSessionPropertyProviderFactory;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.spi.storage.TempStorageFactory;
import com.facebook.presto.spi.tracing.TracerProvider;
Expand Down Expand Up @@ -366,6 +367,11 @@ public void installCoordinatorPlugin(CoordinatorPlugin plugin)
log.info("Registering function namespace manager %s", functionNamespaceManagerFactory.getName());
metadata.getFunctionAndTypeManager().addFunctionNamespaceFactory(functionNamespaceManagerFactory);
}

for (WorkerSessionPropertyProviderFactory providerFactory : plugin.getWorkerSessionPropertyProviderFactories()) {
log.info("Registering system session property provider factory %s", providerFactory.getName());
metadata.getSessionPropertyManager().addSessionPropertyProviderFactory(providerFactory);
}
}

private URLClassLoader buildClassLoader(String plugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import com.facebook.presto.metadata.InternalNode;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.nodeManager.PluginNodeManager;
import com.facebook.presto.resourcemanager.ResourceManagerClusterStateProvider;
import com.facebook.presto.security.AccessControlManager;
import com.facebook.presto.server.GracefulShutdownHandler;
Expand All @@ -61,6 +62,7 @@
import com.facebook.presto.server.security.ServerSecurityModule;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.CoordinatorPlugin;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.eventlistener.EventListener;
Expand Down Expand Up @@ -175,6 +177,7 @@ public class TestingPrestoServer
private final ServerInfoResource serverInfoResource;
private final ResourceManagerClusterStateProvider clusterStateProvider;
private final PlanCheckerProviderManager planCheckerProviderManager;
private final NodeManager pluginNodeManager;

public static class TestShutdownAction
implements ShutdownAction
Expand Down Expand Up @@ -440,6 +443,7 @@ else if (catalogServer) {
announcer = injector.getInstance(Announcer.class);
requestBlocker = injector.getInstance(RequestBlocker.class);
serverInfoResource = injector.getInstance(ServerInfoResource.class);
pluginNodeManager = injector.getInstance(PluginNodeManager.class);

// Announce Thrift server address
DriftServer driftServer = injector.getInstance(DriftServer.class);
Expand Down Expand Up @@ -510,6 +514,7 @@ public PluginManager getPluginManager()
{
return pluginManager;
}

public void installPlugin(Plugin plugin)
{
pluginManager.installPlugin(plugin);
Expand Down Expand Up @@ -647,6 +652,11 @@ public InternalNodeManager getNodeManager()
return nodeManager;
}

public NodeManager getPluginNodeManager()
{
return pluginNodeManager;
}

public NodePartitioningManager getNodePartitioningManager()
{
return nodePartitioningManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ default void installCoordinatorPlugin(CoordinatorPlugin plugin)

void loadFunctionNamespaceManager(String functionNamespaceManagerName, String catalogName, Map<String, String> properties);

default void loadSessionPropertyProvider(String sessionPropertyProviderName)
{
throw new UnsupportedOperationException();
}

Lock getExclusiveLock();

class MaterializedResultWithPlan
Expand Down
107 changes: 107 additions & 0 deletions presto-native-sidecar-plugin/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-root</artifactId>
<version>0.290-SNAPSHOT</version>
</parent>

<artifactId>presto-native-sidecar-plugin</artifactId>
<description>Presto - Native Sidecar Plugin</description>
<packaging>presto-plugin</packaging>

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
</properties>

<dependencies>
<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>bootstrap</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>log</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>json</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>

<!-- Presto SPI -->
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-common</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-api</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>slice</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<scope>provided</scope>
</dependency>

<!-- for testing -->
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-testng-services</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sidecar;

import com.facebook.presto.sidecar.sessionpropertyproviders.NativeSystemSessionPropertyProviderFactory;
import com.facebook.presto.spi.CoordinatorPlugin;
import com.facebook.presto.spi.session.WorkerSessionPropertyProviderFactory;
import com.google.common.collect.ImmutableList;

public class NativeSidecarPlugin
implements CoordinatorPlugin
{
@Override
public Iterable<WorkerSessionPropertyProviderFactory> getWorkerSessionPropertyProviderFactories()
{
return ImmutableList.of(new NativeSystemSessionPropertyProviderFactory());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sidecar;

import com.facebook.presto.spi.Plugin;

/**
* Todo: Remove this class when support for CoordinatorPlugin is added in presto-maven-plugin.
*/
public class NoOpPlugin
implements Plugin
{
}
Loading

0 comments on commit ebbfdac

Please sign in to comment.