Skip to content

Commit c2ca02c

Browse files
Add support for REST based remote function
Co-authored-by: Wills Feng <[email protected]>
1 parent 8bcba7e commit c2ca02c

File tree

12 files changed

+227
-10
lines changed

12 files changed

+227
-10
lines changed

presto-native-execution/presto_cpp/main/PrestoServer.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1238,10 +1238,11 @@ void PrestoServer::registerRemoteFunctions() {
12381238
} else {
12391239
VELOX_FAIL(
12401240
"To register remote functions using a json file path you need to "
1241-
"specify the remote server location using '{}', '{}' or '{}'.",
1241+
"specify the remote server location using '{}', '{}' or '{}' or {}.",
12421242
SystemConfig::kRemoteFunctionServerThriftAddress,
12431243
SystemConfig::kRemoteFunctionServerThriftPort,
1244-
SystemConfig::kRemoteFunctionServerThriftUdsPath);
1244+
SystemConfig::kRemoteFunctionServerThriftUdsPath,
1245+
SystemConfig::kRemoteFunctionServerRestURL);
12451246
}
12461247
}
12471248
#endif

presto-native-execution/presto_cpp/main/common/Configs.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,10 @@ std::string SystemConfig::remoteFunctionServerSerde() const {
337337
return optionalProperty(kRemoteFunctionServerSerde).value();
338338
}
339339

340+
std::string SystemConfig::remoteFunctionRestUrl() const {
341+
return optionalProperty(kRemoteFunctionServerRestURL).value();
342+
}
343+
340344
int32_t SystemConfig::maxDriversPerTask() const {
341345
return optionalProperty<int32_t>(kMaxDriversPerTask).value();
342346
}

presto-native-execution/presto_cpp/main/common/Configs.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,10 @@ class SystemConfig : public ConfigBase {
600600
static constexpr std::string_view kRemoteFunctionServerThriftUdsPath{
601601
"remote-function-server.thrift.uds-path"};
602602

603+
/// HTTP URL used by the remote function rest server.
604+
static constexpr std::string_view kRemoteFunctionServerRestURL{
605+
"remote-function-server.rest.url"};
606+
603607
/// Path where json files containing signatures for remote functions can be
604608
/// found.
605609
static constexpr std::string_view
@@ -693,6 +697,8 @@ class SystemConfig : public ConfigBase {
693697

694698
std::string remoteFunctionServerSerde() const;
695699

700+
std::string remoteFunctionRestUrl() const;
701+
696702
int32_t maxDriversPerTask() const;
697703

698704
int32_t concurrentLifespansPerTask() const;

presto-native-execution/presto_cpp/main/types/CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,20 @@ add_library(
1818
presto_types OBJECT
1919
PrestoToVeloxQueryPlan.cpp PrestoToVeloxExpr.cpp VeloxPlanValidator.cpp
2020
PrestoToVeloxSplit.cpp PrestoToVeloxConnector.cpp)
21+
2122
add_dependencies(presto_types presto_operators presto_type_converter velox_type
2223
velox_type_fbhive velox_dwio_dwrf_proto)
2324

2425
target_link_libraries(presto_types presto_type_converter velox_type_fbhive
2526
velox_hive_partition_function velox_tpch_gen)
2627

28+
if(PRESTO_ENABLE_REMOTE_FUNCTIONS)
29+
add_dependencies(presto_types velox_expression presto_server_remote_function
30+
velox_functions_remote)
31+
target_link_libraries(presto_types presto_server_remote_function
32+
velox_functions_remote)
33+
endif()
34+
2735
set_property(TARGET presto_types PROPERTY JOB_POOL_LINK presto_link_job_pool)
2836

2937
add_library(presto_function_metadata OBJECT FunctionMetadata.cpp)

presto-native-execution/presto_cpp/main/types/PrestoToVeloxExpr.cpp

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,23 @@
1414

1515
#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
1616
#include <boost/algorithm/string/case_conv.hpp>
17+
#include "presto_cpp/main/common/Configs.h"
1718
#include "presto_cpp/presto_protocol/Base64Util.h"
1819
#include "velox/common/base/Exceptions.h"
1920
#include "velox/functions/prestosql/types/JsonType.h"
2021
#include "velox/vector/ComplexVector.h"
2122
#include "velox/vector/ConstantVector.h"
2223
#include "velox/vector/FlatVector.h"
24+
#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS
25+
#include "presto_cpp/main/JsonSignatureParser.h"
26+
#include "velox/expression/FunctionSignature.h"
27+
#include "velox/functions/remote/client/Remote.h"
28+
#endif
2329

2430
using namespace facebook::velox::core;
31+
#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS
32+
using facebook::velox::functions::remote::PageFormat;
33+
#endif
2534
using facebook::velox::TypeKind;
2635

2736
namespace facebook::presto {
@@ -412,6 +421,18 @@ std::optional<TypedExprPtr> VeloxExprConverter::tryConvertLike(
412421
returnType, args, getFunctionName(signature));
413422
}
414423

424+
#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS
425+
PageFormat fromSerdeString(const std::string_view& serdeName) {
426+
if (serdeName == "presto_page") {
427+
return PageFormat::PRESTO_PAGE;
428+
} else {
429+
VELOX_FAIL(
430+
"presto_page serde is expected by remote function server but got : '{}'",
431+
serdeName);
432+
}
433+
}
434+
#endif
435+
415436
TypedExprPtr VeloxExprConverter::toVeloxExpr(
416437
const protocol::CallExpression& pexpr) const {
417438
if (auto builtin = std::dynamic_pointer_cast<protocol::BuiltInFunctionHandle>(
@@ -458,10 +479,69 @@ TypedExprPtr VeloxExprConverter::toVeloxExpr(
458479
pexpr.functionHandle)) {
459480
auto args = toVeloxExpr(pexpr.arguments);
460481
auto returnType = typeParser_->parse(pexpr.returnType);
482+
461483
return std::make_shared<CallTypedExpr>(
462484
returnType, args, getFunctionName(sqlFunctionHandle->functionId));
463485
}
486+
#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS
487+
else if (
488+
auto restFunctionHandle =
489+
std::dynamic_pointer_cast<protocol::RestFunctionHandle>(
490+
pexpr.functionHandle)) {
491+
492+
auto args = toVeloxExpr(pexpr.arguments);
493+
auto returnType = typeParser_->parse(pexpr.returnType);
494+
495+
const auto* systemConfig = SystemConfig::instance();
496+
497+
velox::functions::RemoteVectorFunctionMetadata metadata;
498+
metadata.serdeFormat =
499+
fromSerdeString(systemConfig->remoteFunctionServerSerde());
500+
proxygen::URL url(systemConfig->remoteFunctionRestUrl());
501+
metadata.location = url;
502+
metadata.functionId = restFunctionHandle->functionId;
503+
metadata.version = restFunctionHandle->version;
504+
505+
const auto& prestoSignature = restFunctionHandle->signature;
506+
// parseTypeSignature
507+
velox::exec::FunctionSignatureBuilder signatureBuilder;
508+
// Handle type variable constraints
509+
for (const auto& typeVar : prestoSignature.typeVariableConstraints) {
510+
signatureBuilder.typeVariable(typeVar.name);
511+
}
464512

513+
// Handle long variable constraints (for integer variables)
514+
for (const auto& longVar : prestoSignature.longVariableConstraints) {
515+
signatureBuilder.integerVariable(longVar.name);
516+
}
517+
518+
// Handle return type
519+
signatureBuilder.returnType(prestoSignature.returnType);
520+
521+
// Handle argument types
522+
for (const auto& argType : prestoSignature.argumentTypes) {
523+
signatureBuilder.argumentType(argType);
524+
}
525+
526+
// Handle variable arity
527+
if (prestoSignature.variableArity) {
528+
signatureBuilder.variableArity();
529+
}
530+
531+
auto signature = signatureBuilder.build();
532+
std::vector<velox::exec::FunctionSignaturePtr> veloxSignatures = {
533+
signature};
534+
535+
velox::functions::registerRemoteFunction(
536+
getFunctionName(restFunctionHandle->functionId),
537+
veloxSignatures,
538+
metadata,
539+
false);
540+
541+
return std::make_shared<CallTypedExpr>(
542+
returnType, args, getFunctionName(restFunctionHandle->functionId));
543+
}
544+
#endif
465545
VELOX_FAIL("Unsupported function handle: {}", pexpr.functionHandle->_type);
466546
}
467547

presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,16 @@ target_link_libraries(
7272
${GFLAGS_LIBRARIES}
7373
pthread)
7474

75+
if(PRESTO_ENABLE_REMOTE_FUNCTIONS)
76+
add_dependencies(presto_expressions_test presto_server_remote_function
77+
velox_expression velox_functions_remote)
78+
79+
target_link_libraries(
80+
presto_expressions_test GTest::gmock GTest::gmock_main
81+
presto_server_remote_function velox_expression velox_functions_remote)
82+
83+
endif()
84+
7585
set_property(TARGET presto_expressions_test PROPERTY JOB_POOL_LINK
7686
presto_link_job_pool)
7787

presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.cpp

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ void to_json(json& j, const std::shared_ptr<FunctionHandle>& p) {
112112
j = *std::static_pointer_cast<SqlFunctionHandle>(p);
113113
return;
114114
}
115+
if (type == "rest") {
116+
j = *std::static_pointer_cast<RestFunctionHandle>(p);
117+
return;
118+
}
115119

116120
throw TypeError(type + " no abstract type FunctionHandle ");
117121
}
@@ -138,6 +142,13 @@ void from_json(const json& j, std::shared_ptr<FunctionHandle>& p) {
138142
p = std::static_pointer_cast<FunctionHandle>(k);
139143
return;
140144
}
145+
if (type == "rest") {
146+
std::shared_ptr<RestFunctionHandle> k =
147+
std::make_shared<RestFunctionHandle>();
148+
j.get_to(*k);
149+
p = std::static_pointer_cast<FunctionHandle>(k);
150+
return;
151+
}
141152

142153
throw TypeError(type + " no abstract type FunctionHandle ");
143154
}
@@ -5796,6 +5807,20 @@ void to_json(json& j, const JsonBasedUdfFunctionMetadata& p) {
57965807
"JsonBasedUdfFunctionMetadata",
57975808
"AggregationFunctionMetadata",
57985809
"aggregateMetadata");
5810+
to_json_key(
5811+
j,
5812+
"functionId",
5813+
p.functionId,
5814+
"JsonBasedUdfFunctionMetadata",
5815+
"SqlFunctionId",
5816+
"functionId");
5817+
to_json_key(
5818+
j,
5819+
"version",
5820+
p.version,
5821+
"JsonBasedUdfFunctionMetadata",
5822+
"String",
5823+
"version");
57995824
}
58005825

58015826
void from_json(const json& j, JsonBasedUdfFunctionMetadata& p) {
@@ -5848,6 +5873,20 @@ void from_json(const json& j, JsonBasedUdfFunctionMetadata& p) {
58485873
"JsonBasedUdfFunctionMetadata",
58495874
"AggregationFunctionMetadata",
58505875
"aggregateMetadata");
5876+
from_json_key(
5877+
j,
5878+
"functionId",
5879+
p.functionId,
5880+
"JsonBasedUdfFunctionMetadata",
5881+
"SqlFunctionId",
5882+
"functionId");
5883+
from_json_key(
5884+
j,
5885+
"version",
5886+
p.version,
5887+
"JsonBasedUdfFunctionMetadata",
5888+
"String",
5889+
"version");
58515890
}
58525891
} // namespace facebook::presto::protocol
58535892
/*
@@ -8089,6 +8128,52 @@ void from_json(const json& j, RemoteTransactionHandle& p) {
80898128
}
80908129
} // namespace facebook::presto::protocol
80918130
namespace facebook::presto::protocol {
8131+
RestFunctionHandle::RestFunctionHandle() noexcept {
8132+
_type = "rest";
8133+
}
8134+
8135+
void to_json(json& j, const RestFunctionHandle& p) {
8136+
j = json::object();
8137+
j["@type"] = "rest";
8138+
to_json_key(
8139+
j,
8140+
"functionId",
8141+
p.functionId,
8142+
"RestFunctionHandle",
8143+
"SqlFunctionId",
8144+
"functionId");
8145+
to_json_key(
8146+
j, "version", p.version, "RestFunctionHandle", "String", "version");
8147+
to_json_key(
8148+
j,
8149+
"signature",
8150+
p.signature,
8151+
"RestFunctionHandle",
8152+
"Signature",
8153+
"signature");
8154+
}
8155+
8156+
void from_json(const json& j, RestFunctionHandle& p) {
8157+
p._type = j["@type"];
8158+
from_json_key(
8159+
j,
8160+
"functionId",
8161+
p.functionId,
8162+
"RestFunctionHandle",
8163+
"SqlFunctionId",
8164+
"functionId");
8165+
from_json_key(
8166+
j, "version", p.version, "RestFunctionHandle", "String", "version");
8167+
from_json_key(
8168+
j,
8169+
"signature",
8170+
p.signature,
8171+
"RestFunctionHandle",
8172+
"Signature",
8173+
"signature");
8174+
}
8175+
} // namespace facebook::presto::protocol
8176+
namespace facebook::presto::protocol {
80928177
RowNumberNode::RowNumberNode() noexcept {
80938178
_type = "com.facebook.presto.sql.planner.plan.RowNumberNode";
80948179
}

presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,21 +67,21 @@ extern const char* const PRESTO_ABORT_TASK_URL_PARAM;
6767
class Exception : public std::runtime_error {
6868
public:
6969
explicit Exception(const std::string& message)
70-
: std::runtime_error(message){};
70+
: std::runtime_error(message) {};
7171
};
7272

7373
class TypeError : public Exception {
7474
public:
75-
explicit TypeError(const std::string& message) : Exception(message){};
75+
explicit TypeError(const std::string& message) : Exception(message) {};
7676
};
7777

7878
class OutOfRange : public Exception {
7979
public:
80-
explicit OutOfRange(const std::string& message) : Exception(message){};
80+
explicit OutOfRange(const std::string& message) : Exception(message) {};
8181
};
8282
class ParseError : public Exception {
8383
public:
84-
explicit ParseError(const std::string& message) : Exception(message){};
84+
explicit ParseError(const std::string& message) : Exception(message) {};
8585
};
8686

8787
using String = std::string;
@@ -1502,6 +1502,8 @@ struct JsonBasedUdfFunctionMetadata {
15021502
String schema = {};
15031503
RoutineCharacteristics routineCharacteristics = {};
15041504
std::shared_ptr<AggregationFunctionMetadata> aggregateMetadata = {};
1505+
std::shared_ptr<SqlFunctionId> functionId = {};
1506+
std::shared_ptr<String> version = {};
15051507
};
15061508
void to_json(json& j, const JsonBasedUdfFunctionMetadata& p);
15071509
void from_json(const json& j, JsonBasedUdfFunctionMetadata& p);
@@ -1915,6 +1917,17 @@ void to_json(json& j, const RemoteTransactionHandle& p);
19151917
void from_json(const json& j, RemoteTransactionHandle& p);
19161918
} // namespace facebook::presto::protocol
19171919
namespace facebook::presto::protocol {
1920+
struct RestFunctionHandle : public FunctionHandle {
1921+
SqlFunctionId functionId = {};
1922+
String version = {};
1923+
Signature signature = {};
1924+
1925+
RestFunctionHandle() noexcept;
1926+
};
1927+
void to_json(json& j, const RestFunctionHandle& p);
1928+
void from_json(const json& j, RestFunctionHandle& p);
1929+
} // namespace facebook::presto::protocol
1930+
namespace facebook::presto::protocol {
19181931
struct RowNumberNode : public PlanNode {
19191932
std::shared_ptr<PlanNode> source = {};
19201933
List<VariableReferenceExpression> partitionBy = {};

presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ AbstractClasses:
174174
subclasses:
175175
- { name: BuiltInFunctionHandle, key: $static }
176176
- { name: SqlFunctionHandle, key: json_file }
177-
177+
- { name: RestFunctionHandle, key: rest }
178178

179179
JavaClasses:
180180
- presto-spi/src/main/java/com/facebook/presto/spi/ErrorCause.java
@@ -190,6 +190,7 @@ JavaClasses:
190190
- presto-main/src/main/java/com/facebook/presto/execution/buffer/BufferState.java
191191
- presto-main/src/main/java/com/facebook/presto/metadata/BuiltInFunctionHandle.java
192192
- presto-spi/src/main/java/com/facebook/presto/spi/function/SqlFunctionHandle.java
193+
- presto-spi/src/main/java/com/facebook/presto/spi/function/RestFunctionHandle.java
193194
- presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaRequirement.java
194195
- presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaScope.java
195196
- presto-spi/src/main/java/com/facebook/presto/spi/relation/CallExpression.java
@@ -313,4 +314,4 @@ JavaClasses:
313314
- presto-main/src/main/java/com/facebook/presto/connector/system/SystemTableLayoutHandle.java
314315
- presto-main/src/main/java/com/facebook/presto/connector/system/SystemTransactionHandle.java
315316
- presto-spi/src/main/java/com/facebook/presto/spi/function/AggregationFunctionMetadata.java
316-
- presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/json/JsonBasedUdfFunctionMetadata.java
317+
- presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/JsonBasedUdfFunctionMetadata.java

0 commit comments

Comments
 (0)