Skip to content

Commit

Permalink
Add ExpressionOptimizerManager
Browse files Browse the repository at this point in the history
  • Loading branch information
tdcmeehan committed Jan 29, 2025
1 parent d477ea1 commit 51ae5a5
Show file tree
Hide file tree
Showing 30 changed files with 510 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ public final class SystemSessionProperties
public static final String INLINE_PROJECTIONS_ON_VALUES = "inline_projections_on_values";
public static final String INCLUDE_VALUES_NODE_IN_CONNECTOR_OPTIMIZER = "include_values_node_in_connector_optimizer";
public static final String SINGLE_NODE_EXECUTION_ENABLED = "single_node_execution_enabled";
public static final String EXPRESSION_OPTIMIZER_NAME = "expression_optimizer_name";

// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all";
Expand Down Expand Up @@ -1852,7 +1853,12 @@ public SystemSessionProperties(
booleanProperty(NATIVE_EXECUTION_SCALE_WRITER_THREADS_ENABLED,
"Enable automatic scaling of writer threads",
featuresConfig.isNativeExecutionScaleWritersThreadsEnabled(),
!featuresConfig.isNativeExecutionEnabled()));
!featuresConfig.isNativeExecutionEnabled()),
stringProperty(
EXPRESSION_OPTIMIZER_NAME,
"Configure which expression optimizer to use",
featuresConfig.getExpressionOptimizerName(),
false));
}

public static boolean isSpoolingOutputBufferEnabled(Session session)
Expand Down Expand Up @@ -3153,4 +3159,9 @@ public static boolean isNativeExecutionScaleWritersThreadsEnabled(Session sessio
{
return session.getSystemProperty(NATIVE_EXECUTION_SCALE_WRITER_THREADS_ENABLED, Boolean.class);
}

public static String getExpressionOptimizerName(Session session)
{
return session.getSystemProperty(EXPRESSION_OPTIMIZER_NAME, String.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@
import com.facebook.presto.spi.security.SystemAccessControlFactory;
import com.facebook.presto.spi.session.SessionPropertyConfigurationManagerFactory;
import com.facebook.presto.spi.session.WorkerSessionPropertyProviderFactory;
import com.facebook.presto.spi.sql.planner.ExpressionOptimizerFactory;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.spi.storage.TempStorageFactory;
import com.facebook.presto.spi.tracing.TracerProvider;
import com.facebook.presto.spi.ttl.ClusterTtlProviderFactory;
import com.facebook.presto.spi.ttl.NodeTtlFetcherFactory;
import com.facebook.presto.sql.analyzer.AnalyzerProviderManager;
import com.facebook.presto.sql.analyzer.QueryPreparerProviderManager;
import com.facebook.presto.sql.expressions.ExpressionOptimizerManager;
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
import com.facebook.presto.storage.TempStorageManager;
import com.facebook.presto.tracing.TracerProviderManager;
Expand Down Expand Up @@ -141,6 +143,7 @@ public class PluginManager
private final NodeStatusNotificationManager nodeStatusNotificationManager;
private final ClientRequestFilterManager clientRequestFilterManager;
private final PlanCheckerProviderManager planCheckerProviderManager;
private final ExpressionOptimizerManager expressionOptimizerManager;

@Inject
public PluginManager(
Expand All @@ -165,7 +168,8 @@ public PluginManager(
TracerProviderManager tracerProviderManager,
NodeStatusNotificationManager nodeStatusNotificationManager,
ClientRequestFilterManager clientRequestFilterManager,
PlanCheckerProviderManager planCheckerProviderManager)
PlanCheckerProviderManager planCheckerProviderManager,
ExpressionOptimizerManager expressionOptimizerManager)
{
requireNonNull(nodeInfo, "nodeInfo is null");
requireNonNull(config, "config is null");
Expand Down Expand Up @@ -200,6 +204,7 @@ public PluginManager(
this.nodeStatusNotificationManager = requireNonNull(nodeStatusNotificationManager, "nodeStatusNotificationManager is null");
this.clientRequestFilterManager = requireNonNull(clientRequestFilterManager, "clientRequestFilterManager is null");
this.planCheckerProviderManager = requireNonNull(planCheckerProviderManager, "planCheckerProviderManager is null");
this.expressionOptimizerManager = requireNonNull(expressionOptimizerManager, "expressionManager is null");
}

public void loadPlugins()
Expand Down Expand Up @@ -392,6 +397,11 @@ public void installCoordinatorPlugin(CoordinatorPlugin plugin)
log.info("Registering plan checker provider factory %s", planCheckerProviderFactory.getName());
planCheckerProviderManager.addPlanCheckerProviderFactory(planCheckerProviderFactory);
}

for (ExpressionOptimizerFactory expressionOptimizerFactory : plugin.getExpressionOptimizerFactories()) {
log.info("Registering expression optimizer factory %s", expressionOptimizerFactory.getName());
expressionOptimizerManager.addExpressionOptimizerFactory(expressionOptimizerFactory);
}
}

private URLClassLoader buildClassLoader(String plugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import com.facebook.presto.server.security.PrestoAuthenticatorManager;
import com.facebook.presto.server.security.ServerSecurityModule;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.expressions.ExpressionOptimizerManager;
import com.facebook.presto.sql.parser.SqlParserOptions;
import com.facebook.presto.sql.planner.sanity.PlanCheckerProviderManager;
import com.facebook.presto.storage.TempStorageManager;
Expand Down Expand Up @@ -196,6 +197,8 @@ public void run()
planCheckerProviderManager.loadPlanCheckerProviders(pluginNodeManager);

injector.getInstance(ClientRequestFilterManager.class).loadClientRequestFilters();
injector.getInstance(ExpressionOptimizerManager.class).loadExpressionOptimizerFactories();

startAssociatedProcesses(injector);

injector.getInstance(Announcer.class).start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@
import com.facebook.presto.sql.analyzer.MetadataExtractorMBean;
import com.facebook.presto.sql.analyzer.QueryExplainer;
import com.facebook.presto.sql.analyzer.QueryPreparerProviderManager;
import com.facebook.presto.sql.expressions.ExpressionOptimizerManager;
import com.facebook.presto.sql.gen.ExpressionCompiler;
import com.facebook.presto.sql.gen.JoinCompiler;
import com.facebook.presto.sql.gen.JoinFilterFunctionCompiler;
Expand Down Expand Up @@ -359,6 +360,9 @@ else if (serverConfig.isCoordinator()) {
binder.bind(SystemSessionProperties.class).in(Scopes.SINGLETON);
binder.bind(SessionPropertyDefaults.class).in(Scopes.SINGLETON);

// expression manager
binder.bind(ExpressionOptimizerManager.class).in(Scopes.SINGLETON);

// schema properties
binder.bind(SchemaPropertyManager.class).in(Scopes.SINGLETON);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import com.facebook.presto.split.PageSourceManager;
import com.facebook.presto.split.SplitManager;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.expressions.ExpressionOptimizerManager;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.parser.SqlParserOptions;
import com.facebook.presto.sql.planner.ConnectorPlanOptimizerManager;
Expand Down Expand Up @@ -171,6 +172,7 @@ public class TestingPrestoServer
private final TaskManager taskManager;
private final GracefulShutdownHandler gracefulShutdownHandler;
private final ShutdownAction shutdownAction;
private final ExpressionOptimizerManager expressionManager;
private final RequestBlocker requestBlocker;
private final boolean resourceManager;
private final boolean catalogServer;
Expand Down Expand Up @@ -373,6 +375,7 @@ public TestingPrestoServer(
procedureTester = injector.getInstance(ProcedureTester.class);
splitManager = injector.getInstance(SplitManager.class);
pageSourceManager = injector.getInstance(PageSourceManager.class);
expressionManager = injector.getInstance(ExpressionOptimizerManager.class);
if (coordinator) {
dispatchManager = injector.getInstance(DispatchManager.class);
queryManager = injector.getInstance(QueryManager.class);
Expand All @@ -390,6 +393,7 @@ public TestingPrestoServer(
eventListenerManager = ((TestingEventListenerManager) injector.getInstance(EventListenerManager.class));
clusterStateProvider = null;
planCheckerProviderManager = injector.getInstance(PlanCheckerProviderManager.class);
expressionManager.loadExpressionOptimizerFactories();
}
else if (resourceManager) {
dispatchManager = null;
Expand Down Expand Up @@ -710,6 +714,11 @@ public ShutdownAction getShutdownAction()
return shutdownAction;
}

public ExpressionOptimizerManager getExpressionManager()
{
return expressionManager;
}

public boolean isCoordinator()
{
return coordinator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy.LEGACY;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinNotNullInferenceStrategy.NONE;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.TaskSpillingStrategy.ORDER_BY_CREATE_TIME;
import static com.facebook.presto.sql.expressions.ExpressionOptimizerManager.DEFAULT_EXPRESSION_OPTIMIZER_NAME;
import static com.facebook.presto.sql.tree.CreateView.Security.DEFINER;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
Expand Down Expand Up @@ -293,6 +294,7 @@ public class FeaturesConfig
private boolean prestoSparkExecutionEnvironment;
private boolean singleNodeExecutionEnabled;
private boolean nativeExecutionScaleWritersThreadsEnabled;
private String expressionOptimizerName = DEFAULT_EXPRESSION_OPTIMIZER_NAME;

public enum PartitioningPrecisionStrategy
{
Expand Down Expand Up @@ -2915,4 +2917,17 @@ public FeaturesConfig setNativeExecutionScaleWritersThreadsEnabled(boolean nativ
this.nativeExecutionScaleWritersThreadsEnabled = nativeExecutionScaleWritersThreadsEnabled;
return this;
}

public String getExpressionOptimizerName()
{
return expressionOptimizerName;
}

@Config("expression-optimizer-name")
@ConfigDescription("Set the expression optimizer name for parsing and analyzing.")
public FeaturesConfig setExpressionOptimizerName(String expressionOptimizerName)
{
this.expressionOptimizerName = expressionOptimizerName;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.expressions;

import com.facebook.presto.FullConnectorSession;
import com.facebook.presto.Session;
import com.facebook.presto.metadata.FunctionAndTypeManager;
import com.facebook.presto.nodeManager.PluginNodeManager;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.relation.ExpressionOptimizer;
import com.facebook.presto.spi.sql.planner.ExpressionOptimizerContext;
import com.facebook.presto.spi.sql.planner.ExpressionOptimizerFactory;
import com.facebook.presto.sql.relational.FunctionResolution;
import com.facebook.presto.sql.relational.RowExpressionOptimizer;
import com.google.common.collect.ImmutableList;

import javax.inject.Inject;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static com.facebook.presto.SystemSessionProperties.getExpressionOptimizerName;
import static com.facebook.presto.util.PropertiesUtil.loadProperties;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.io.Files.getNameWithoutExtension;
import static java.util.Objects.requireNonNull;

public class ExpressionOptimizerManager
{
public static final String DEFAULT_EXPRESSION_OPTIMIZER_NAME = "default";
private static final File EXPRESSION_MANAGER_CONFIGURATION_DIRECTORY = new File("etc/expression-manager/");
private static final String EXPRESSION_MANAGER_FACTORY_NAME = "expression-manager-factory.name";

private final NodeManager nodeManager;
private final FunctionAndTypeManager functionAndTypeManager;
private final FunctionResolution functionResolution;
private final File configurationDirectory;

private final Map<String, ExpressionOptimizerFactory> expressionOptimizerFactories = new ConcurrentHashMap<>();
private final Map<String, ExpressionOptimizer> expressionOptimizers = new ConcurrentHashMap<>();

@Inject
public ExpressionOptimizerManager(PluginNodeManager nodeManager, FunctionAndTypeManager functionAndTypeManager)
{
this(nodeManager, functionAndTypeManager, EXPRESSION_MANAGER_CONFIGURATION_DIRECTORY);
}

public ExpressionOptimizerManager(PluginNodeManager nodeManager, FunctionAndTypeManager functionAndTypeManager, File configurationDirectory)
{
requireNonNull(nodeManager, "nodeManager is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
this.functionResolution = new FunctionResolution(functionAndTypeManager.getFunctionAndTypeResolver());
this.configurationDirectory = requireNonNull(configurationDirectory, "configurationDirectory is null");
expressionOptimizers.put(DEFAULT_EXPRESSION_OPTIMIZER_NAME, new RowExpressionOptimizer(functionAndTypeManager));
}

public void loadExpressionOptimizerFactories()
{
try {
for (File file : listFiles(configurationDirectory)) {
if (file.isFile() && file.getName().endsWith(".properties")) {
loadExpressionOptimizerFactory(file);
}
}
}
catch (IOException e) {
throw new UncheckedIOException("Failed to load expression manager configuration", e);
}
}

private void loadExpressionOptimizerFactory(File configurationFile)
throws IOException
{
String name = getNameWithoutExtension(configurationFile.getName());
checkArgument(!isNullOrEmpty(name), "File name is empty, full path: %s", configurationFile.getAbsolutePath());
checkArgument(!name.equals(DEFAULT_EXPRESSION_OPTIMIZER_NAME), "Cannot name an expression optimizer instance %s", DEFAULT_EXPRESSION_OPTIMIZER_NAME);

Map<String, String> properties = new HashMap<>(loadProperties(configurationFile));
String factoryName = properties.remove(EXPRESSION_MANAGER_FACTORY_NAME);
checkArgument(!isNullOrEmpty(factoryName), "%s does not contain %s", configurationFile, EXPRESSION_MANAGER_FACTORY_NAME);
checkArgument(expressionOptimizerFactories.containsKey(factoryName),
"ExpressionOptimizerFactory %s is not registered, registered factories: ", factoryName, expressionOptimizerFactories.keySet());

ExpressionOptimizer optimizer = expressionOptimizerFactories.get(factoryName).createOptimizer(
properties,
new ExpressionOptimizerContext(nodeManager, functionAndTypeManager, functionResolution));
expressionOptimizers.put(name, optimizer);
}

public void addExpressionOptimizerFactory(ExpressionOptimizerFactory expressionOptimizerFactory)
{
String name = expressionOptimizerFactory.getName();
checkArgument(
expressionOptimizerFactories.putIfAbsent(name, expressionOptimizerFactory) == null,
"ExpressionOptimizerFactory %s is already registered", name);
}

public ExpressionOptimizer getExpressionOptimizer(ConnectorSession connectorSession)
{
// TODO: Remove this check once we have a more appropriate abstraction for session properties retrieved from plugins
checkArgument(connectorSession instanceof FullConnectorSession, "connectorSession is not an instance of FullConnectorSession");
Session session = ((FullConnectorSession) connectorSession).getSession();
String expressionOptimizerName = getExpressionOptimizerName(session);
checkArgument(expressionOptimizers.containsKey(expressionOptimizerName), "ExpressionOptimizer '%s' is not registered", expressionOptimizerName);
return expressionOptimizers.get(expressionOptimizerName);
}

private static List<File> listFiles(File directory)
{
if (directory.isDirectory()) {
File[] files = directory.listFiles();
if (files != null) {
return ImmutableList.copyOf(files);
}
}
return ImmutableList.of();
}
}
Loading

0 comments on commit 51ae5a5

Please sign in to comment.