Skip to content

Commit c829a7d

Browse files
authored
refactor(native): Separate IcebergPrestoToVeloxConnector to standalone file (#26237)
## Description Refactor the IcebergPrestoToVeloxConnector class by extracting it from PrestoToVeloxConnector.{h,cpp} into dedicated files. This improves code organization and modularity for the Iceberg connector implementation, and makes Iceberg-specific functionality easier to maintain and extend. ``` == NO RELEASE NOTE == ```
1 parent bebd5fa commit c829a7d

File tree

9 files changed

+386
-320
lines changed

9 files changed

+386
-320
lines changed

presto-native-execution/presto_cpp/main/connectors/CMakeLists.txt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,11 @@
99
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1010
# See the License for the specific language governing permissions and
1111
# limitations under the License.
12-
add_library(presto_connectors Registration.cpp PrestoToVeloxConnector.cpp
13-
SystemConnector.cpp)
12+
add_library(presto_connectors
13+
IcebergPrestoToVeloxConnector.cpp
14+
Registration.cpp
15+
PrestoToVeloxConnector.cpp
16+
SystemConnector.cpp)
1417

1518
if(PRESTO_ENABLE_ARROW_FLIGHT_CONNECTOR)
1619
add_subdirectory(arrow_flight)
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
#include "presto_cpp/main/connectors/IcebergPrestoToVeloxConnector.h"
16+
17+
#include "presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h"
18+
#include "velox/connectors/hive/iceberg/IcebergSplit.h"
19+
#include "velox/type/fbhive/HiveTypeParser.h"
20+
21+
namespace facebook::presto {
22+
23+
namespace {
24+
25+
// Translates the Presto protocol Iceberg file content kind into the Velox
// Iceberg file content kind. Only DATA and POSITION_DELETES are mapped; any
// other value raises a Velox "unsupported" error.
velox::connector::hive::iceberg::FileContent toVeloxFileContent(
    const presto::protocol::iceberg::FileContent content) {
  using VeloxFileContent = velox::connector::hive::iceberg::FileContent;

  switch (content) {
    case protocol::iceberg::FileContent::DATA:
      return VeloxFileContent::kData;
    case protocol::iceberg::FileContent::POSITION_DELETES:
      return VeloxFileContent::kPositionalDeletes;
    default:
      break;
  }
  VELOX_UNSUPPORTED("Unsupported file content: {}", fmt::underlying(content));
}
34+
35+
// Translates the Presto protocol Iceberg file format into the Velox dwio file
// format. Only ORC and PARQUET are mapped; any other value raises a Velox
// "unsupported" error.
velox::dwio::common::FileFormat toVeloxFileFormat(
    const presto::protocol::iceberg::FileFormat format) {
  using VeloxFileFormat = velox::dwio::common::FileFormat;

  switch (format) {
    case protocol::iceberg::FileFormat::ORC:
      return VeloxFileFormat::ORC;
    case protocol::iceberg::FileFormat::PARQUET:
      return VeloxFileFormat::PARQUET;
    default:
      break;
  }
  VELOX_UNSUPPORTED("Unsupported file format: {}", fmt::underlying(format));
}
44+
45+
} // namespace
46+
47+
std::unique_ptr<velox::connector::ConnectorSplit>
48+
IcebergPrestoToVeloxConnector::toVeloxSplit(
49+
const protocol::ConnectorId& catalogId,
50+
const protocol::ConnectorSplit* connectorSplit,
51+
const protocol::SplitContext* splitContext) const {
52+
auto icebergSplit =
53+
dynamic_cast<const protocol::iceberg::IcebergSplit*>(connectorSplit);
54+
VELOX_CHECK_NOT_NULL(
55+
icebergSplit, "Unexpected split type {}", connectorSplit->_type);
56+
57+
std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
58+
for (const auto& entry : icebergSplit->partitionKeys) {
59+
partitionKeys.emplace(
60+
entry.second.name,
61+
entry.second.value == nullptr
62+
? std::nullopt
63+
: std::optional<std::string>{*entry.second.value});
64+
}
65+
66+
std::unordered_map<std::string, std::string> customSplitInfo;
67+
customSplitInfo["table_format"] = "hive-iceberg";
68+
69+
std::vector<velox::connector::hive::iceberg::IcebergDeleteFile> deletes;
70+
deletes.reserve(icebergSplit->deletes.size());
71+
for (const auto& deleteFile : icebergSplit->deletes) {
72+
std::unordered_map<int32_t, std::string> lowerBounds(
73+
deleteFile.lowerBounds.begin(), deleteFile.lowerBounds.end());
74+
75+
std::unordered_map<int32_t, std::string> upperBounds(
76+
deleteFile.upperBounds.begin(), deleteFile.upperBounds.end());
77+
78+
velox::connector::hive::iceberg::IcebergDeleteFile icebergDeleteFile(
79+
toVeloxFileContent(deleteFile.content),
80+
deleteFile.path,
81+
toVeloxFileFormat(deleteFile.format),
82+
deleteFile.recordCount,
83+
deleteFile.fileSizeInBytes,
84+
std::vector(deleteFile.equalityFieldIds),
85+
lowerBounds,
86+
upperBounds);
87+
88+
deletes.emplace_back(icebergDeleteFile);
89+
}
90+
91+
std::unordered_map<std::string, std::string> infoColumns = {
92+
{"$data_sequence_number",
93+
std::to_string(icebergSplit->dataSequenceNumber)},
94+
{"$path", icebergSplit->path}};
95+
96+
return std::make_unique<velox::connector::hive::iceberg::HiveIcebergSplit>(
97+
catalogId,
98+
icebergSplit->path,
99+
toVeloxFileFormat(icebergSplit->fileFormat),
100+
icebergSplit->start,
101+
icebergSplit->length,
102+
partitionKeys,
103+
std::nullopt,
104+
customSplitInfo,
105+
nullptr,
106+
splitContext->cacheable,
107+
deletes,
108+
infoColumns);
109+
}
110+
111+
std::unique_ptr<velox::connector::ColumnHandle>
112+
IcebergPrestoToVeloxConnector::toVeloxColumnHandle(
113+
const protocol::ColumnHandle* column,
114+
const TypeParser& typeParser) const {
115+
auto icebergColumn =
116+
dynamic_cast<const protocol::iceberg::IcebergColumnHandle*>(column);
117+
VELOX_CHECK_NOT_NULL(
118+
icebergColumn, "Unexpected column handle type {}", column->_type);
119+
// TODO(imjalpreet): Modify 'hiveType' argument of the 'HiveColumnHandle'
120+
// constructor similar to how Hive Connector is handling for bucketing
121+
velox::type::fbhive::HiveTypeParser hiveTypeParser;
122+
auto type = stringToType(icebergColumn->type, typeParser);
123+
velox::connector::hive::HiveColumnHandle::ColumnParseParameters
124+
columnParseParameters;
125+
if (type->isDate()) {
126+
columnParseParameters.partitionDateValueFormat = velox::connector::hive::
127+
HiveColumnHandle::ColumnParseParameters::kDaysSinceEpoch;
128+
}
129+
return std::make_unique<velox::connector::hive::HiveColumnHandle>(
130+
icebergColumn->columnIdentity.name,
131+
toHiveColumnType(icebergColumn->columnType),
132+
type,
133+
type,
134+
toRequiredSubfields(icebergColumn->requiredSubfields),
135+
columnParseParameters);
136+
}
137+
138+
std::unique_ptr<velox::connector::ConnectorTableHandle>
139+
IcebergPrestoToVeloxConnector::toVeloxTableHandle(
140+
const protocol::TableHandle& tableHandle,
141+
const VeloxExprConverter& exprConverter,
142+
const TypeParser& typeParser,
143+
velox::connector::ColumnHandleMap& assignments) const {
144+
auto addSynthesizedColumn = [&](const std::string& name,
145+
protocol::hive::ColumnType columnType,
146+
const protocol::ColumnHandle& column) {
147+
if (toHiveColumnType(columnType) ==
148+
velox::connector::hive::HiveColumnHandle::ColumnType::kSynthesized) {
149+
if (assignments.count(name) == 0) {
150+
assignments.emplace(name, toVeloxColumnHandle(&column, typeParser));
151+
}
152+
}
153+
};
154+
155+
auto icebergLayout = std::dynamic_pointer_cast<
156+
const protocol::iceberg::IcebergTableLayoutHandle>(
157+
tableHandle.connectorTableLayout);
158+
VELOX_CHECK_NOT_NULL(
159+
icebergLayout,
160+
"Unexpected layout type {}",
161+
tableHandle.connectorTableLayout->_type);
162+
163+
for (const auto& entry : icebergLayout->partitionColumns) {
164+
assignments.emplace(
165+
entry.columnIdentity.name, toVeloxColumnHandle(&entry, typeParser));
166+
}
167+
168+
// Add synthesized columns to the TableScanNode columnHandles as well.
169+
for (const auto& entry : icebergLayout->predicateColumns) {
170+
addSynthesizedColumn(entry.first, entry.second.columnType, entry.second);
171+
}
172+
173+
auto icebergTableHandle =
174+
std::dynamic_pointer_cast<const protocol::iceberg::IcebergTableHandle>(
175+
tableHandle.connectorHandle);
176+
VELOX_CHECK_NOT_NULL(
177+
icebergTableHandle,
178+
"Unexpected table handle type {}",
179+
tableHandle.connectorHandle->_type);
180+
181+
// Use fully qualified name if available.
182+
std::string tableName = icebergTableHandle->schemaName.empty()
183+
? icebergTableHandle->icebergTableName.tableName
184+
: fmt::format(
185+
"{}.{}",
186+
icebergTableHandle->schemaName,
187+
icebergTableHandle->icebergTableName.tableName);
188+
189+
return toHiveTableHandle(
190+
icebergLayout->domainPredicate,
191+
icebergLayout->remainingPredicate,
192+
icebergLayout->pushdownFilterEnabled,
193+
tableName,
194+
icebergLayout->dataColumns,
195+
tableHandle,
196+
{},
197+
exprConverter,
198+
typeParser);
199+
}
200+
201+
std::unique_ptr<protocol::ConnectorProtocol>
202+
IcebergPrestoToVeloxConnector::createConnectorProtocol() const {
203+
return std::make_unique<protocol::iceberg::IcebergConnectorProtocol>();
204+
}
205+
206+
} // namespace facebook::presto
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
#pragma once
16+
17+
#include "presto_cpp/main/connectors/PrestoToVeloxConnector.h"
18+
19+
namespace facebook::presto {
20+
21+
class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector {
22+
public:
23+
explicit IcebergPrestoToVeloxConnector(std::string connectorName)
24+
: PrestoToVeloxConnector(std::move(connectorName)) {}
25+
26+
std::unique_ptr<velox::connector::ConnectorSplit> toVeloxSplit(
27+
const protocol::ConnectorId& catalogId,
28+
const protocol::ConnectorSplit* connectorSplit,
29+
const protocol::SplitContext* splitContext) const final;
30+
31+
std::unique_ptr<velox::connector::ColumnHandle> toVeloxColumnHandle(
32+
const protocol::ColumnHandle* column,
33+
const TypeParser& typeParser) const final;
34+
35+
std::unique_ptr<velox::connector::ConnectorTableHandle> toVeloxTableHandle(
36+
const protocol::TableHandle& tableHandle,
37+
const VeloxExprConverter& exprConverter,
38+
const TypeParser& typeParser,
39+
velox::connector::ColumnHandleMap& assignments) const final;
40+
41+
std::unique_ptr<protocol::ConnectorProtocol> createConnectorProtocol()
42+
const final;
43+
};
44+
45+
} // namespace facebook::presto

0 commit comments

Comments
 (0)