Skip to content

Commit

Permalink
[native] Add row expression optimizer
Browse files Browse the repository at this point in the history
  • Loading branch information
pramodsatya committed Feb 7, 2025
1 parent a6d82fe commit 66b5c37
Show file tree
Hide file tree
Showing 27 changed files with 3,922 additions and 53 deletions.
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ target_link_libraries(
presto_http
presto_operators
presto_velox_conversion
presto_expression_optimizer
velox_abfs
velox_aggregates
velox_caching
Expand Down
28 changes: 28 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,13 @@ void PrestoServer::registerSidecarEndpoints() {
proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, getFunctionsMetadata());
});
httpServer_->registerPost(
"/v1/expressions",
[&](proxygen::HTTPMessage* message,
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream) {
optimizeExpressions(*message, body, downstream);
});
httpServer_->registerPost(
"/v1/velox/plan",
[server = this](
Expand Down Expand Up @@ -1599,4 +1606,25 @@ protocol::NodeStatus PrestoServer::fetchNodeStatus() {
return nodeStatus;
}

void PrestoServer::optimizeExpressions(
const proxygen::HTTPMessage& message,
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream) {
json::array_t inputRowExpressions =
json::parse(util::extractMessageBody(body));
auto rowExpressionOptimizer =
std::make_unique<expression::RowExpressionOptimizer>(
nativeWorkerPool_.get());
auto result = rowExpressionOptimizer->optimize(
message.getHeaders(), inputRowExpressions);
if (result.second) {
VELOX_CHECK(
result.first.is_array(),
"The output json is not an array of RowExpressions");
http::sendOkResponse(downstream, result.first);
} else {
http::sendErrorResponse(downstream, result.first);
}
}

} // namespace facebook::presto
6 changes: 6 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "presto_cpp/main/PeriodicHeartbeatManager.h"
#include "presto_cpp/main/PrestoExchangeSource.h"
#include "presto_cpp/main/PrestoServerOperations.h"
#include "presto_cpp/main/types/RowExpressionOptimizer.h"
#include "presto_cpp/main/types/VeloxPlanValidator.h"
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/memory/MemoryAllocator.h"
Expand Down Expand Up @@ -217,6 +218,11 @@ class PrestoServer {

protocol::NodeStatus fetchNodeStatus();

void optimizeExpressions(
const proxygen::HTTPMessage& message,
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream);

void populateMemAndCPUInfo();

// Periodically yield tasks if there are tasks queued.
Expand Down
6 changes: 6 additions & 0 deletions presto-native-execution/presto_cpp/main/types/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ add_library(presto_velox_conversion OBJECT VeloxPlanConversion.cpp)

target_link_libraries(presto_velox_conversion velox_type)

add_library(presto_expression_optimizer RowExpressionConverter.cpp
RowExpressionOptimizer.cpp)

target_link_libraries(presto_expression_optimizer presto_type_converter
presto_types presto_protocol)

if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()
116 changes: 63 additions & 53 deletions presto-native-execution/presto_cpp/main/types/PrestoToVeloxExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
#include <boost/algorithm/string/case_conv.hpp>
#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/presto_protocol/Base64Util.h"
#include "velox/common/base/Exceptions.h"
#include "velox/functions/prestosql/types/JsonType.h"
Expand All @@ -34,59 +33,9 @@ std::string toJsonString(const T& value) {
}

std::string mapScalarFunction(const std::string& name) {
static const std::string prestoDefaultNamespacePrefix =
SystemConfig::instance()->prestoDefaultNamespacePrefix();
static const std::unordered_map<std::string, std::string> kFunctionNames = {
// Operator overrides: com.facebook.presto.common.function.OperatorType
{"presto.default.$operator$add",
util::addDefaultNamespacePrefix(prestoDefaultNamespacePrefix, "plus")},
{"presto.default.$operator$between",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "between")},
{"presto.default.$operator$divide",
util::addDefaultNamespacePrefix(prestoDefaultNamespacePrefix, "divide")},
{"presto.default.$operator$equal",
util::addDefaultNamespacePrefix(prestoDefaultNamespacePrefix, "eq")},
{"presto.default.$operator$greater_than",
util::addDefaultNamespacePrefix(prestoDefaultNamespacePrefix, "gt")},
{"presto.default.$operator$greater_than_or_equal",
util::addDefaultNamespacePrefix(prestoDefaultNamespacePrefix, "gte")},
{"presto.default.$operator$is_distinct_from",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "distinct_from")},
{"presto.default.$operator$less_than",
util::addDefaultNamespacePrefix(prestoDefaultNamespacePrefix, "lt")},
{"presto.default.$operator$less_than_or_equal",
util::addDefaultNamespacePrefix(prestoDefaultNamespacePrefix, "lte")},
{"presto.default.$operator$modulus",
util::addDefaultNamespacePrefix(prestoDefaultNamespacePrefix, "mod")},
{"presto.default.$operator$multiply",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "multiply")},
{"presto.default.$operator$negation",
util::addDefaultNamespacePrefix(prestoDefaultNamespacePrefix, "negate")},
{"presto.default.$operator$not_equal",
util::addDefaultNamespacePrefix(prestoDefaultNamespacePrefix, "neq")},
{"presto.default.$operator$subtract",
util::addDefaultNamespacePrefix(prestoDefaultNamespacePrefix, "minus")},
{"presto.default.$operator$subscript",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "subscript")},
{"presto.default.$operator$xx_hash_64",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "xxhash64_internal")},
{"presto.default.combine_hash",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "combine_hash_internal")},
// Special form function overrides.
{"presto.default.in", "in"},
};

std::string lowerCaseName = boost::to_lower_copy(name);

auto it = kFunctionNames.find(lowerCaseName);
if (it != kFunctionNames.end()) {
return it->second;
if (prestoOperatorMap().find(lowerCaseName) != prestoOperatorMap().end()) {
return prestoOperatorMap().at(lowerCaseName);
}

return lowerCaseName;
Expand Down Expand Up @@ -385,6 +334,67 @@ std::optional<TypedExprPtr> tryConvertLiteralArray(
}
} // namespace

const std::unordered_map<std::string, std::string> prestoOperatorMap() {
static const std::string prestoDefaultNamespacePrefix =
SystemConfig::instance()->prestoDefaultNamespacePrefix();
static const std::unordered_map<std::string, std::string> kPrestoOperatorMap =
{
// Operator overrides:
// com.facebook.presto.common.function.OperatorType
{"presto.default.$operator$add",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "plus")},
{"presto.default.$operator$between",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "between")},
{"presto.default.$operator$divide",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "divide")},
{"presto.default.$operator$equal",
util::addDefaultNamespacePrefix(prestoDefaultNamespacePrefix, "eq")},
{"presto.default.$operator$greater_than",
util::addDefaultNamespacePrefix(prestoDefaultNamespacePrefix, "gt")},
{"presto.default.$operator$greater_than_or_equal",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "gte")},
{"presto.default.$operator$is_distinct_from",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "distinct_from")},
{"presto.default.$operator$less_than",
util::addDefaultNamespacePrefix(prestoDefaultNamespacePrefix, "lt")},
{"presto.default.$operator$less_than_or_equal",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "lte")},
{"presto.default.$operator$modulus",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "mod")},
{"presto.default.$operator$multiply",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "multiply")},
{"presto.default.$operator$negation",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "negate")},
{"presto.default.$operator$not_equal",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "neq")},
{"presto.default.$operator$subtract",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "minus")},
{"presto.default.$operator$subscript",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "subscript")},
{"presto.default.$operator$xx_hash_64",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "xxhash64_internal")},
{"presto.default.combine_hash",
util::addDefaultNamespacePrefix(
prestoDefaultNamespacePrefix, "combine_hash_internal")},
// Special form function overrides.
{"presto.default.in", "in"},
};
return kPrestoOperatorMap;
}

std::optional<TypedExprPtr> VeloxExprConverter::tryConvertDate(
const protocol::CallExpression& pexpr) const {
static const std::string prestoDefaultNamespacePrefix =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

namespace facebook::presto {

const std::unordered_map<std::string, std::string> prestoOperatorMap();

class VeloxExprConverter {
public:
VeloxExprConverter(velox::memory::MemoryPool* pool, TypeParser* typeParser)
Expand Down
Loading

0 comments on commit 66b5c37

Please sign in to comment.