Skip to content

[native] Add CLP connector #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,14 @@ set_property(TARGET presto_server_lib PROPERTY JOB_POOL_LINK
presto_link_job_pool)

add_executable(presto_server PrestoMain.cpp)
target_link_options(presto_server PRIVATE "-no-pie")

# Moving velox_hive_connector and velox_tpch_connector to presto_server_lib
# results in multiple link errors similar to the one below only on GCC.
# "undefined reference to `vtable for velox::connector::tpch::TpchTableHandle`"
# TODO: Fix these errors.
target_link_libraries(presto_server presto_server_lib velox_hive_connector
velox_tpch_connector)
velox_tpch_connector velox_clp_connector)

# Clang requires explicit linking with libatomic.
if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang"
Expand Down
8 changes: 8 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "velox/common/memory/MmapAllocator.h"
#include "velox/common/memory/SharedArbitrator.h"
#include "velox/connectors/Connector.h"
#include "velox/connectors/clp/ClpConnector.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h"
Expand Down Expand Up @@ -271,6 +272,8 @@ void PrestoServer::run() {
std::make_unique<IcebergPrestoToVeloxConnector>("iceberg"));
registerPrestoToVeloxConnector(
std::make_unique<TpchPrestoToVeloxConnector>("tpch"));
registerPrestoToVeloxConnector(
std::make_unique<ClpPrestoToVeloxConnector>("clp"));
// Presto server uses system catalog or system schema in other catalogs
// in different places in the code. All these resolve to the SystemConnector.
// Depending on where the operator or column is used, different prefixes can
Expand Down Expand Up @@ -1183,6 +1186,11 @@ void PrestoServer::registerConnectorFactories() {
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::tpch::TpchConnectorFactory>());
}
if (!velox::connector::hasConnectorFactory(
velox::connector::clp::ClpConnectorFactory::kClpConnectorName)) {
velox::connector::registerConnectorFactory(
std::make_shared<velox::connector::clp::ClpConnectorFactory>());
}
}

std::vector<std::string> PrestoServer::registerConnectors(
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ add_executable(
SessionPropertiesTest.cpp
TaskManagerTest.cpp
QueryContextManagerTest.cpp)
target_link_options(presto_server_test PRIVATE "-no-pie")

if(DEFINED PRESTO_MEMORY_CHECKER_TYPE AND PRESTO_MEMORY_CHECKER_TYPE STREQUAL
"LINUX_MEMORY_CHECKER")
Expand All @@ -44,6 +45,7 @@ target_link_libraries(
$<TARGET_OBJECTS:presto_types>
velox_hive_connector
velox_tpch_connector
velox_clp_connector
velox_presto_serializer
velox_functions_prestosql
velox_aggregates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
*/

#include "presto_cpp/main/types/PrestoToVeloxConnector.h"
#include "presto_cpp/presto_protocol/connector/clp/ClpConnectorProtocol.h"
#include "presto_cpp/presto_protocol/connector/hive/HiveConnectorProtocol.h"
#include "presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h"
#include "presto_cpp/presto_protocol/connector/tpch/TpchConnectorProtocol.h"

#include <velox/type/fbhive/HiveTypeParser.h>
#include "velox/connectors/clp/ClpColumnHandle.h"
#include "velox/connectors/clp/ClpConnectorSplit.h"
#include "velox/connectors/clp/ClpTableHandle.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/HiveDataSink.h"
Expand Down Expand Up @@ -1552,4 +1556,57 @@ std::unique_ptr<protocol::ConnectorProtocol>
TpchPrestoToVeloxConnector::createConnectorProtocol() const {
return std::make_unique<protocol::tpch::TpchConnectorProtocol>();
}

std::unique_ptr<velox::connector::ConnectorSplit>
ClpPrestoToVeloxConnector::toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* connectorSplit,
const protocol::SplitContext* splitContext) const {
auto clpSplit = dynamic_cast<const protocol::clp::ClpSplit*>(connectorSplit);
VELOX_CHECK_NOT_NULL(
clpSplit, "Unexpected split type {}", connectorSplit->_type);
return std::make_unique<connector::clp::ClpConnectorSplit>(
catalogId,
clpSplit->schemaTableName.schema,
clpSplit->schemaTableName.table,
clpSplit->archivePath);
}

std::unique_ptr<velox::connector::ColumnHandle>
ClpPrestoToVeloxConnector::toVeloxColumnHandle(
const protocol::ColumnHandle* column,
const TypeParser& typeParser) const {
auto clpColumn = dynamic_cast<const protocol::clp::ClpColumnHandle*>(column);
VELOX_CHECK_NOT_NULL(
clpColumn, "Unexpected column handle type {}", column->_type);
return std::make_unique<connector::clp::ClpColumnHandle>(
clpColumn->columnName,
typeParser.parse(clpColumn->columnType),
clpColumn->nullable);
}

std::unique_ptr<velox::connector::ConnectorTableHandle>
ClpPrestoToVeloxConnector::toVeloxTableHandle(
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
std::unordered_map<
std::string,
std::shared_ptr<velox::connector::ColumnHandle>>& assignments) const {
auto clpLayout =
std::dynamic_pointer_cast<const protocol::clp::ClpTableLayoutHandle>(
tableHandle.connectorTableLayout);
VELOX_CHECK_NOT_NULL(
clpLayout,
"Unexpected layout type {}",
tableHandle.connectorTableLayout->_type);
return std::make_unique<connector::clp::ClpTableHandle>(
tableHandle.connectorId, clpLayout->table.schemaTableName.table, clpLayout->query);
}

std::unique_ptr<protocol::ConnectorProtocol>
ClpPrestoToVeloxConnector::createConnectorProtocol() const {
return std::make_unique<protocol::clp::ClpConnectorProtocol>();
}

} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,31 @@ class TpchPrestoToVeloxConnector final : public PrestoToVeloxConnector {
std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
const final;
};

class ClpPrestoToVeloxConnector final : public PrestoToVeloxConnector {
public:
explicit ClpPrestoToVeloxConnector(std::string connectorName)
: PrestoToVeloxConnector(std::move(connectorName)) {}

std::unique_ptr<velox::connector::ConnectorSplit> toVeloxSplit(
const protocol::ConnectorId& catalogId,
const protocol::ConnectorSplit* connectorSplit,
const protocol::SplitContext* splitContext) const final;

std::unique_ptr<velox::connector::ColumnHandle> toVeloxColumnHandle(
const protocol::ColumnHandle* column,
const TypeParser& typeParser) const final;

std::unique_ptr<velox::connector::ConnectorTableHandle> toVeloxTableHandle(
const protocol::TableHandle& tableHandle,
const VeloxExprConverter& exprConverter,
const TypeParser& typeParser,
std::unordered_map<
std::string,
std::shared_ptr<velox::connector::ColumnHandle>>& assignments)
const final;

std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
const final;
};
} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ target_link_libraries(
velox_dwio_orc_reader
velox_hive_connector
velox_tpch_connector
velox_clp_connector
velox_exec
velox_dwio_common_exception
presto_type_converter
Expand Down Expand Up @@ -62,6 +63,7 @@ target_link_libraries(
velox_functions_lib
velox_hive_connector
velox_tpch_connector
velox_clp_connector
velox_hive_partition_function
velox_presto_serializer
velox_serialization
Expand Down Expand Up @@ -93,6 +95,7 @@ target_link_libraries(
velox_dwio_common
velox_hive_connector
velox_tpch_connector
velox_clp_connector
GTest::gtest
GTest::gtest_main)

Expand Down
9 changes: 9 additions & 0 deletions presto-native-execution/presto_cpp/presto_protocol/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,23 @@ presto_protocol-cpp: presto_protocol-json
chevron -d connector/tpch/presto_protocol_tpch.json connector/tpch/presto_protocol-json-hpp.mustache >> connector/tpch/presto_protocol_tpch.h
clang-format -style=file -i connector/tpch/presto_protocol_tpch.h connector/tpch/presto_protocol_tpch.cpp

# build clp connector related structs
echo "// DO NOT EDIT : This file is generated by chevron" > connector/clp/presto_protocol_clp.cpp
chevron -d connector/clp/presto_protocol_clp.json connector/clp/presto_protocol-json-cpp.mustache >> connector/clp/presto_protocol_clp.cpp
echo "// DO NOT EDIT : This file is generated by chevron" > connector/clp/presto_protocol_clp.h
chevron -d connector/clp/presto_protocol_clp.json connector/clp/presto_protocol-json-hpp.mustache >> connector/clp/presto_protocol_clp.h
clang-format -style=file -i connector/clp/presto_protocol_clp.h connector/clp/presto_protocol_clp.cpp
Comment on lines +48 to +53
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use tab instead of spaces, otherwise the Makefile cannot run


presto_protocol-json:
./java-to-struct-json.py --config core/presto_protocol_core.yml core/special/*.java core/special/*.inc -j | jq . > core/presto_protocol_core.json
./java-to-struct-json.py --config connector/hive/presto_protocol_hive.yml connector/hive/special/*.inc -j | jq . > connector/hive/presto_protocol_hive.json
./java-to-struct-json.py --config connector/iceberg/presto_protocol_iceberg.yml connector/iceberg/special/*.inc -j | jq . > connector/iceberg/presto_protocol_iceberg.json
./java-to-struct-json.py --config connector/tpch/presto_protocol_tpch.yml connector/tpch/special/*.inc -j | jq . > connector/tpch/presto_protocol_tpch.json
./java-to-struct-json.py --config connector/clp/presto_protocol_clp.yml connector/clp/special/*.inc -j | jq . > connector/clp/presto_protocol_clp.json

presto_protocol.proto: presto_protocol-json
pystache presto_protocol-protobuf.mustache core/presto_protocol_core.json > core/presto_protocol_core.proto
pystache presto_protocol-protobuf.mustache connector/hive/presto_protocol_hive.json > connector/hive/presto_protocol_hive.proto
pystache presto_protocol-protobuf.mustache connector/iceberg/presto_protocol_iceberg.json > connector/iceberg/presto_protocol_iceberg.proto
pystache presto_protocol-protobuf.mustache connector/tpch/presto_protocol_tpch.json > connector/tpch/presto_protocol_tpch.proto
pystache presto_protocol-protobuf.mustache connector/clp/presto_protocol_clp.json > connector/clp/presto_protocol_clp.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "presto_cpp/presto_protocol/connector/clp/presto_protocol_clp.h"
#include "presto_cpp/presto_protocol/core/ConnectorProtocol.h"

namespace facebook::presto::protocol::clp {
using ClpConnectorProtocol = ConnectorProtocolTemplate<
ClpTableHandle,
ClpTableLayoutHandle,
ClpColumnHandle,
NotImplemented,
NotImplemented,
ClpSplit,
NotImplemented,
ClpTransactionHandle,
NotImplemented>;
} // namespace facebook::presto::protocol::clp
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// presto_protocol.prolog.cpp
//

{{#.}}
{{#comment}}
{{comment}}
{{/comment}}
{{/.}}


#include "presto_cpp/presto_protocol/connector/clp/presto_protocol_clp.h"
using namespace std::string_literals;

namespace facebook::presto::protocol::clp {

void to_json(json& j, const ClpTransactionHandle& p) {
j = json::array();
j.push_back(p._type);
j.push_back(p.instance);
}
Comment on lines +29 to +33
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Validate JSON array length before indexing.
to_json(json& j, const ClpTransactionHandle& p) treats the JSON as an array and pushes back two elements. Conversely, from_json accesses [0] and [1] directly. Ensure the length is at least two before indexing to avoid out-of-range errors or unclear exception messages.


void from_json(const json& j, ClpTransactionHandle& p) {
j[0].get_to(p._type);
j[1].get_to(p.instance);
}
} // namespace facebook::presto::protocol
{{#.}}
{{#cinc}}
{{&cinc}}
{{/cinc}}
{{^cinc}}
{{#struct}}
namespace facebook::presto::protocol::clp {
{{#super_class}}
{{&class_name}}::{{&class_name}}() noexcept {
_type = "{{json_key}}";
}
{{/super_class}}

void to_json(json& j, const {{&class_name}}& p) {
j = json::object();
{{#super_class}}
j["@type"] = "{{&json_key}}";
{{/super_class}}
{{#fields}}
to_json_key(j, "{{&field_name}}", p.{{field_name}}, "{{&class_name}}", "{{&field_text}}", "{{&field_name}}");
{{/fields}}
}

void from_json(const json& j, {{&class_name}}& p) {
{{#super_class}}
p._type = j["@type"];
{{/super_class}}
{{#fields}}
from_json_key(j, "{{&field_name}}", p.{{field_name}}, "{{&class_name}}", "{{&field_text}}", "{{&field_name}}");
{{/fields}}
}
}
{{/struct}}
{{#enum}}
namespace facebook::presto::protocol::clp {
//Loosly copied this here from NLOHMANN_JSON_SERIALIZE_ENUM()

// NOLINTNEXTLINE: cppcoreguidelines-avoid-c-arrays
static const std::pair<{{&class_name}}, json>
{{&class_name}}_enum_table[] = { // NOLINT: cert-err58-cpp
{{#elements}}
{ {{&class_name}}::{{&element}}, "{{&element}}" }{{^_last}},{{/_last}}
{{/elements}}
};
void to_json(json& j, const {{&class_name}}& e)
{
static_assert(std::is_enum<{{&class_name}}>::value, "{{&class_name}} must be an enum!");
const auto* it = std::find_if(std::begin({{&class_name}}_enum_table), std::end({{&class_name}}_enum_table),
[e](const std::pair<{{&class_name}}, json>& ej_pair) -> bool
{
return ej_pair.first == e;
});
j = ((it != std::end({{&class_name}}_enum_table)) ? it : std::begin({{&class_name}}_enum_table))->second;
}
void from_json(const json& j, {{&class_name}}& e)
{
static_assert(std::is_enum<{{&class_name}}>::value, "{{&class_name}} must be an enum!");
const auto* it = std::find_if(std::begin({{&class_name}}_enum_table), std::end({{&class_name}}_enum_table),
[&j](const std::pair<{{&class_name}}, json>& ej_pair) -> bool
{
return ej_pair.second == j;
});
e = ((it != std::end({{&class_name}}_enum_table)) ? it : std::begin({{&class_name}}_enum_table))->first;
}
}
{{/enum}}
{{#abstract}}
namespace facebook::presto::protocol::clp {
void to_json(json& j, const std::shared_ptr<{{&class_name}}>& p) {
if ( p == nullptr ) {
return;
}
String type = p->_type;

{{#subclasses}}
if ( type == "{{&key}}" ) {
j = *std::static_pointer_cast<{{&type}}>(p);
return;
}
{{/subclasses}}

throw TypeError(type + " no abstract type {{&class_name}} {{&key}}");
}

void from_json(const json& j, std::shared_ptr<{{&class_name}}>& p) {
String type;
try {
type = p->getSubclassKey(j);
} catch (json::parse_error &e) {
throw ParseError(std::string(e.what()) + " {{&class_name}} {{&key}} {{&class_name}}");
}

{{#subclasses}}
if ( type == "{{&key}}" ) {
std::shared_ptr<{{&type}}> k = std::make_shared<{{&type}}>();
j.get_to(*k);
p = std::static_pointer_cast<{{&class_name}}>(k);
return;
}
{{/subclasses}}

throw TypeError(type + " no abstract type {{&class_name}} {{&key}}");
}
}
{{/abstract}}
{{/cinc}}
{{/.}}
Loading
Loading