diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 9672058718344..fd539dcfbb9eb 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -36,7 +36,10 @@ #include "presto_cpp/main/operators/ShuffleRead.h" #include "presto_cpp/main/operators/UnsafeRowExchangeSource.h" #include "presto_cpp/main/types/FunctionMetadata.h" +#include "presto_cpp/main/types/HivePrestoToVeloxConnector.h" +#include "presto_cpp/main/types/IcebergPrestoToVeloxConnector.h" #include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h" +#include "presto_cpp/main/types/TpchPrestoToVeloxConnector.h" #include "velox/common/base/Counters.h" #include "velox/common/base/StatsReporter.h" #include "velox/common/caching/CacheTTLController.h" diff --git a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp index c80ec9cdfb364..deb52fdb552b8 100644 --- a/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp +++ b/presto-native-execution/presto_cpp/main/tests/TaskManagerTest.cpp @@ -21,6 +21,7 @@ #include "presto_cpp/main/tests/HttpServerWrapper.h" #include "presto_cpp/main/tests/MultableConfigs.h" #include "presto_cpp/main/types/PrestoToVeloxConnector.h" +#include "presto_cpp/main/types/HivePrestoToVeloxConnector.h" #include "velox/common/base/Fs.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/file/FileSystems.h" diff --git a/presto-native-execution/presto_cpp/main/types/CMakeLists.txt b/presto-native-execution/presto_cpp/main/types/CMakeLists.txt index e37c99c432521..faaa2e9eb157c 100644 --- a/presto-native-execution/presto_cpp/main/types/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/types/CMakeLists.txt @@ -16,8 +16,14 @@ target_link_libraries(presto_type_converter velox_type_parser) add_library( presto_types OBJECT 
- PrestoToVeloxQueryPlan.cpp PrestoToVeloxExpr.cpp VeloxPlanValidator.cpp - PrestoToVeloxSplit.cpp PrestoToVeloxConnector.cpp) + PrestoToVeloxQueryPlan.cpp + PrestoToVeloxExpr.cpp + VeloxPlanValidator.cpp + PrestoToVeloxSplit.cpp + PrestoToVeloxConnector.cpp + TpchPrestoToVeloxConnector.cpp + HivePrestoToVeloxConnector.cpp + IcebergPrestoToVeloxConnector.cpp) add_dependencies(presto_types presto_operators presto_type_converter velox_type velox_type_fbhive velox_dwio_dwrf_proto) diff --git a/presto-native-execution/presto_cpp/main/types/HivePrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/types/HivePrestoToVeloxConnector.cpp new file mode 100644 index 0000000000000..a05050e062d70 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/types/HivePrestoToVeloxConnector.cpp @@ -0,0 +1,1267 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "presto_cpp/main/types/HivePrestoToVeloxConnector.h" +#include "presto_cpp/presto_protocol/connector/hive/HiveConnectorProtocol.h" +#include "presto_cpp/presto_protocol/connector/hive/presto_protocol_hive.h" + +#include +#include "velox/velox/connectors/hive/HiveConnector.h" +#include "velox/velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/velox/connectors/hive/HiveDataSink.h" +#include "velox/velox/connectors/hive/TableHandle.h" + +namespace facebook::presto { +using namespace velox; + +dwio::common::FileFormat toVeloxFileFormat( + const presto::protocol::hive::StorageFormat& format) { + if (format.inputFormat == "com.facebook.hive.orc.OrcInputFormat") { + return dwio::common::FileFormat::DWRF; + } else if ( + format.inputFormat == + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat") { + return dwio::common::FileFormat::PARQUET; + } else if (format.inputFormat == "org.apache.hadoop.mapred.TextInputFormat") { + if (format.serDe == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { + return dwio::common::FileFormat::TEXT; + } else if (format.serDe == "org.apache.hive.hcatalog.data.JsonSerDe") { + return dwio::common::FileFormat::JSON; + } + } else if ( + format.inputFormat == + "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat") { + if (format.serDe == + "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") { + return dwio::common::FileFormat::PARQUET; + } + } else if (format.inputFormat == "com.facebook.alpha.AlphaInputFormat") { + // ALPHA has been renamed in Velox to NIMBLE. 
+ return dwio::common::FileFormat::NIMBLE; + } + VELOX_UNSUPPORTED( + "Unsupported file format: {} {}", format.inputFormat, format.serDe); +} + +template +std::string toJsonString(const T& value) { + return ((json)value).dump(); +} + +TypePtr stringToType( + const std::string& typeString, + const TypeParser& typeParser) { + return typeParser.parse(typeString); +} + +connector::hive::HiveColumnHandle::ColumnType toHiveColumnType( + protocol::hive::ColumnType type) { + switch (type) { + case protocol::hive::ColumnType::PARTITION_KEY: + return connector::hive::HiveColumnHandle::ColumnType::kPartitionKey; + case protocol::hive::ColumnType::REGULAR: + return connector::hive::HiveColumnHandle::ColumnType::kRegular; + case protocol::hive::ColumnType::SYNTHESIZED: + return connector::hive::HiveColumnHandle::ColumnType::kSynthesized; + default: + VELOX_UNSUPPORTED( + "Unsupported Hive column type: {}.", toJsonString(type)); + } +} + +std::vector toRequiredSubfields( + const protocol::List& subfields) { + std::vector result; + result.reserve(subfields.size()); + for (auto& subfield : subfields) { + result.emplace_back(subfield); + } + return result; +} + +template +TypePtr fieldNamesToLowerCase(const TypePtr& type) { + return type; +} + +template <> +TypePtr fieldNamesToLowerCase(const TypePtr& type); + +template <> +TypePtr fieldNamesToLowerCase(const TypePtr& type); + +template <> +TypePtr fieldNamesToLowerCase(const TypePtr& type); + +template <> +TypePtr fieldNamesToLowerCase(const TypePtr& type) { + auto& elementType = type->childAt(0); + return std::make_shared(VELOX_DYNAMIC_TYPE_DISPATCH( + fieldNamesToLowerCase, elementType->kind(), elementType)); +} + +template <> +TypePtr fieldNamesToLowerCase(const TypePtr& type) { + auto& keyType = type->childAt(0); + auto& valueType = type->childAt(1); + return std::make_shared( + VELOX_DYNAMIC_TYPE_DISPATCH( + fieldNamesToLowerCase, keyType->kind(), keyType), + VELOX_DYNAMIC_TYPE_DISPATCH( + fieldNamesToLowerCase, 
valueType->kind(), valueType)); +} + +template <> +TypePtr fieldNamesToLowerCase(const TypePtr& type) { + auto& rowType = type->asRow(); + std::vector names; + std::vector types; + names.reserve(type->size()); + types.reserve(type->size()); + for (int i = 0; i < rowType.size(); i++) { + std::string name = rowType.nameOf(i); + folly::toLowerAscii(name); + names.push_back(std::move(name)); + auto& childType = rowType.childAt(i); + types.push_back(VELOX_DYNAMIC_TYPE_DISPATCH( + fieldNamesToLowerCase, childType->kind(), childType)); + } + return std::make_shared(std::move(names), std::move(types)); +} + +int64_t toInt64( + const std::shared_ptr& block, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + auto value = exprConverter.getConstantValue(type, *block); + return VariantConverter::convert(value) + .value(); +} + +int128_t toInt128( + const std::shared_ptr& block, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + auto value = exprConverter.getConstantValue(type, *block); + return value.value(); +} + +Timestamp toTimestamp( + const std::shared_ptr& block, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + const auto value = exprConverter.getConstantValue(type, *block); + return value.value(); +} + +int64_t dateToInt64( + const std::shared_ptr& block, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + auto value = exprConverter.getConstantValue(type, *block); + return value.value(); +} + +template +T toFloatingPoint( + const std::shared_ptr& block, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + auto variant = exprConverter.getConstantValue(type, *block); + return variant.value(); +} + +std::string toString( + const std::shared_ptr& block, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + auto value = exprConverter.getConstantValue(type, *block); + if (type->isVarbinary()) { + return value.value(); + } + return value.value(); +} + +bool toBoolean( + 
const std::shared_ptr& block, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + auto variant = exprConverter.getConstantValue(type, *block); + return variant.value(); +} + +std::unique_ptr bigintRangeToFilter( + const protocol::Range& range, + bool nullAllowed, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + bool lowUnbounded = range.low.valueBlock == nullptr; + auto low = lowUnbounded ? std::numeric_limits::min() + : toInt64(range.low.valueBlock, exprConverter, type); + if (!lowUnbounded && range.low.bound == protocol::Bound::ABOVE) { + low++; + } + + bool highUnbounded = range.high.valueBlock == nullptr; + auto high = highUnbounded + ? std::numeric_limits::max() + : toInt64(range.high.valueBlock, exprConverter, type); + if (!highUnbounded && range.high.bound == protocol::Bound::BELOW) { + high--; + } + return std::make_unique(low, high, nullAllowed); +} + +std::unique_ptr hugeintRangeToFilter( + const protocol::Range& range, + bool nullAllowed, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + bool lowUnbounded = range.low.valueBlock == nullptr; + auto low = lowUnbounded ? std::numeric_limits::min() + : toInt128(range.low.valueBlock, exprConverter, type); + if (!lowUnbounded && range.low.bound == protocol::Bound::ABOVE) { + low++; + } + + bool highUnbounded = range.high.valueBlock == nullptr; + auto high = highUnbounded + ? std::numeric_limits::max() + : toInt128(range.high.valueBlock, exprConverter, type); + if (!highUnbounded && range.high.bound == protocol::Bound::BELOW) { + high--; + } + return std::make_unique(low, high, nullAllowed); +} + +std::unique_ptr timestampRangeToFilter( + const protocol::Range& range, + bool nullAllowed, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + const bool lowUnbounded = range.low.valueBlock == nullptr; + auto low = lowUnbounded + ? 
std::numeric_limits::min() + : toTimestamp(range.low.valueBlock, exprConverter, type); + if (!lowUnbounded && range.low.bound == protocol::Bound::ABOVE) { + ++low; + } + + const bool highUnbounded = range.high.valueBlock == nullptr; + auto high = highUnbounded + ? std::numeric_limits::max() + : toTimestamp(range.high.valueBlock, exprConverter, type); + if (!highUnbounded && range.high.bound == protocol::Bound::BELOW) { + --high; + } + return std::make_unique(low, high, nullAllowed); +} + +std::unique_ptr boolRangeToFilter( + const protocol::Range& range, + bool nullAllowed, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + bool lowExclusive = range.low.bound == protocol::Bound::ABOVE; + bool lowUnbounded = range.low.valueBlock == nullptr && lowExclusive; + bool highExclusive = range.high.bound == protocol::Bound::BELOW; + bool highUnbounded = range.high.valueBlock == nullptr && highExclusive; + + if (!lowUnbounded && !highUnbounded) { + bool lowValue = toBoolean(range.low.valueBlock, exprConverter, type); + bool highValue = toBoolean(range.high.valueBlock, exprConverter, type); + VELOX_CHECK_EQ( + lowValue, + highValue, + "Boolean range should not be [FALSE, TRUE] after coordinator " + "optimization."); + return std::make_unique(lowValue, nullAllowed); + } + // Presto coordinator has made optimizations to the bool range already. For + // example, [FALSE, TRUE) will be optimized and shown here as (-infinity, + // TRUE). Plus (-infinity, +infinity) case has been guarded in toFilter() + // method, here it can only be one side bounded scenarios. 
+ VELOX_CHECK_NE( + lowUnbounded, + highUnbounded, + "Passed in boolean range can only have one side bounded range scenario"); + if (!lowUnbounded) { + VELOX_CHECK( + highUnbounded, + "Boolean range should not be double side bounded after coordinator " + "optimization."); + bool lowValue = toBoolean(range.low.valueBlock, exprConverter, type); + + // (TRUE, +infinity) case, should resolve to filter all + if (lowExclusive && lowValue) { + if (nullAllowed) { + return std::make_unique(); + } + return std::make_unique(); + } + + // Both cases (FALSE, +infinity) or [TRUE, +infinity) should evaluate to + // true. Case [FALSE, +infinity) should not be expected + VELOX_CHECK( + !(!lowExclusive && !lowValue), + "Case [FALSE, +infinity) should " + "not be expected"); + return std::make_unique(true, nullAllowed); + } + if (!highUnbounded) { + VELOX_CHECK( + lowUnbounded, + "Boolean range should not be double side bounded after coordinator " + "optimization."); + bool highValue = toBoolean(range.high.valueBlock, exprConverter, type); + + // (-infinity, FALSE) case, should resolve to filter all + if (highExclusive && !highValue) { + if (nullAllowed) { + return std::make_unique(); + } + return std::make_unique(); + } + + // Both cases (-infinity, TRUE) or (-infinity, FALSE] should evaluate to + // false. Case (-infinity, TRUE] should not be expected + VELOX_CHECK( + !(!highExclusive && highValue), + "Case (-infinity, TRUE] should " + "not be expected"); + return std::make_unique(false, nullAllowed); + } + VELOX_UNREACHABLE(); +} + +template +std::unique_ptr floatingPointRangeToFilter( + const protocol::Range& range, + bool nullAllowed, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + bool lowExclusive = range.low.bound == protocol::Bound::ABOVE; + bool lowUnbounded = range.low.valueBlock == nullptr && lowExclusive; + auto low = lowUnbounded + ? 
(-1.0 * std::numeric_limits::infinity()) + : toFloatingPoint(range.low.valueBlock, exprConverter, type); + + bool highExclusive = range.high.bound == protocol::Bound::BELOW; + bool highUnbounded = range.high.valueBlock == nullptr && highExclusive; + auto high = highUnbounded + ? std::numeric_limits::infinity() + : toFloatingPoint(range.high.valueBlock, exprConverter, type); + + // Handle NaN cases as NaN is not supported as a limit in Velox Filters + if (!lowUnbounded && std::isnan(low)) { + if (lowExclusive) { + // x > NaN is always false as NaN is considered the largest value. + return std::make_unique(); + } + // Equivalent to x > infinity as only NaN is greater than infinity + // Presto currently converts x >= NaN into the filter with domain + // [NaN, max), so ignoring the high value is fine. + low = std::numeric_limits::infinity(); + lowExclusive = true; + high = std::numeric_limits::infinity(); + highUnbounded = true; + highExclusive = false; + } else if (!highUnbounded && std::isnan(high)) { + high = std::numeric_limits::infinity(); + if (highExclusive) { + // equivalent to x in [low , infinity] or (low , infinity] + highExclusive = false; + } else { + if (lowUnbounded) { + // Anything <= NaN is true as NaN is the largest possible value. + return std::make_unique(); + } + // Equivalent to x > low or x >=low + highUnbounded = true; + } + } + + return std::make_unique>( + low, + lowUnbounded, + lowExclusive, + high, + highUnbounded, + highExclusive, + nullAllowed); +} + +std::unique_ptr varcharRangeToFilter( + const protocol::Range& range, + bool nullAllowed, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + bool lowExclusive = range.low.bound == protocol::Bound::ABOVE; + bool lowUnbounded = range.low.valueBlock == nullptr && lowExclusive; + auto low = + lowUnbounded ? 
"" : toString(range.low.valueBlock, exprConverter, type); + + bool highExclusive = range.high.bound == protocol::Bound::BELOW; + bool highUnbounded = range.high.valueBlock == nullptr && highExclusive; + auto high = + highUnbounded ? "" : toString(range.high.valueBlock, exprConverter, type); + return std::make_unique( + low, + lowUnbounded, + lowExclusive, + high, + highUnbounded, + highExclusive, + nullAllowed); +} + +std::unique_ptr dateRangeToFilter( + const protocol::Range& range, + bool nullAllowed, + const VeloxExprConverter& exprConverter, + const TypePtr& type) { + bool lowUnbounded = range.low.valueBlock == nullptr; + auto low = lowUnbounded + ? std::numeric_limits::min() + : dateToInt64(range.low.valueBlock, exprConverter, type); + if (!lowUnbounded && range.low.bound == protocol::Bound::ABOVE) { + low++; + } + + bool highUnbounded = range.high.valueBlock == nullptr; + auto high = highUnbounded + ? std::numeric_limits::max() + : dateToInt64(range.high.valueBlock, exprConverter, type); + if (!highUnbounded && range.high.bound == protocol::Bound::BELOW) { + high--; + } + + return std::make_unique(low, high, nullAllowed); +} + +std::unique_ptr combineIntegerRanges( + std::vector>& bigintFilters, + bool nullAllowed) { + bool allSingleValue = std::all_of( + bigintFilters.begin(), bigintFilters.end(), [](const auto& range) { + return range->isSingleValue(); + }); + + if (allSingleValue) { + std::vector values; + values.reserve(bigintFilters.size()); + for (const auto& filter : bigintFilters) { + values.emplace_back(filter->lower()); + } + return common::createBigintValues(values, nullAllowed); + } + + if (bigintFilters.size() == 2 && + bigintFilters[0]->lower() == std::numeric_limits::min() && + bigintFilters[1]->upper() == std::numeric_limits::max()) { + assert(bigintFilters[0]->upper() + 1 <= bigintFilters[1]->lower() - 1); + return std::make_unique( + bigintFilters[0]->upper() + 1, + bigintFilters[1]->lower() - 1, + nullAllowed); + } + + bool allNegatedValues 
= true; + bool foundMaximum = false; + assert(bigintFilters.size() > 1); // true by size checks on ranges + std::vector rejectedValues; + + // check if int64 min is a rejected value + if (bigintFilters[0]->lower() == std::numeric_limits::min() + 1) { + rejectedValues.emplace_back(std::numeric_limits::min()); + } + if (bigintFilters[0]->lower() > std::numeric_limits::min() + 1) { + // too many value at the lower end, bail out + return std::make_unique( + std::move(bigintFilters), nullAllowed); + } + rejectedValues.push_back(bigintFilters[0]->upper() + 1); + for (int i = 1; i < bigintFilters.size(); ++i) { + if (bigintFilters[i]->lower() != bigintFilters[i - 1]->upper() + 2) { + allNegatedValues = false; + break; + } + if (bigintFilters[i]->upper() == std::numeric_limits::max()) { + foundMaximum = true; + break; + } + rejectedValues.push_back(bigintFilters[i]->upper() + 1); + // make sure there is another range possible above this one + if (bigintFilters[i]->upper() == std::numeric_limits::max() - 1) { + foundMaximum = true; + break; + } + } + + if (allNegatedValues && foundMaximum) { + return common::createNegatedBigintValues(rejectedValues, nullAllowed); + } + + return std::make_unique( + std::move(bigintFilters), nullAllowed); +} + +std::unique_ptr combineBytesRanges( + std::vector>& bytesFilters, + bool nullAllowed) { + bool allSingleValue = std::all_of( + bytesFilters.begin(), bytesFilters.end(), [](const auto& range) { + return range->isSingleValue(); + }); + + if (allSingleValue) { + std::vector values; + values.reserve(bytesFilters.size()); + for (const auto& filter : bytesFilters) { + values.emplace_back(filter->lower()); + } + return std::make_unique(values, nullAllowed); + } + + int lowerUnbounded = 0, upperUnbounded = 0; + bool allExclusive = std::all_of( + bytesFilters.begin(), bytesFilters.end(), [](const auto& range) { + return range->lowerExclusive() && range->upperExclusive(); + }); + if (allExclusive) { + folly::F14FastSet unmatched; + std::vector 
rejectedValues; + rejectedValues.reserve(bytesFilters.size()); + for (int i = 0; i < bytesFilters.size(); ++i) { + if (bytesFilters[i]->isLowerUnbounded()) { + ++lowerUnbounded; + } else { + if (unmatched.contains(bytesFilters[i]->lower())) { + unmatched.erase(bytesFilters[i]->lower()); + rejectedValues.emplace_back(bytesFilters[i]->lower()); + } else { + unmatched.insert(bytesFilters[i]->lower()); + } + } + if (bytesFilters[i]->isUpperUnbounded()) { + ++upperUnbounded; + } else { + if (unmatched.contains(bytesFilters[i]->upper())) { + unmatched.erase(bytesFilters[i]->upper()); + rejectedValues.emplace_back(bytesFilters[i]->upper()); + } else { + unmatched.insert(bytesFilters[i]->upper()); + } + } + } + + if (lowerUnbounded == 1 && upperUnbounded == 1 && unmatched.size() == 0) { + return std::make_unique( + rejectedValues, nullAllowed); + } + } + + if (bytesFilters.size() == 2 && bytesFilters[0]->isLowerUnbounded() && + bytesFilters[1]->isUpperUnbounded()) { + // create a negated bytes range instead + return std::make_unique( + bytesFilters[0]->upper(), + false, + !bytesFilters[0]->upperExclusive(), + bytesFilters[1]->lower(), + false, + !bytesFilters[1]->lowerExclusive(), + nullAllowed); + } + + std::vector> bytesGeneric; + for (int i = 0; i < bytesFilters.size(); ++i) { + bytesGeneric.emplace_back(std::unique_ptr( + dynamic_cast(bytesFilters[i].release()))); + } + + return std::make_unique( + std::move(bytesGeneric), nullAllowed, false); +} + +std::unique_ptr toFilter( + const TypePtr& type, + const protocol::Range& range, + bool nullAllowed, + const VeloxExprConverter& exprConverter) { + if (type->isDate()) { + return dateRangeToFilter(range, nullAllowed, exprConverter, type); + } + switch (type->kind()) { + case TypeKind::TINYINT: + case TypeKind::SMALLINT: + case TypeKind::INTEGER: + case TypeKind::BIGINT: + return bigintRangeToFilter(range, nullAllowed, exprConverter, type); + case TypeKind::HUGEINT: + return hugeintRangeToFilter(range, nullAllowed, 
exprConverter, type); + case TypeKind::DOUBLE: + return floatingPointRangeToFilter( + range, nullAllowed, exprConverter, type); + case TypeKind::VARCHAR: + case TypeKind::VARBINARY: + return varcharRangeToFilter(range, nullAllowed, exprConverter, type); + case TypeKind::BOOLEAN: + return boolRangeToFilter(range, nullAllowed, exprConverter, type); + case TypeKind::REAL: + return floatingPointRangeToFilter( + range, nullAllowed, exprConverter, type); + case TypeKind::TIMESTAMP: + return timestampRangeToFilter(range, nullAllowed, exprConverter, type); + default: + VELOX_UNSUPPORTED("Unsupported range type: {}", type->toString()); + } +} + +std::unique_ptr toFilter( + const protocol::Domain& domain, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser) { + auto nullAllowed = domain.nullAllowed; + if (auto sortedRangeSet = + std::dynamic_pointer_cast(domain.values)) { + auto type = stringToType(sortedRangeSet->type, typeParser); + auto ranges = sortedRangeSet->ranges; + + if (ranges.empty()) { + VELOX_CHECK(nullAllowed, "Unexpected always-false filter"); + return std::make_unique(); + } + + if (ranges.size() == 1) { + // 'is not null' arrives as unbounded range with 'nulls not allowed'. + // We catch this case and create 'is not null' filter instead of the range + // filter. 
+ const auto& range = ranges[0]; + bool lowExclusive = range.low.bound == protocol::Bound::ABOVE; + bool lowUnbounded = range.low.valueBlock == nullptr && lowExclusive; + bool highExclusive = range.high.bound == protocol::Bound::BELOW; + bool highUnbounded = range.high.valueBlock == nullptr && highExclusive; + if (lowUnbounded && highUnbounded && !nullAllowed) { + return std::make_unique(); + } + + return toFilter(type, ranges[0], nullAllowed, exprConverter); + } + + if (type->isDate()) { + std::vector> dateFilters; + dateFilters.reserve(ranges.size()); + for (const auto& range : ranges) { + dateFilters.emplace_back( + dateRangeToFilter(range, nullAllowed, exprConverter, type)); + } + return std::make_unique( + std::move(dateFilters), nullAllowed); + } + + if (type->kind() == TypeKind::BIGINT || type->kind() == TypeKind::INTEGER || + type->kind() == TypeKind::SMALLINT || + type->kind() == TypeKind::TINYINT) { + std::vector> bigintFilters; + bigintFilters.reserve(ranges.size()); + for (const auto& range : ranges) { + bigintFilters.emplace_back( + bigintRangeToFilter(range, nullAllowed, exprConverter, type)); + } + return combineIntegerRanges(bigintFilters, nullAllowed); + } + + if (type->kind() == TypeKind::VARCHAR) { + std::vector> bytesFilters; + bytesFilters.reserve(ranges.size()); + for (const auto& range : ranges) { + bytesFilters.emplace_back( + varcharRangeToFilter(range, nullAllowed, exprConverter, type)); + } + return combineBytesRanges(bytesFilters, nullAllowed); + } + + if (type->kind() == TypeKind::BOOLEAN) { + VELOX_CHECK_EQ(ranges.size(), 2, "Multi bool ranges size can only be 2."); + std::unique_ptr boolFilter; + for (const auto& range : ranges) { + auto filter = + boolRangeToFilter(range, nullAllowed, exprConverter, type); + if (filter->kind() == common::FilterKind::kAlwaysFalse or + filter->kind() == common::FilterKind::kIsNull) { + continue; + } + VELOX_CHECK_NULL(boolFilter); + boolFilter = std::move(filter); + } + + 
VELOX_CHECK_NOT_NULL(boolFilter); + return boolFilter; + } + + std::vector> filters; + filters.reserve(ranges.size()); + for (const auto& range : ranges) { + filters.emplace_back(toFilter(type, range, nullAllowed, exprConverter)); + } + + return std::make_unique( + std::move(filters), nullAllowed, false); + } else if ( + auto equatableValueSet = + std::dynamic_pointer_cast( + domain.values)) { + if (equatableValueSet->entries.empty()) { + if (nullAllowed) { + return std::make_unique(); + } else { + return std::make_unique(); + } + } + VELOX_UNSUPPORTED( + "EquatableValueSet (with non-empty entries) to Velox filter conversion is not supported yet."); + } else if ( + auto allOrNoneValueSet = + std::dynamic_pointer_cast( + domain.values)) { + VELOX_UNSUPPORTED( + "AllOrNoneValueSet to Velox filter conversion is not supported yet."); + } + VELOX_UNSUPPORTED("Unsupported filter found."); +} + +std::unique_ptr toHiveTableHandle( + const protocol::TupleDomain& domainPredicate, + const std::shared_ptr& remainingPredicate, + bool isPushdownFilterEnabled, + const std::string& tableName, + const protocol::List& dataColumns, + const protocol::TableHandle& tableHandle, + const protocol::Map& tableParameters, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser) { + connector::hive::SubfieldFilters subfieldFilters; + auto domains = domainPredicate.domains; + for (const auto& domain : *domains) { + auto filter = domain.second; + subfieldFilters[common::Subfield(domain.first)] = + toFilter(domain.second, exprConverter, typeParser); + } + + auto remainingFilter = exprConverter.toVeloxExpr(remainingPredicate); + if (auto constant = std::dynamic_pointer_cast( + remainingFilter)) { + bool value = constant->value().value(); + VELOX_CHECK(value, "Unexpected always-false remaining predicate"); + + // Use null for always-true filter. 
+ remainingFilter = nullptr; + } + + RowTypePtr finalDataColumns; + if (!dataColumns.empty()) { + std::vector names; + std::vector types; + velox::type::fbhive::HiveTypeParser hiveTypeParser; + names.reserve(dataColumns.size()); + types.reserve(dataColumns.size()); + for (auto& column : dataColumns) { + std::string name = column.name; + folly::toLowerAscii(name); + names.emplace_back(std::move(name)); + auto parsedType = hiveTypeParser.parse(column.type); + // The type from the metastore may have upper case letters + // in field names, convert them all to lower case to be + // compatible with Presto. + types.push_back(VELOX_DYNAMIC_TYPE_DISPATCH( + fieldNamesToLowerCase, parsedType->kind(), parsedType)); + } + finalDataColumns = ROW(std::move(names), std::move(types)); + } + + if (tableParameters.empty()) { + return std::make_unique( + tableHandle.connectorId, + tableName, + isPushdownFilterEnabled, + std::move(subfieldFilters), + remainingFilter, + finalDataColumns); + } + + std::unordered_map finalTableParameters = {}; + finalTableParameters.reserve(tableParameters.size()); + for (const auto& [key, value] : tableParameters) { + finalTableParameters[key] = value; + } + + return std::make_unique( + tableHandle.connectorId, + tableName, + isPushdownFilterEnabled, + std::move(subfieldFilters), + remainingFilter, + finalDataColumns, + finalTableParameters); +} + +connector::hive::LocationHandle::TableType toTableType( + protocol::hive::TableType tableType) { + switch (tableType) { + case protocol::hive::TableType::NEW: + // Temporary tables are written and read by the SPI in a single pipeline. + // So they can be treated as New. They do not require Append or Overwrite + // semantics as applicable for regular tables. 
+ case protocol::hive::TableType::TEMPORARY: + return connector::hive::LocationHandle::TableType::kNew; + case protocol::hive::TableType::EXISTING: + return connector::hive::LocationHandle::TableType::kExisting; + default: + VELOX_UNSUPPORTED("Unsupported table type: {}.", toJsonString(tableType)); + } +} + +std::shared_ptr toLocationHandle( + const protocol::hive::LocationHandle& locationHandle) { + return std::make_shared( + locationHandle.targetPath, + locationHandle.writePath, + toTableType(locationHandle.tableType)); +} + +dwio::common::FileFormat toFileFormat( + const protocol::hive::HiveStorageFormat storageFormat, + const char* usage) { + switch (storageFormat) { + case protocol::hive::HiveStorageFormat::DWRF: + return dwio::common::FileFormat::DWRF; + case protocol::hive::HiveStorageFormat::PARQUET: + return dwio::common::FileFormat::PARQUET; + case protocol::hive::HiveStorageFormat::ALPHA: + // This has been renamed in Velox from ALPHA to NIMBLE. + return dwio::common::FileFormat::NIMBLE; + default: + VELOX_UNSUPPORTED( + "Unsupported file format in {}: {}.", + usage, + toJsonString(storageFormat)); + } +} + +velox::common::CompressionKind toFileCompressionKind( + const protocol::hive::HiveCompressionCodec& hiveCompressionCodec) { + switch (hiveCompressionCodec) { + case protocol::hive::HiveCompressionCodec::SNAPPY: + return velox::common::CompressionKind::CompressionKind_SNAPPY; + case protocol::hive::HiveCompressionCodec::GZIP: + return velox::common::CompressionKind::CompressionKind_GZIP; + case protocol::hive::HiveCompressionCodec::LZ4: + return velox::common::CompressionKind::CompressionKind_LZ4; + case protocol::hive::HiveCompressionCodec::ZSTD: + return velox::common::CompressionKind::CompressionKind_ZSTD; + case protocol::hive::HiveCompressionCodec::NONE: + return velox::common::CompressionKind::CompressionKind_NONE; + default: + VELOX_UNSUPPORTED( + "Unsupported file compression format: {}.", + toJsonString(hiveCompressionCodec)); + } +} + 
+velox::connector::hive::HiveBucketProperty::Kind toHiveBucketPropertyKind( + protocol::hive::BucketFunctionType bucketFuncType) { + switch (bucketFuncType) { + case protocol::hive::BucketFunctionType::PRESTO_NATIVE: + return velox::connector::hive::HiveBucketProperty::Kind::kPrestoNative; + case protocol::hive::BucketFunctionType::HIVE_COMPATIBLE: + return velox::connector::hive::HiveBucketProperty::Kind::kHiveCompatible; + default: + VELOX_USER_FAIL( + "Unknown hive bucket function: {}", toJsonString(bucketFuncType)); + } +} + +std::vector stringToTypes( + const std::shared_ptr>& typeStrings, + const TypeParser& typeParser) { + std::vector types; + types.reserve(typeStrings->size()); + for (const auto& typeString : *typeStrings) { + types.push_back(stringToType(typeString, typeParser)); + } + return types; +} + +core::SortOrder toSortOrder(protocol::hive::Order order) { + switch (order) { + case protocol::hive::Order::ASCENDING: + return core::SortOrder(true, true); + case protocol::hive::Order::DESCENDING: + return core::SortOrder(false, false); + default: + VELOX_USER_FAIL("Unknown sort order: {}", toJsonString(order)); + } +} + +std::shared_ptr toHiveSortingColumn( + const protocol::hive::SortingColumn& sortingColumn) { + return std::make_shared( + sortingColumn.columnName, toSortOrder(sortingColumn.order)); +} + +std::vector> +toHiveSortingColumns( + const protocol::List& sortedBy) { + std::vector> + sortingColumns; + sortingColumns.reserve(sortedBy.size()); + for (const auto& sortingColumn : sortedBy) { + sortingColumns.push_back(toHiveSortingColumn(sortingColumn)); + } + return sortingColumns; +} + +// Converts a Presto Hive bucket property into its Velox counterpart. +// Returns nullptr when 'bucketProperty' is null. For the hive compatible +// bucket function the bucketed types are looked up by column name in +// 'inputColumns' and 'types' must be unset; otherwise the types are parsed +// from 'bucketProperty->types', which must be set and match 'bucketedBy' in +// size. +std::shared_ptr +toHiveBucketProperty( + const std::vector>& + inputColumns, + const std::shared_ptr& bucketProperty, + const TypeParser& typeParser) { + if (bucketProperty == nullptr) { + return nullptr; + } + + VELOX_USER_CHECK_GT( + bucketProperty->bucketCount, 0, "Bucket count must be a positive value"); + + VELOX_USER_CHECK( + 
!bucketProperty->bucketedBy.empty(), + "Bucketed columns must be set: {}", + toJsonString(*bucketProperty)); + + const velox::connector::hive::HiveBucketProperty::Kind kind = + toHiveBucketPropertyKind(bucketProperty->bucketFunctionType); + std::vector bucketedTypes; + if (kind == + velox::connector::hive::HiveBucketProperty::Kind::kHiveCompatible) { + VELOX_USER_CHECK_NULL( + bucketProperty->types, + "Unexpected bucketed types set for hive compatible bucket function: {}", + toJsonString(*bucketProperty)); + bucketedTypes.reserve(bucketProperty->bucketedBy.size()); + for (const auto& bucketedColumn : bucketProperty->bucketedBy) { + TypePtr bucketedType{nullptr}; + for (const auto& inputColumn : inputColumns) { + if (inputColumn->name() != bucketedColumn) { + continue; + } + VELOX_USER_CHECK_NOT_NULL(inputColumn->hiveType()); + bucketedType = inputColumn->hiveType(); + break; + } + VELOX_USER_CHECK_NOT_NULL( + bucketedType, "Bucketed column {} not found", bucketedColumn); + bucketedTypes.push_back(std::move(bucketedType)); + } + } else { + // 'types' is nullable (the branch above requires it to be null), so + // verify it is set before dereferencing it. + VELOX_USER_CHECK_NOT_NULL( + bucketProperty->types, + "Bucketed types must be set for presto native bucket function: {}", + toJsonString(*bucketProperty)); + VELOX_USER_CHECK_EQ( + bucketProperty->types->size(), + bucketProperty->bucketedBy.size(), + "Bucketed types is not set properly for presto native bucket function: {}", + toJsonString(*bucketProperty)); + bucketedTypes = stringToTypes(bucketProperty->types, typeParser); + } + + const auto sortedBy = toHiveSortingColumns(bucketProperty->sortedBy); + + // Reuse 'kind' computed above instead of converting the function type again. + return std::make_shared( + kind, + bucketProperty->bucketCount, + bucketProperty->bucketedBy, + bucketedTypes, + sortedBy); +} + +std::unique_ptr +toVeloxHiveColumnHandle( + const protocol::ColumnHandle* column, + const TypeParser& typeParser) { + auto* hiveColumn = + dynamic_cast(column); + VELOX_CHECK_NOT_NULL( + hiveColumn, "Unexpected column handle type {}", column->_type); + velox::type::fbhive::HiveTypeParser hiveTypeParser; + // TODO(spershin): Should we pass something different than 'typeSignature' + // to 'hiveType' 
argument of the 'HiveColumnHandle' constructor? + return std::make_unique( + hiveColumn->name, + toHiveColumnType(hiveColumn->columnType), + stringToType(hiveColumn->typeSignature, typeParser), + hiveTypeParser.parse(hiveColumn->hiveType), + toRequiredSubfields(hiveColumn->requiredSubfields)); +} + +velox::connector::hive::HiveBucketConversion toVeloxBucketConversion( + const protocol::hive::BucketConversion& bucketConversion) { + velox::connector::hive::HiveBucketConversion veloxBucketConversion; + // Current table bucket count (new). + veloxBucketConversion.tableBucketCount = bucketConversion.tableBucketCount; + // Partition bucket count (old). + veloxBucketConversion.partitionBucketCount = + bucketConversion.partitionBucketCount; + TypeParser typeParser; + for (const auto& column : bucketConversion.bucketColumnHandles) { + // Columns used as bucket input. + veloxBucketConversion.bucketColumnHandles.push_back( + toVeloxHiveColumnHandle(&column, typeParser)); + } + return veloxBucketConversion; +} + +std::unique_ptr +HivePrestoToVeloxConnector::toVeloxSplit( + const protocol::ConnectorId& catalogId, + const protocol::ConnectorSplit* const connectorSplit) const { + auto hiveSplit = + dynamic_cast(connectorSplit); + VELOX_CHECK_NOT_NULL( + hiveSplit, "Unexpected split type {}", connectorSplit->_type); + std::unordered_map> partitionKeys; + for (const auto& entry : hiveSplit->partitionKeys) { + partitionKeys.emplace( + entry.name, + entry.value == nullptr ? 
std::nullopt + : std::optional{*entry.value}); + } + std::unordered_map customSplitInfo; + for (const auto& [key, value] : hiveSplit->fileSplit.customSplitInfo) { + customSplitInfo[key] = value; + } + std::shared_ptr extraFileInfo; + if (hiveSplit->fileSplit.extraFileInfo) { + extraFileInfo = std::make_shared( + velox::encoding::Base64::decode(*hiveSplit->fileSplit.extraFileInfo)); + } + std::unordered_map serdeParameters; + serdeParameters.reserve(hiveSplit->storage.serdeParameters.size()); + for (const auto& [key, value] : hiveSplit->storage.serdeParameters) { + serdeParameters[key] = value; + } + std::unordered_map infoColumns = { + {"$path", hiveSplit->fileSplit.path}, + {"$file_size", std::to_string(hiveSplit->fileSplit.fileSize)}, + {"$file_modified_time", + std::to_string(hiveSplit->fileSplit.fileModifiedTime)}, + }; + if (hiveSplit->tableBucketNumber) { + infoColumns["$bucket"] = std::to_string(*hiveSplit->tableBucketNumber); + } + auto veloxSplit = + std::make_unique( + catalogId, + hiveSplit->fileSplit.path, + toVeloxFileFormat(hiveSplit->storage.storageFormat), + hiveSplit->fileSplit.start, + hiveSplit->fileSplit.length, + partitionKeys, + hiveSplit->tableBucketNumber + ? 
std::optional(*hiveSplit->tableBucketNumber) + : std::nullopt, + customSplitInfo, + extraFileInfo, + serdeParameters, + hiveSplit->splitWeight, + infoColumns); + if (hiveSplit->bucketConversion) { + VELOX_CHECK_NOT_NULL(hiveSplit->tableBucketNumber); + veloxSplit->bucketConversion = + toVeloxBucketConversion(*hiveSplit->bucketConversion); + } + return veloxSplit; +} + +std::unique_ptr +HivePrestoToVeloxConnector::toVeloxColumnHandle( + const protocol::ColumnHandle* column, + const TypeParser& typeParser) const { + return toVeloxHiveColumnHandle(column, typeParser); +} + +std::unique_ptr +HivePrestoToVeloxConnector::toVeloxTableHandle( + const protocol::TableHandle& tableHandle, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser, + std::unordered_map< + std::string, + std::shared_ptr>& assignments) const { + auto addSynthesizedColumn = [&](const std::string& name, + protocol::hive::ColumnType columnType, + const protocol::ColumnHandle& column) { + if (toHiveColumnType(columnType) == + velox::connector::hive::HiveColumnHandle::ColumnType::kSynthesized) { + if (assignments.count(name) == 0) { + assignments.emplace(name, toVeloxColumnHandle(&column, typeParser)); + } + } + }; + auto hiveLayout = + std::dynamic_pointer_cast( + tableHandle.connectorTableLayout); + VELOX_CHECK_NOT_NULL( + hiveLayout, + "Unexpected layout type {}", + tableHandle.connectorTableLayout->_type); + for (const auto& entry : hiveLayout->partitionColumns) { + assignments.emplace(entry.name, toVeloxColumnHandle(&entry, typeParser)); + } + + // Add synthesized columns to the TableScanNode columnHandles as well. 
+ for (const auto& entry : hiveLayout->predicateColumns) { + addSynthesizedColumn(entry.first, entry.second.columnType, entry.second); + } + + auto hiveTableHandle = + std::dynamic_pointer_cast( + tableHandle.connectorHandle); + VELOX_CHECK_NOT_NULL( + hiveTableHandle, + "Unexpected table handle type {}", + tableHandle.connectorHandle->_type); + + // Use fully qualified name if available. + std::string tableName = hiveTableHandle->schemaName.empty() + ? hiveTableHandle->tableName + : fmt::format( + "{}.{}", hiveTableHandle->schemaName, hiveTableHandle->tableName); + + return toHiveTableHandle( + hiveLayout->domainPredicate, + hiveLayout->remainingPredicate, + hiveLayout->pushdownFilterEnabled, + tableName, + hiveLayout->dataColumns, + tableHandle, + hiveLayout->tableParameters, + exprConverter, + typeParser); +} + +std::unique_ptr +HivePrestoToVeloxConnector::toVeloxInsertTableHandle( + const protocol::CreateHandle* createHandle, + const TypeParser& typeParser) const { + auto hiveOutputTableHandle = + std::dynamic_pointer_cast( + createHandle->handle.connectorHandle); + VELOX_CHECK_NOT_NULL( + hiveOutputTableHandle, + "Unexpected output table handle type {}", + createHandle->handle.connectorHandle->_type); + bool isPartitioned{false}; + const auto inputColumns = toHiveColumns( + hiveOutputTableHandle->inputColumns, typeParser, isPartitioned); + return std::make_unique( + inputColumns, + toLocationHandle(hiveOutputTableHandle->locationHandle), + toFileFormat(hiveOutputTableHandle->tableStorageFormat, "TableWrite"), + toHiveBucketProperty( + inputColumns, hiveOutputTableHandle->bucketProperty, typeParser), + std::optional( + toFileCompressionKind(hiveOutputTableHandle->compressionCodec))); +} + +std::unique_ptr +HivePrestoToVeloxConnector::toVeloxInsertTableHandle( + const protocol::InsertHandle* insertHandle, + const TypeParser& typeParser) const { + auto hiveInsertTableHandle = + std::dynamic_pointer_cast( + insertHandle->handle.connectorHandle); + 
VELOX_CHECK_NOT_NULL( + hiveInsertTableHandle, + "Unexpected insert table handle type {}", + insertHandle->handle.connectorHandle->_type); + bool isPartitioned{false}; + const auto inputColumns = toHiveColumns( + hiveInsertTableHandle->inputColumns, typeParser, isPartitioned); + + const auto table = hiveInsertTableHandle->pageSinkMetadata.table; + VELOX_USER_CHECK_NOT_NULL(table, "Table must not be null for insert query"); + return std::make_unique( + inputColumns, + toLocationHandle(hiveInsertTableHandle->locationHandle), + toFileFormat(hiveInsertTableHandle->tableStorageFormat, "TableWrite"), + toHiveBucketProperty( + inputColumns, hiveInsertTableHandle->bucketProperty, typeParser), + std::optional( + toFileCompressionKind(hiveInsertTableHandle->compressionCodec)), + std::unordered_map( + table->storage.serdeParameters.begin(), + table->storage.serdeParameters.end())); +} + +std::vector> +HivePrestoToVeloxConnector::toHiveColumns( + const protocol::List& inputColumns, + const TypeParser& typeParser, + bool& hasPartitionColumn) const { + hasPartitionColumn = false; + std::vector> + hiveColumns; + hiveColumns.reserve(inputColumns.size()); + for (const auto& columnHandle : inputColumns) { + hasPartitionColumn |= + columnHandle.columnType == protocol::hive::ColumnType::PARTITION_KEY; + hiveColumns.emplace_back( + std::dynamic_pointer_cast( + std::shared_ptr(toVeloxColumnHandle(&columnHandle, typeParser)))); + } + return hiveColumns; +} + +std::unique_ptr +HivePrestoToVeloxConnector::createVeloxPartitionFunctionSpec( + const protocol::ConnectorPartitioningHandle* partitioningHandle, + const std::vector& bucketToPartition, + const std::vector& channels, + const std::vector& constValues, + bool& effectivelyGather) const { + auto hivePartitioningHandle = + dynamic_cast( + partitioningHandle); + VELOX_CHECK_NOT_NULL( + hivePartitioningHandle, + "Unexpected partitioning handle type {}", + partitioningHandle->_type); + VELOX_USER_CHECK( + 
hivePartitioningHandle->bucketFunctionType == + protocol::hive::BucketFunctionType::HIVE_COMPATIBLE, + "Unsupported Hive bucket function type: {}", + toJsonString(hivePartitioningHandle->bucketFunctionType)); + effectivelyGather = hivePartitioningHandle->bucketCount == 1; + return std::make_unique( + hivePartitioningHandle->bucketCount, + bucketToPartition, + channels, + constValues); +} + +std::unique_ptr +HivePrestoToVeloxConnector::createConnectorProtocol() const { + return std::make_unique(); +} + +} // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/HivePrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/types/HivePrestoToVeloxConnector.h new file mode 100644 index 0000000000000..6945df707fc1f --- /dev/null +++ b/presto-native-execution/presto_cpp/main/types/HivePrestoToVeloxConnector.h @@ -0,0 +1,89 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once +#include "presto_cpp/main/types/PrestoToVeloxConnector.h" +#include "presto_cpp/presto_protocol/connector/hive/presto_protocol_hive.h" + +namespace facebook::presto { +using namespace velox; +template +std::string toJsonString(const T& value); +connector::hive::HiveColumnHandle::ColumnType toHiveColumnType( + protocol::hive::ColumnType type); +std::unique_ptr toHiveTableHandle( + const protocol::TupleDomain& domainPredicate, + const std::shared_ptr& remainingPredicate, + bool isPushdownFilterEnabled, + const std::string& tableName, + const protocol::List& dataColumns, + const protocol::TableHandle& tableHandle, + const protocol::Map& tableParameters, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser); +TypePtr stringToType( + const std::string& typeString, + const TypeParser& typeParser); +std::vector toRequiredSubfields( + const protocol::List& subfields); +class HivePrestoToVeloxConnector final : public PrestoToVeloxConnector { + public: + explicit HivePrestoToVeloxConnector(std::string connectorName) + : PrestoToVeloxConnector(std::move(connectorName)) {} + + std::unique_ptr toVeloxSplit( + const protocol::ConnectorId& catalogId, + const protocol::ConnectorSplit* connectorSplit) const final; + + std::unique_ptr toVeloxColumnHandle( + const protocol::ColumnHandle* column, + const TypeParser& typeParser) const final; + + std::unique_ptr toVeloxTableHandle( + const protocol::TableHandle& tableHandle, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser, + std::unordered_map< + std::string, + std::shared_ptr>& assignments) + const final; + + std::unique_ptr + toVeloxInsertTableHandle( + const protocol::CreateHandle* createHandle, + const TypeParser& typeParser) const final; + + std::unique_ptr + toVeloxInsertTableHandle( + const protocol::InsertHandle* insertHandle, + const TypeParser& typeParser) const final; + + std::unique_ptr + createVeloxPartitionFunctionSpec( + const 
protocol::ConnectorPartitioningHandle* partitioningHandle, + const std::vector& bucketToPartition, + const std::vector& channels, + const std::vector& constValues, + bool& effectivelyGather) const final; + + std::unique_ptr createConnectorProtocol() + const final; + + private: + std::vector> + toHiveColumns( + const protocol::List& inputColumns, + const TypeParser& typeParser, + bool& hasPartitionColumn) const; +}; +} // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/IcebergPrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/types/IcebergPrestoToVeloxConnector.cpp new file mode 100644 index 0000000000000..b74c579c529df --- /dev/null +++ b/presto-native-execution/presto_cpp/main/types/IcebergPrestoToVeloxConnector.cpp @@ -0,0 +1,195 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "presto_cpp/main/types/IcebergPrestoToVeloxConnector.h" +#include +#include "presto_cpp/main/types/HivePrestoToVeloxConnector.h" +#include "presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h" +#include "velox/velox/connectors/hive/iceberg/IcebergDeleteFile.h" +#include "velox/velox/connectors/hive/iceberg/IcebergSplit.h" +namespace facebook::presto { + +namespace { +using namespace velox; +velox::connector::hive::iceberg::FileContent toVeloxFileContent( + const presto::protocol::iceberg::FileContent content) { + if (content == protocol::iceberg::FileContent::DATA) { + return velox::connector::hive::iceberg::FileContent::kData; + } else if (content == protocol::iceberg::FileContent::POSITION_DELETES) { + return velox::connector::hive::iceberg::FileContent::kPositionalDeletes; + } + VELOX_UNSUPPORTED("Unsupported file content: {}", fmt::underlying(content)); +} +dwio::common::FileFormat toVeloxFileFormat( + const presto::protocol::iceberg::FileFormat format) { + if (format == protocol::iceberg::FileFormat::ORC) { + return dwio::common::FileFormat::ORC; + } else if (format == protocol::iceberg::FileFormat::PARQUET) { + return dwio::common::FileFormat::PARQUET; + } + VELOX_UNSUPPORTED("Unsupported file format: {}", fmt::underlying(format)); +} +} // namespace + +std::unique_ptr +IcebergPrestoToVeloxConnector::toVeloxSplit( + const protocol::ConnectorId& catalogId, + const protocol::ConnectorSplit* const connectorSplit) const { + auto icebergSplit = + dynamic_cast(connectorSplit); + VELOX_CHECK_NOT_NULL( + icebergSplit, "Unexpected split type {}", connectorSplit->_type); + + std::unordered_map> partitionKeys; + for (const auto& entry : icebergSplit->partitionKeys) { + partitionKeys.emplace( + entry.second.name, + entry.second.value == nullptr + ? 
std::nullopt + : std::optional{*entry.second.value}); + } + + std::unordered_map customSplitInfo; + customSplitInfo["table_format"] = "hive-iceberg"; + + std::vector deletes; + deletes.reserve(icebergSplit->deletes.size()); + for (const auto& deleteFile : icebergSplit->deletes) { + std::unordered_map lowerBounds( + deleteFile.lowerBounds.begin(), deleteFile.lowerBounds.end()); + + std::unordered_map upperBounds( + deleteFile.upperBounds.begin(), deleteFile.upperBounds.end()); + + velox::connector::hive::iceberg::IcebergDeleteFile icebergDeleteFile( + toVeloxFileContent(deleteFile.content), + deleteFile.path, + toVeloxFileFormat(deleteFile.format), + deleteFile.recordCount, + deleteFile.fileSizeInBytes, + std::vector(deleteFile.equalityFieldIds), + lowerBounds, + upperBounds); + + deletes.emplace_back(icebergDeleteFile); + } + + std::unordered_map infoColumns = { + {"$data_sequence_number", + std::to_string(icebergSplit->dataSequenceNumber)}, + {"$path", icebergSplit->path}}; + + return std::make_unique( + catalogId, + icebergSplit->path, + toVeloxFileFormat(icebergSplit->fileFormat), + icebergSplit->start, + icebergSplit->length, + partitionKeys, + std::nullopt, + customSplitInfo, + nullptr, + deletes, + infoColumns); +} + +std::unique_ptr +IcebergPrestoToVeloxConnector::toVeloxColumnHandle( + const protocol::ColumnHandle* column, + const TypeParser& typeParser) const { + auto icebergColumn = + dynamic_cast(column); + VELOX_CHECK_NOT_NULL( + icebergColumn, "Unexpected column handle type {}", column->_type); + // TODO(imjalpreet): Modify 'hiveType' argument of the 'HiveColumnHandle' + // constructor similar to how Hive Connector is handling for bucketing + velox::type::fbhive::HiveTypeParser hiveTypeParser; + return std::make_unique( + icebergColumn->columnIdentity.name, + toHiveColumnType(icebergColumn->columnType), + stringToType(icebergColumn->type, typeParser), + stringToType(icebergColumn->type, typeParser), + 
toRequiredSubfields(icebergColumn->requiredSubfields)); +} + +std::unique_ptr +IcebergPrestoToVeloxConnector::toVeloxTableHandle( + const protocol::TableHandle& tableHandle, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser, + std::unordered_map< + std::string, + std::shared_ptr>& assignments) const { + auto addSynthesizedColumn = [&](const std::string& name, + protocol::hive::ColumnType columnType, + const protocol::ColumnHandle& column) { + if (toHiveColumnType(columnType) == + velox::connector::hive::HiveColumnHandle::ColumnType::kSynthesized) { + if (assignments.count(name) == 0) { + assignments.emplace(name, toVeloxColumnHandle(&column, typeParser)); + } + } + }; + + auto icebergLayout = std::dynamic_pointer_cast< + const protocol::iceberg::IcebergTableLayoutHandle>( + tableHandle.connectorTableLayout); + VELOX_CHECK_NOT_NULL( + icebergLayout, + "Unexpected layout type {}", + tableHandle.connectorTableLayout->_type); + + for (const auto& entry : icebergLayout->partitionColumns) { + assignments.emplace( + entry.columnIdentity.name, toVeloxColumnHandle(&entry, typeParser)); + } + + // Add synthesized columns to the TableScanNode columnHandles as well. + for (const auto& entry : icebergLayout->predicateColumns) { + addSynthesizedColumn(entry.first, entry.second.columnType, entry.second); + } + + auto icebergTableHandle = + std::dynamic_pointer_cast( + tableHandle.connectorHandle); + VELOX_CHECK_NOT_NULL( + icebergTableHandle, + "Unexpected table handle type {}", + tableHandle.connectorHandle->_type); + + // Use fully qualified name if available. + std::string tableName = icebergTableHandle->schemaName.empty() + ? 
icebergTableHandle->icebergTableName.tableName + : fmt::format( + "{}.{}", + icebergTableHandle->schemaName, + icebergTableHandle->icebergTableName.tableName); + + return toHiveTableHandle( + icebergLayout->domainPredicate, + icebergLayout->remainingPredicate, + icebergLayout->pushdownFilterEnabled, + tableName, + icebergLayout->dataColumns, + tableHandle, + {}, + exprConverter, + typeParser); +} + +std::unique_ptr +IcebergPrestoToVeloxConnector::createConnectorProtocol() const { + return std::make_unique(); +} + +} // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/IcebergPrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/types/IcebergPrestoToVeloxConnector.h new file mode 100644 index 0000000000000..954606aed4b35 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/types/IcebergPrestoToVeloxConnector.h @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once +#include "presto_cpp/main/types/PrestoToVeloxConnector.h" +namespace facebook::presto { +class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector { + public: + explicit IcebergPrestoToVeloxConnector(std::string connectorName) + : PrestoToVeloxConnector(std::move(connectorName)) {} + + std::unique_ptr toVeloxSplit( + const protocol::ConnectorId& catalogId, + const protocol::ConnectorSplit* connectorSplit) const final; + + std::unique_ptr toVeloxColumnHandle( + const protocol::ColumnHandle* column, + const TypeParser& typeParser) const final; + + std::unique_ptr toVeloxTableHandle( + const protocol::TableHandle& tableHandle, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser, + std::unordered_map< + std::string, + std::shared_ptr>& assignments) + const final; + + std::unique_ptr createConnectorProtocol() + const final; +}; +} // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp index 3fa89ab21539e..ac118bbc6ee9e 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp @@ -13,20 +13,6 @@ */ #include "presto_cpp/main/types/PrestoToVeloxConnector.h" -#include "presto_cpp/presto_protocol/connector/hive/HiveConnectorProtocol.h" -#include "presto_cpp/presto_protocol/connector/iceberg/IcebergConnectorProtocol.h" -#include "presto_cpp/presto_protocol/connector/tpch/TpchConnectorProtocol.h" - -#include -#include "velox/connectors/hive/HiveConnector.h" -#include "velox/connectors/hive/HiveConnectorSplit.h" -#include "velox/connectors/hive/HiveDataSink.h" -#include "velox/connectors/hive/TableHandle.h" -#include "velox/connectors/hive/iceberg/IcebergDeleteFile.h" -#include "velox/connectors/hive/iceberg/IcebergSplit.h" -#include "velox/connectors/tpch/TpchConnector.h" -#include 
"velox/connectors/tpch/TpchConnectorSplit.h" - namespace facebook::presto { namespace { @@ -63,1469 +49,4 @@ const PrestoToVeloxConnector& getPrestoToVeloxConnector( it != connectors().end(), "Connector {} not registered", connectorName); return *(it->second); } - -namespace { -using namespace velox; - -dwio::common::FileFormat toVeloxFileFormat( - const presto::protocol::hive::StorageFormat& format) { - if (format.inputFormat == "com.facebook.hive.orc.OrcInputFormat") { - return dwio::common::FileFormat::DWRF; - } else if ( - format.inputFormat == - "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat") { - return dwio::common::FileFormat::PARQUET; - } else if (format.inputFormat == "org.apache.hadoop.mapred.TextInputFormat") { - if (format.serDe == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { - return dwio::common::FileFormat::TEXT; - } else if (format.serDe == "org.apache.hive.hcatalog.data.JsonSerDe") { - return dwio::common::FileFormat::JSON; - } - } else if ( - format.inputFormat == - "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat") { - if (format.serDe == - "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") { - return dwio::common::FileFormat::PARQUET; - } - } else if (format.inputFormat == "com.facebook.alpha.AlphaInputFormat") { - // ALPHA has been renamed in Velox to NIMBLE. 
- return dwio::common::FileFormat::NIMBLE; - } - VELOX_UNSUPPORTED( - "Unsupported file format: {} {}", format.inputFormat, format.serDe); -} - -dwio::common::FileFormat toVeloxFileFormat( - const presto::protocol::iceberg::FileFormat format) { - if (format == protocol::iceberg::FileFormat::ORC) { - return dwio::common::FileFormat::ORC; - } else if (format == protocol::iceberg::FileFormat::PARQUET) { - return dwio::common::FileFormat::PARQUET; - } - VELOX_UNSUPPORTED("Unsupported file format: {}", fmt::underlying(format)); -} - -template -std::string toJsonString(const T& value) { - return ((json)value).dump(); -} - -TypePtr stringToType( - const std::string& typeString, - const TypeParser& typeParser) { - return typeParser.parse(typeString); -} - -connector::hive::HiveColumnHandle::ColumnType toHiveColumnType( - protocol::hive::ColumnType type) { - switch (type) { - case protocol::hive::ColumnType::PARTITION_KEY: - return connector::hive::HiveColumnHandle::ColumnType::kPartitionKey; - case protocol::hive::ColumnType::REGULAR: - return connector::hive::HiveColumnHandle::ColumnType::kRegular; - case protocol::hive::ColumnType::SYNTHESIZED: - return connector::hive::HiveColumnHandle::ColumnType::kSynthesized; - default: - VELOX_UNSUPPORTED( - "Unsupported Hive column type: {}.", toJsonString(type)); - } -} - -std::vector toRequiredSubfields( - const protocol::List& subfields) { - std::vector result; - result.reserve(subfields.size()); - for (auto& subfield : subfields) { - result.emplace_back(subfield); - } - return result; -} - -template -TypePtr fieldNamesToLowerCase(const TypePtr& type) { - return type; -} - -template <> -TypePtr fieldNamesToLowerCase(const TypePtr& type); - -template <> -TypePtr fieldNamesToLowerCase(const TypePtr& type); - -template <> -TypePtr fieldNamesToLowerCase(const TypePtr& type); - -template <> -TypePtr fieldNamesToLowerCase(const TypePtr& type) { - auto& elementType = type->childAt(0); - return 
std::make_shared(VELOX_DYNAMIC_TYPE_DISPATCH( - fieldNamesToLowerCase, elementType->kind(), elementType)); -} - -template <> -TypePtr fieldNamesToLowerCase(const TypePtr& type) { - auto& keyType = type->childAt(0); - auto& valueType = type->childAt(1); - return std::make_shared( - VELOX_DYNAMIC_TYPE_DISPATCH( - fieldNamesToLowerCase, keyType->kind(), keyType), - VELOX_DYNAMIC_TYPE_DISPATCH( - fieldNamesToLowerCase, valueType->kind(), valueType)); -} - -template <> -TypePtr fieldNamesToLowerCase(const TypePtr& type) { - auto& rowType = type->asRow(); - std::vector names; - std::vector types; - names.reserve(type->size()); - types.reserve(type->size()); - for (int i = 0; i < rowType.size(); i++) { - std::string name = rowType.nameOf(i); - folly::toLowerAscii(name); - names.push_back(std::move(name)); - auto& childType = rowType.childAt(i); - types.push_back(VELOX_DYNAMIC_TYPE_DISPATCH( - fieldNamesToLowerCase, childType->kind(), childType)); - } - return std::make_shared(std::move(names), std::move(types)); -} - -int64_t toInt64( - const std::shared_ptr& block, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - auto value = exprConverter.getConstantValue(type, *block); - return VariantConverter::convert(value) - .value(); -} - -int128_t toInt128( - const std::shared_ptr& block, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - auto value = exprConverter.getConstantValue(type, *block); - return value.value(); -} - -Timestamp toTimestamp( - const std::shared_ptr& block, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - const auto value = exprConverter.getConstantValue(type, *block); - return value.value(); -} - -int64_t dateToInt64( - const std::shared_ptr& block, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - auto value = exprConverter.getConstantValue(type, *block); - return value.value(); -} - -template -T toFloatingPoint( - const std::shared_ptr& block, - const VeloxExprConverter& 
exprConverter, - const TypePtr& type) { - auto variant = exprConverter.getConstantValue(type, *block); - return variant.value(); -} - -std::string toString( - const std::shared_ptr& block, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - auto value = exprConverter.getConstantValue(type, *block); - if (type->isVarbinary()) { - return value.value(); - } - return value.value(); -} - -bool toBoolean( - const std::shared_ptr& block, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - auto variant = exprConverter.getConstantValue(type, *block); - return variant.value(); -} - -std::unique_ptr bigintRangeToFilter( - const protocol::Range& range, - bool nullAllowed, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - bool lowUnbounded = range.low.valueBlock == nullptr; - auto low = lowUnbounded ? std::numeric_limits::min() - : toInt64(range.low.valueBlock, exprConverter, type); - if (!lowUnbounded && range.low.bound == protocol::Bound::ABOVE) { - low++; - } - - bool highUnbounded = range.high.valueBlock == nullptr; - auto high = highUnbounded - ? std::numeric_limits::max() - : toInt64(range.high.valueBlock, exprConverter, type); - if (!highUnbounded && range.high.bound == protocol::Bound::BELOW) { - high--; - } - return std::make_unique(low, high, nullAllowed); -} - -std::unique_ptr hugeintRangeToFilter( - const protocol::Range& range, - bool nullAllowed, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - bool lowUnbounded = range.low.valueBlock == nullptr; - auto low = lowUnbounded ? std::numeric_limits::min() - : toInt128(range.low.valueBlock, exprConverter, type); - if (!lowUnbounded && range.low.bound == protocol::Bound::ABOVE) { - low++; - } - - bool highUnbounded = range.high.valueBlock == nullptr; - auto high = highUnbounded - ? 
std::numeric_limits::max() - : toInt128(range.high.valueBlock, exprConverter, type); - if (!highUnbounded && range.high.bound == protocol::Bound::BELOW) { - high--; - } - return std::make_unique(low, high, nullAllowed); -} - -std::unique_ptr timestampRangeToFilter( - const protocol::Range& range, - bool nullAllowed, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - const bool lowUnbounded = range.low.valueBlock == nullptr; - auto low = lowUnbounded - ? std::numeric_limits::min() - : toTimestamp(range.low.valueBlock, exprConverter, type); - if (!lowUnbounded && range.low.bound == protocol::Bound::ABOVE) { - ++low; - } - - const bool highUnbounded = range.high.valueBlock == nullptr; - auto high = highUnbounded - ? std::numeric_limits::max() - : toTimestamp(range.high.valueBlock, exprConverter, type); - if (!highUnbounded && range.high.bound == protocol::Bound::BELOW) { - --high; - } - return std::make_unique(low, high, nullAllowed); -} - -std::unique_ptr boolRangeToFilter( - const protocol::Range& range, - bool nullAllowed, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - bool lowExclusive = range.low.bound == protocol::Bound::ABOVE; - bool lowUnbounded = range.low.valueBlock == nullptr && lowExclusive; - bool highExclusive = range.high.bound == protocol::Bound::BELOW; - bool highUnbounded = range.high.valueBlock == nullptr && highExclusive; - - if (!lowUnbounded && !highUnbounded) { - bool lowValue = toBoolean(range.low.valueBlock, exprConverter, type); - bool highValue = toBoolean(range.high.valueBlock, exprConverter, type); - VELOX_CHECK_EQ( - lowValue, - highValue, - "Boolean range should not be [FALSE, TRUE] after coordinator " - "optimization."); - return std::make_unique(lowValue, nullAllowed); - } - // Presto coordinator has made optimizations to the bool range already. For - // example, [FALSE, TRUE) will be optimized and shown here as (-infinity, - // TRUE). 
Plus (-infinity, +infinity) case has been guarded in toFilter() - // method, here it can only be one side bounded scenarios. - VELOX_CHECK_NE( - lowUnbounded, - highUnbounded, - "Passed in boolean range can only have one side bounded range scenario"); - if (!lowUnbounded) { - VELOX_CHECK( - highUnbounded, - "Boolean range should not be double side bounded after coordinator " - "optimization."); - bool lowValue = toBoolean(range.low.valueBlock, exprConverter, type); - - // (TRUE, +infinity) case, should resolve to filter all - if (lowExclusive && lowValue) { - if (nullAllowed) { - return std::make_unique(); - } - return std::make_unique(); - } - - // Both cases (FALSE, +infinity) or [TRUE, +infinity) should evaluate to - // true. Case [FALSE, +infinity) should not be expected - VELOX_CHECK( - !(!lowExclusive && !lowValue), - "Case [FALSE, +infinity) should " - "not be expected"); - return std::make_unique(true, nullAllowed); - } - if (!highUnbounded) { - VELOX_CHECK( - lowUnbounded, - "Boolean range should not be double side bounded after coordinator " - "optimization."); - bool highValue = toBoolean(range.high.valueBlock, exprConverter, type); - - // (-infinity, FALSE) case, should resolve to filter all - if (highExclusive && !highValue) { - if (nullAllowed) { - return std::make_unique(); - } - return std::make_unique(); - } - - // Both cases (-infinity, TRUE) or (-infinity, FALSE] should evaluate to - // false. 
Case (-infinity, TRUE] should not be expected - VELOX_CHECK( - !(!highExclusive && highValue), - "Case (-infinity, TRUE] should " - "not be expected"); - return std::make_unique(false, nullAllowed); - } - VELOX_UNREACHABLE(); -} - -template -std::unique_ptr floatingPointRangeToFilter( - const protocol::Range& range, - bool nullAllowed, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - bool lowExclusive = range.low.bound == protocol::Bound::ABOVE; - bool lowUnbounded = range.low.valueBlock == nullptr && lowExclusive; - auto low = lowUnbounded - ? (-1.0 * std::numeric_limits::infinity()) - : toFloatingPoint(range.low.valueBlock, exprConverter, type); - - bool highExclusive = range.high.bound == protocol::Bound::BELOW; - bool highUnbounded = range.high.valueBlock == nullptr && highExclusive; - auto high = highUnbounded - ? std::numeric_limits::infinity() - : toFloatingPoint(range.high.valueBlock, exprConverter, type); - - // Handle NaN cases as NaN is not supported as a limit in Velox Filters - if (!lowUnbounded && std::isnan(low)) { - if (lowExclusive) { - // x > NaN is always false as NaN is considered the largest value. - return std::make_unique(); - } - // Equivalent to x > infinity as only NaN is greater than infinity - // Presto currently converts x >= NaN into the filter with domain - // [NaN, max), so ignoring the high value is fine. - low = std::numeric_limits::infinity(); - lowExclusive = true; - high = std::numeric_limits::infinity(); - highUnbounded = true; - highExclusive = false; - } else if (!highUnbounded && std::isnan(high)) { - high = std::numeric_limits::infinity(); - if (highExclusive) { - // equivalent to x in [low , infinity] or (low , infinity] - highExclusive = false; - } else { - if (lowUnbounded) { - // Anything <= NaN is true as NaN is the largest possible value. 
- return std::make_unique(); - } - // Equivalent to x > low or x >=low - highUnbounded = true; - } - } - - return std::make_unique>( - low, - lowUnbounded, - lowExclusive, - high, - highUnbounded, - highExclusive, - nullAllowed); -} - -std::unique_ptr varcharRangeToFilter( - const protocol::Range& range, - bool nullAllowed, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - bool lowExclusive = range.low.bound == protocol::Bound::ABOVE; - bool lowUnbounded = range.low.valueBlock == nullptr && lowExclusive; - auto low = - lowUnbounded ? "" : toString(range.low.valueBlock, exprConverter, type); - - bool highExclusive = range.high.bound == protocol::Bound::BELOW; - bool highUnbounded = range.high.valueBlock == nullptr && highExclusive; - auto high = - highUnbounded ? "" : toString(range.high.valueBlock, exprConverter, type); - return std::make_unique( - low, - lowUnbounded, - lowExclusive, - high, - highUnbounded, - highExclusive, - nullAllowed); -} - -std::unique_ptr dateRangeToFilter( - const protocol::Range& range, - bool nullAllowed, - const VeloxExprConverter& exprConverter, - const TypePtr& type) { - bool lowUnbounded = range.low.valueBlock == nullptr; - auto low = lowUnbounded - ? std::numeric_limits::min() - : dateToInt64(range.low.valueBlock, exprConverter, type); - if (!lowUnbounded && range.low.bound == protocol::Bound::ABOVE) { - low++; - } - - bool highUnbounded = range.high.valueBlock == nullptr; - auto high = highUnbounded - ? 
std::numeric_limits::max() - : dateToInt64(range.high.valueBlock, exprConverter, type); - if (!highUnbounded && range.high.bound == protocol::Bound::BELOW) { - high--; - } - - return std::make_unique(low, high, nullAllowed); -} - -std::unique_ptr combineIntegerRanges( - std::vector>& bigintFilters, - bool nullAllowed) { - bool allSingleValue = std::all_of( - bigintFilters.begin(), bigintFilters.end(), [](const auto& range) { - return range->isSingleValue(); - }); - - if (allSingleValue) { - std::vector values; - values.reserve(bigintFilters.size()); - for (const auto& filter : bigintFilters) { - values.emplace_back(filter->lower()); - } - return common::createBigintValues(values, nullAllowed); - } - - if (bigintFilters.size() == 2 && - bigintFilters[0]->lower() == std::numeric_limits::min() && - bigintFilters[1]->upper() == std::numeric_limits::max()) { - assert(bigintFilters[0]->upper() + 1 <= bigintFilters[1]->lower() - 1); - return std::make_unique( - bigintFilters[0]->upper() + 1, - bigintFilters[1]->lower() - 1, - nullAllowed); - } - - bool allNegatedValues = true; - bool foundMaximum = false; - assert(bigintFilters.size() > 1); // true by size checks on ranges - std::vector rejectedValues; - - // check if int64 min is a rejected value - if (bigintFilters[0]->lower() == std::numeric_limits::min() + 1) { - rejectedValues.emplace_back(std::numeric_limits::min()); - } - if (bigintFilters[0]->lower() > std::numeric_limits::min() + 1) { - // too many value at the lower end, bail out - return std::make_unique( - std::move(bigintFilters), nullAllowed); - } - rejectedValues.push_back(bigintFilters[0]->upper() + 1); - for (int i = 1; i < bigintFilters.size(); ++i) { - if (bigintFilters[i]->lower() != bigintFilters[i - 1]->upper() + 2) { - allNegatedValues = false; - break; - } - if (bigintFilters[i]->upper() == std::numeric_limits::max()) { - foundMaximum = true; - break; - } - rejectedValues.push_back(bigintFilters[i]->upper() + 1); - // make sure there is another 
range possible above this one - if (bigintFilters[i]->upper() == std::numeric_limits::max() - 1) { - foundMaximum = true; - break; - } - } - - if (allNegatedValues && foundMaximum) { - return common::createNegatedBigintValues(rejectedValues, nullAllowed); - } - - return std::make_unique( - std::move(bigintFilters), nullAllowed); -} - -std::unique_ptr combineBytesRanges( - std::vector>& bytesFilters, - bool nullAllowed) { - bool allSingleValue = std::all_of( - bytesFilters.begin(), bytesFilters.end(), [](const auto& range) { - return range->isSingleValue(); - }); - - if (allSingleValue) { - std::vector values; - values.reserve(bytesFilters.size()); - for (const auto& filter : bytesFilters) { - values.emplace_back(filter->lower()); - } - return std::make_unique(values, nullAllowed); - } - - int lowerUnbounded = 0, upperUnbounded = 0; - bool allExclusive = std::all_of( - bytesFilters.begin(), bytesFilters.end(), [](const auto& range) { - return range->lowerExclusive() && range->upperExclusive(); - }); - if (allExclusive) { - folly::F14FastSet unmatched; - std::vector rejectedValues; - rejectedValues.reserve(bytesFilters.size()); - for (int i = 0; i < bytesFilters.size(); ++i) { - if (bytesFilters[i]->isLowerUnbounded()) { - ++lowerUnbounded; - } else { - if (unmatched.contains(bytesFilters[i]->lower())) { - unmatched.erase(bytesFilters[i]->lower()); - rejectedValues.emplace_back(bytesFilters[i]->lower()); - } else { - unmatched.insert(bytesFilters[i]->lower()); - } - } - if (bytesFilters[i]->isUpperUnbounded()) { - ++upperUnbounded; - } else { - if (unmatched.contains(bytesFilters[i]->upper())) { - unmatched.erase(bytesFilters[i]->upper()); - rejectedValues.emplace_back(bytesFilters[i]->upper()); - } else { - unmatched.insert(bytesFilters[i]->upper()); - } - } - } - - if (lowerUnbounded == 1 && upperUnbounded == 1 && unmatched.size() == 0) { - return std::make_unique( - rejectedValues, nullAllowed); - } - } - - if (bytesFilters.size() == 2 && 
bytesFilters[0]->isLowerUnbounded() && - bytesFilters[1]->isUpperUnbounded()) { - // create a negated bytes range instead - return std::make_unique( - bytesFilters[0]->upper(), - false, - !bytesFilters[0]->upperExclusive(), - bytesFilters[1]->lower(), - false, - !bytesFilters[1]->lowerExclusive(), - nullAllowed); - } - - std::vector> bytesGeneric; - for (int i = 0; i < bytesFilters.size(); ++i) { - bytesGeneric.emplace_back(std::unique_ptr( - dynamic_cast(bytesFilters[i].release()))); - } - - return std::make_unique( - std::move(bytesGeneric), nullAllowed, false); -} - -std::unique_ptr toFilter( - const TypePtr& type, - const protocol::Range& range, - bool nullAllowed, - const VeloxExprConverter& exprConverter) { - if (type->isDate()) { - return dateRangeToFilter(range, nullAllowed, exprConverter, type); - } - switch (type->kind()) { - case TypeKind::TINYINT: - case TypeKind::SMALLINT: - case TypeKind::INTEGER: - case TypeKind::BIGINT: - return bigintRangeToFilter(range, nullAllowed, exprConverter, type); - case TypeKind::HUGEINT: - return hugeintRangeToFilter(range, nullAllowed, exprConverter, type); - case TypeKind::DOUBLE: - return floatingPointRangeToFilter( - range, nullAllowed, exprConverter, type); - case TypeKind::VARCHAR: - case TypeKind::VARBINARY: - return varcharRangeToFilter(range, nullAllowed, exprConverter, type); - case TypeKind::BOOLEAN: - return boolRangeToFilter(range, nullAllowed, exprConverter, type); - case TypeKind::REAL: - return floatingPointRangeToFilter( - range, nullAllowed, exprConverter, type); - case TypeKind::TIMESTAMP: - return timestampRangeToFilter(range, nullAllowed, exprConverter, type); - default: - VELOX_UNSUPPORTED("Unsupported range type: {}", type->toString()); - } -} - -std::unique_ptr toFilter( - const protocol::Domain& domain, - const VeloxExprConverter& exprConverter, - const TypeParser& typeParser) { - auto nullAllowed = domain.nullAllowed; - if (auto sortedRangeSet = - std::dynamic_pointer_cast(domain.values)) { - 
auto type = stringToType(sortedRangeSet->type, typeParser); - auto ranges = sortedRangeSet->ranges; - - if (ranges.empty()) { - VELOX_CHECK(nullAllowed, "Unexpected always-false filter"); - return std::make_unique(); - } - - if (ranges.size() == 1) { - // 'is not null' arrives as unbounded range with 'nulls not allowed'. - // We catch this case and create 'is not null' filter instead of the range - // filter. - const auto& range = ranges[0]; - bool lowExclusive = range.low.bound == protocol::Bound::ABOVE; - bool lowUnbounded = range.low.valueBlock == nullptr && lowExclusive; - bool highExclusive = range.high.bound == protocol::Bound::BELOW; - bool highUnbounded = range.high.valueBlock == nullptr && highExclusive; - if (lowUnbounded && highUnbounded && !nullAllowed) { - return std::make_unique(); - } - - return toFilter(type, ranges[0], nullAllowed, exprConverter); - } - - if (type->isDate()) { - std::vector> dateFilters; - dateFilters.reserve(ranges.size()); - for (const auto& range : ranges) { - dateFilters.emplace_back( - dateRangeToFilter(range, nullAllowed, exprConverter, type)); - } - return std::make_unique( - std::move(dateFilters), nullAllowed); - } - - if (type->kind() == TypeKind::BIGINT || type->kind() == TypeKind::INTEGER || - type->kind() == TypeKind::SMALLINT || - type->kind() == TypeKind::TINYINT) { - std::vector> bigintFilters; - bigintFilters.reserve(ranges.size()); - for (const auto& range : ranges) { - bigintFilters.emplace_back( - bigintRangeToFilter(range, nullAllowed, exprConverter, type)); - } - return combineIntegerRanges(bigintFilters, nullAllowed); - } - - if (type->kind() == TypeKind::VARCHAR) { - std::vector> bytesFilters; - bytesFilters.reserve(ranges.size()); - for (const auto& range : ranges) { - bytesFilters.emplace_back( - varcharRangeToFilter(range, nullAllowed, exprConverter, type)); - } - return combineBytesRanges(bytesFilters, nullAllowed); - } - - if (type->kind() == TypeKind::BOOLEAN) { - VELOX_CHECK_EQ(ranges.size(), 2, 
"Multi bool ranges size can only be 2."); - std::unique_ptr boolFilter; - for (const auto& range : ranges) { - auto filter = - boolRangeToFilter(range, nullAllowed, exprConverter, type); - if (filter->kind() == common::FilterKind::kAlwaysFalse or - filter->kind() == common::FilterKind::kIsNull) { - continue; - } - VELOX_CHECK_NULL(boolFilter); - boolFilter = std::move(filter); - } - - VELOX_CHECK_NOT_NULL(boolFilter); - return boolFilter; - } - - std::vector> filters; - filters.reserve(ranges.size()); - for (const auto& range : ranges) { - filters.emplace_back(toFilter(type, range, nullAllowed, exprConverter)); - } - - return std::make_unique( - std::move(filters), nullAllowed, false); - } else if ( - auto equatableValueSet = - std::dynamic_pointer_cast( - domain.values)) { - if (equatableValueSet->entries.empty()) { - if (nullAllowed) { - return std::make_unique(); - } else { - return std::make_unique(); - } - } - VELOX_UNSUPPORTED( - "EquatableValueSet (with non-empty entries) to Velox filter conversion is not supported yet."); - } else if ( - auto allOrNoneValueSet = - std::dynamic_pointer_cast( - domain.values)) { - VELOX_UNSUPPORTED( - "AllOrNoneValueSet to Velox filter conversion is not supported yet."); - } - VELOX_UNSUPPORTED("Unsupported filter found."); -} - -std::unique_ptr toHiveTableHandle( - const protocol::TupleDomain& domainPredicate, - const std::shared_ptr& remainingPredicate, - bool isPushdownFilterEnabled, - const std::string& tableName, - const protocol::List& dataColumns, - const protocol::TableHandle& tableHandle, - const protocol::Map& tableParameters, - const VeloxExprConverter& exprConverter, - const TypeParser& typeParser) { - connector::hive::SubfieldFilters subfieldFilters; - auto domains = domainPredicate.domains; - for (const auto& domain : *domains) { - auto filter = domain.second; - subfieldFilters[common::Subfield(domain.first)] = - toFilter(domain.second, exprConverter, typeParser); - } - - auto remainingFilter = 
exprConverter.toVeloxExpr(remainingPredicate); - if (auto constant = std::dynamic_pointer_cast( - remainingFilter)) { - bool value = constant->value().value(); - VELOX_CHECK(value, "Unexpected always-false remaining predicate"); - - // Use null for always-true filter. - remainingFilter = nullptr; - } - - RowTypePtr finalDataColumns; - if (!dataColumns.empty()) { - std::vector names; - std::vector types; - velox::type::fbhive::HiveTypeParser hiveTypeParser; - names.reserve(dataColumns.size()); - types.reserve(dataColumns.size()); - for (auto& column : dataColumns) { - std::string name = column.name; - folly::toLowerAscii(name); - names.emplace_back(std::move(name)); - auto parsedType = hiveTypeParser.parse(column.type); - // The type from the metastore may have upper case letters - // in field names, convert them all to lower case to be - // compatible with Presto. - types.push_back(VELOX_DYNAMIC_TYPE_DISPATCH( - fieldNamesToLowerCase, parsedType->kind(), parsedType)); - } - finalDataColumns = ROW(std::move(names), std::move(types)); - } - - if (tableParameters.empty()) { - return std::make_unique( - tableHandle.connectorId, - tableName, - isPushdownFilterEnabled, - std::move(subfieldFilters), - remainingFilter, - finalDataColumns); - } - - std::unordered_map finalTableParameters = {}; - finalTableParameters.reserve(tableParameters.size()); - for (const auto& [key, value] : tableParameters) { - finalTableParameters[key] = value; - } - - return std::make_unique( - tableHandle.connectorId, - tableName, - isPushdownFilterEnabled, - std::move(subfieldFilters), - remainingFilter, - finalDataColumns, - finalTableParameters); -} - -connector::hive::LocationHandle::TableType toTableType( - protocol::hive::TableType tableType) { - switch (tableType) { - case protocol::hive::TableType::NEW: - // Temporary tables are written and read by the SPI in a single pipeline. - // So they can be treated as New. 
They do not require Append or Overwrite - // semantics as applicable for regular tables. - case protocol::hive::TableType::TEMPORARY: - return connector::hive::LocationHandle::TableType::kNew; - case protocol::hive::TableType::EXISTING: - return connector::hive::LocationHandle::TableType::kExisting; - default: - VELOX_UNSUPPORTED("Unsupported table type: {}.", toJsonString(tableType)); - } -} - -std::shared_ptr toLocationHandle( - const protocol::hive::LocationHandle& locationHandle) { - return std::make_shared( - locationHandle.targetPath, - locationHandle.writePath, - toTableType(locationHandle.tableType)); -} - -dwio::common::FileFormat toFileFormat( - const protocol::hive::HiveStorageFormat storageFormat, - const char* usage) { - switch (storageFormat) { - case protocol::hive::HiveStorageFormat::DWRF: - return dwio::common::FileFormat::DWRF; - case protocol::hive::HiveStorageFormat::PARQUET: - return dwio::common::FileFormat::PARQUET; - case protocol::hive::HiveStorageFormat::ALPHA: - // This has been renamed in Velox from ALPHA to NIMBLE. 
- return dwio::common::FileFormat::NIMBLE; - default: - VELOX_UNSUPPORTED( - "Unsupported file format in {}: {}.", - usage, - toJsonString(storageFormat)); - } -} - -velox::common::CompressionKind toFileCompressionKind( - const protocol::hive::HiveCompressionCodec& hiveCompressionCodec) { - switch (hiveCompressionCodec) { - case protocol::hive::HiveCompressionCodec::SNAPPY: - return velox::common::CompressionKind::CompressionKind_SNAPPY; - case protocol::hive::HiveCompressionCodec::GZIP: - return velox::common::CompressionKind::CompressionKind_GZIP; - case protocol::hive::HiveCompressionCodec::LZ4: - return velox::common::CompressionKind::CompressionKind_LZ4; - case protocol::hive::HiveCompressionCodec::ZSTD: - return velox::common::CompressionKind::CompressionKind_ZSTD; - case protocol::hive::HiveCompressionCodec::NONE: - return velox::common::CompressionKind::CompressionKind_NONE; - default: - VELOX_UNSUPPORTED( - "Unsupported file compression format: {}.", - toJsonString(hiveCompressionCodec)); - } -} - -velox::connector::hive::HiveBucketProperty::Kind toHiveBucketPropertyKind( - protocol::hive::BucketFunctionType bucketFuncType) { - switch (bucketFuncType) { - case protocol::hive::BucketFunctionType::PRESTO_NATIVE: - return velox::connector::hive::HiveBucketProperty::Kind::kPrestoNative; - case protocol::hive::BucketFunctionType::HIVE_COMPATIBLE: - return velox::connector::hive::HiveBucketProperty::Kind::kHiveCompatible; - default: - VELOX_USER_FAIL( - "Unknown hive bucket function: {}", toJsonString(bucketFuncType)); - } -} - -std::vector stringToTypes( - const std::shared_ptr>& typeStrings, - const TypeParser& typeParser) { - std::vector types; - types.reserve(typeStrings->size()); - for (const auto& typeString : *typeStrings) { - types.push_back(stringToType(typeString, typeParser)); - } - return types; -} - -core::SortOrder toSortOrder(protocol::hive::Order order) { - switch (order) { - case protocol::hive::Order::ASCENDING: - return core::SortOrder(true, 
true); - case protocol::hive::Order::DESCENDING: - return core::SortOrder(false, false); - default: - VELOX_USER_FAIL("Unknown sort order: {}", toJsonString(order)); - } -} - -std::shared_ptr toHiveSortingColumn( - const protocol::hive::SortingColumn& sortingColumn) { - return std::make_shared( - sortingColumn.columnName, toSortOrder(sortingColumn.order)); -} - -std::vector> -toHiveSortingColumns( - const protocol::List& sortedBy) { - std::vector> - sortingColumns; - sortingColumns.reserve(sortedBy.size()); - for (const auto& sortingColumn : sortedBy) { - sortingColumns.push_back(toHiveSortingColumn(sortingColumn)); - } - return sortingColumns; -} - -std::shared_ptr -toHiveBucketProperty( - const std::vector>& - inputColumns, - const std::shared_ptr& bucketProperty, - const TypeParser& typeParser) { - if (bucketProperty == nullptr) { - return nullptr; - } - - VELOX_USER_CHECK_GT( - bucketProperty->bucketCount, 0, "Bucket count must be a positive value"); - - VELOX_USER_CHECK( - !bucketProperty->bucketedBy.empty(), - "Bucketed columns must be set: {}", - toJsonString(*bucketProperty)); - - const velox::connector::hive::HiveBucketProperty::Kind kind = - toHiveBucketPropertyKind(bucketProperty->bucketFunctionType); - std::vector bucketedTypes; - if (kind == - velox::connector::hive::HiveBucketProperty::Kind::kHiveCompatible) { - VELOX_USER_CHECK_NULL( - bucketProperty->types, - "Unexpected bucketed types set for hive compatible bucket function: {}", - toJsonString(*bucketProperty)); - bucketedTypes.reserve(bucketProperty->bucketedBy.size()); - for (const auto& bucketedColumn : bucketProperty->bucketedBy) { - TypePtr bucketedType{nullptr}; - for (const auto& inputColumn : inputColumns) { - if (inputColumn->name() != bucketedColumn) { - continue; - } - VELOX_USER_CHECK_NOT_NULL(inputColumn->hiveType()); - bucketedType = inputColumn->hiveType(); - break; - } - VELOX_USER_CHECK_NOT_NULL( - bucketedType, "Bucketed column {} not found", bucketedColumn); - 
bucketedTypes.push_back(std::move(bucketedType)); - } - } else { - VELOX_USER_CHECK_EQ( - bucketProperty->types->size(), - bucketProperty->bucketedBy.size(), - "Bucketed types is not set properly for presto native bucket function: {}", - toJsonString(*bucketProperty)); - bucketedTypes = stringToTypes(bucketProperty->types, typeParser); - } - - const auto sortedBy = toHiveSortingColumns(bucketProperty->sortedBy); - - return std::make_shared( - toHiveBucketPropertyKind(bucketProperty->bucketFunctionType), - bucketProperty->bucketCount, - bucketProperty->bucketedBy, - bucketedTypes, - sortedBy); -} - -std::unique_ptr -toVeloxHiveColumnHandle( - const protocol::ColumnHandle* column, - const TypeParser& typeParser) { - auto* hiveColumn = - dynamic_cast(column); - VELOX_CHECK_NOT_NULL( - hiveColumn, "Unexpected column handle type {}", column->_type); - velox::type::fbhive::HiveTypeParser hiveTypeParser; - // TODO(spershin): Should we pass something different than 'typeSignature' - // to 'hiveType' argument of the 'HiveColumnHandle' constructor? - return std::make_unique( - hiveColumn->name, - toHiveColumnType(hiveColumn->columnType), - stringToType(hiveColumn->typeSignature, typeParser), - hiveTypeParser.parse(hiveColumn->hiveType), - toRequiredSubfields(hiveColumn->requiredSubfields)); -} - -velox::connector::hive::HiveBucketConversion toVeloxBucketConversion( - const protocol::hive::BucketConversion& bucketConversion) { - velox::connector::hive::HiveBucketConversion veloxBucketConversion; - // Current table bucket count (new). - veloxBucketConversion.tableBucketCount = bucketConversion.tableBucketCount; - // Partition bucket count (old). - veloxBucketConversion.partitionBucketCount = - bucketConversion.partitionBucketCount; - TypeParser typeParser; - for (const auto& column : bucketConversion.bucketColumnHandles) { - // Columns used as bucket input. 
- veloxBucketConversion.bucketColumnHandles.push_back( - toVeloxHiveColumnHandle(&column, typeParser)); - } - return veloxBucketConversion; -} - -velox::connector::hive::iceberg::FileContent toVeloxFileContent( - const presto::protocol::iceberg::FileContent content) { - if (content == protocol::iceberg::FileContent::DATA) { - return velox::connector::hive::iceberg::FileContent::kData; - } else if (content == protocol::iceberg::FileContent::POSITION_DELETES) { - return velox::connector::hive::iceberg::FileContent::kPositionalDeletes; - } - VELOX_UNSUPPORTED("Unsupported file content: {}", fmt::underlying(content)); -} - -} // namespace - -std::unique_ptr -HivePrestoToVeloxConnector::toVeloxSplit( - const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* const connectorSplit) const { - auto hiveSplit = - dynamic_cast(connectorSplit); - VELOX_CHECK_NOT_NULL( - hiveSplit, "Unexpected split type {}", connectorSplit->_type); - std::unordered_map> partitionKeys; - for (const auto& entry : hiveSplit->partitionKeys) { - partitionKeys.emplace( - entry.name, - entry.value == nullptr ? 
std::nullopt - : std::optional{*entry.value}); - } - std::unordered_map customSplitInfo; - for (const auto& [key, value] : hiveSplit->fileSplit.customSplitInfo) { - customSplitInfo[key] = value; - } - std::shared_ptr extraFileInfo; - if (hiveSplit->fileSplit.extraFileInfo) { - extraFileInfo = std::make_shared( - velox::encoding::Base64::decode(*hiveSplit->fileSplit.extraFileInfo)); - } - std::unordered_map serdeParameters; - serdeParameters.reserve(hiveSplit->storage.serdeParameters.size()); - for (const auto& [key, value] : hiveSplit->storage.serdeParameters) { - serdeParameters[key] = value; - } - std::unordered_map infoColumns = { - {"$path", hiveSplit->fileSplit.path}, - {"$file_size", std::to_string(hiveSplit->fileSplit.fileSize)}, - {"$file_modified_time", - std::to_string(hiveSplit->fileSplit.fileModifiedTime)}, - }; - if (hiveSplit->tableBucketNumber) { - infoColumns["$bucket"] = std::to_string(*hiveSplit->tableBucketNumber); - } - auto veloxSplit = - std::make_unique( - catalogId, - hiveSplit->fileSplit.path, - toVeloxFileFormat(hiveSplit->storage.storageFormat), - hiveSplit->fileSplit.start, - hiveSplit->fileSplit.length, - partitionKeys, - hiveSplit->tableBucketNumber - ? 
std::optional(*hiveSplit->tableBucketNumber) - : std::nullopt, - customSplitInfo, - extraFileInfo, - serdeParameters, - hiveSplit->splitWeight, - infoColumns); - if (hiveSplit->bucketConversion) { - VELOX_CHECK_NOT_NULL(hiveSplit->tableBucketNumber); - veloxSplit->bucketConversion = - toVeloxBucketConversion(*hiveSplit->bucketConversion); - } - return veloxSplit; -} - -std::unique_ptr -HivePrestoToVeloxConnector::toVeloxColumnHandle( - const protocol::ColumnHandle* column, - const TypeParser& typeParser) const { - return toVeloxHiveColumnHandle(column, typeParser); -} - -std::unique_ptr -HivePrestoToVeloxConnector::toVeloxTableHandle( - const protocol::TableHandle& tableHandle, - const VeloxExprConverter& exprConverter, - const TypeParser& typeParser, - std::unordered_map< - std::string, - std::shared_ptr>& assignments) const { - auto addSynthesizedColumn = [&](const std::string& name, - protocol::hive::ColumnType columnType, - const protocol::ColumnHandle& column) { - if (toHiveColumnType(columnType) == - velox::connector::hive::HiveColumnHandle::ColumnType::kSynthesized) { - if (assignments.count(name) == 0) { - assignments.emplace(name, toVeloxColumnHandle(&column, typeParser)); - } - } - }; - auto hiveLayout = - std::dynamic_pointer_cast( - tableHandle.connectorTableLayout); - VELOX_CHECK_NOT_NULL( - hiveLayout, - "Unexpected layout type {}", - tableHandle.connectorTableLayout->_type); - for (const auto& entry : hiveLayout->partitionColumns) { - assignments.emplace(entry.name, toVeloxColumnHandle(&entry, typeParser)); - } - - // Add synthesized columns to the TableScanNode columnHandles as well. 
- for (const auto& entry : hiveLayout->predicateColumns) { - addSynthesizedColumn(entry.first, entry.second.columnType, entry.second); - } - - auto hiveTableHandle = - std::dynamic_pointer_cast( - tableHandle.connectorHandle); - VELOX_CHECK_NOT_NULL( - hiveTableHandle, - "Unexpected table handle type {}", - tableHandle.connectorHandle->_type); - - // Use fully qualified name if available. - std::string tableName = hiveTableHandle->schemaName.empty() - ? hiveTableHandle->tableName - : fmt::format( - "{}.{}", hiveTableHandle->schemaName, hiveTableHandle->tableName); - - return toHiveTableHandle( - hiveLayout->domainPredicate, - hiveLayout->remainingPredicate, - hiveLayout->pushdownFilterEnabled, - tableName, - hiveLayout->dataColumns, - tableHandle, - hiveLayout->tableParameters, - exprConverter, - typeParser); -} - -std::unique_ptr -HivePrestoToVeloxConnector::toVeloxInsertTableHandle( - const protocol::CreateHandle* createHandle, - const TypeParser& typeParser) const { - auto hiveOutputTableHandle = - std::dynamic_pointer_cast( - createHandle->handle.connectorHandle); - VELOX_CHECK_NOT_NULL( - hiveOutputTableHandle, - "Unexpected output table handle type {}", - createHandle->handle.connectorHandle->_type); - bool isPartitioned{false}; - const auto inputColumns = toHiveColumns( - hiveOutputTableHandle->inputColumns, typeParser, isPartitioned); - return std::make_unique( - inputColumns, - toLocationHandle(hiveOutputTableHandle->locationHandle), - toFileFormat(hiveOutputTableHandle->tableStorageFormat, "TableWrite"), - toHiveBucketProperty( - inputColumns, hiveOutputTableHandle->bucketProperty, typeParser), - std::optional( - toFileCompressionKind(hiveOutputTableHandle->compressionCodec))); -} - -std::unique_ptr -HivePrestoToVeloxConnector::toVeloxInsertTableHandle( - const protocol::InsertHandle* insertHandle, - const TypeParser& typeParser) const { - auto hiveInsertTableHandle = - std::dynamic_pointer_cast( - insertHandle->handle.connectorHandle); - 
VELOX_CHECK_NOT_NULL( - hiveInsertTableHandle, - "Unexpected insert table handle type {}", - insertHandle->handle.connectorHandle->_type); - bool isPartitioned{false}; - const auto inputColumns = toHiveColumns( - hiveInsertTableHandle->inputColumns, typeParser, isPartitioned); - - const auto table = hiveInsertTableHandle->pageSinkMetadata.table; - VELOX_USER_CHECK_NOT_NULL(table, "Table must not be null for insert query"); - return std::make_unique( - inputColumns, - toLocationHandle(hiveInsertTableHandle->locationHandle), - toFileFormat(hiveInsertTableHandle->tableStorageFormat, "TableWrite"), - toHiveBucketProperty( - inputColumns, hiveInsertTableHandle->bucketProperty, typeParser), - std::optional( - toFileCompressionKind(hiveInsertTableHandle->compressionCodec)), - std::unordered_map( - table->storage.serdeParameters.begin(), - table->storage.serdeParameters.end())); -} - -std::vector> -HivePrestoToVeloxConnector::toHiveColumns( - const protocol::List& inputColumns, - const TypeParser& typeParser, - bool& hasPartitionColumn) const { - hasPartitionColumn = false; - std::vector> - hiveColumns; - hiveColumns.reserve(inputColumns.size()); - for (const auto& columnHandle : inputColumns) { - hasPartitionColumn |= - columnHandle.columnType == protocol::hive::ColumnType::PARTITION_KEY; - hiveColumns.emplace_back( - std::dynamic_pointer_cast( - std::shared_ptr(toVeloxColumnHandle(&columnHandle, typeParser)))); - } - return hiveColumns; -} - -std::unique_ptr -HivePrestoToVeloxConnector::createVeloxPartitionFunctionSpec( - const protocol::ConnectorPartitioningHandle* partitioningHandle, - const std::vector& bucketToPartition, - const std::vector& channels, - const std::vector& constValues, - bool& effectivelyGather) const { - auto hivePartitioningHandle = - dynamic_cast( - partitioningHandle); - VELOX_CHECK_NOT_NULL( - hivePartitioningHandle, - "Unexpected partitioning handle type {}", - partitioningHandle->_type); - VELOX_USER_CHECK( - 
hivePartitioningHandle->bucketFunctionType == - protocol::hive::BucketFunctionType::HIVE_COMPATIBLE, - "Unsupported Hive bucket function type: {}", - toJsonString(hivePartitioningHandle->bucketFunctionType)); - effectivelyGather = hivePartitioningHandle->bucketCount == 1; - return std::make_unique( - hivePartitioningHandle->bucketCount, - bucketToPartition, - channels, - constValues); -} - -std::unique_ptr -HivePrestoToVeloxConnector::createConnectorProtocol() const { - return std::make_unique(); -} - -std::unique_ptr -IcebergPrestoToVeloxConnector::toVeloxSplit( - const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* const connectorSplit) const { - auto icebergSplit = - dynamic_cast(connectorSplit); - VELOX_CHECK_NOT_NULL( - icebergSplit, "Unexpected split type {}", connectorSplit->_type); - - std::unordered_map> partitionKeys; - for (const auto& entry : icebergSplit->partitionKeys) { - partitionKeys.emplace( - entry.second.name, - entry.second.value == nullptr - ? std::nullopt - : std::optional{*entry.second.value}); - } - - std::unordered_map customSplitInfo; - customSplitInfo["table_format"] = "hive-iceberg"; - - std::vector deletes; - deletes.reserve(icebergSplit->deletes.size()); - for (const auto& deleteFile : icebergSplit->deletes) { - std::unordered_map lowerBounds( - deleteFile.lowerBounds.begin(), deleteFile.lowerBounds.end()); - - std::unordered_map upperBounds( - deleteFile.upperBounds.begin(), deleteFile.upperBounds.end()); - - velox::connector::hive::iceberg::IcebergDeleteFile icebergDeleteFile( - toVeloxFileContent(deleteFile.content), - deleteFile.path, - toVeloxFileFormat(deleteFile.format), - deleteFile.recordCount, - deleteFile.fileSizeInBytes, - std::vector(deleteFile.equalityFieldIds), - lowerBounds, - upperBounds); - - deletes.emplace_back(icebergDeleteFile); - } - - std::unordered_map infoColumns = { - {"$data_sequence_number", - std::to_string(icebergSplit->dataSequenceNumber)}, - {"$path", icebergSplit->path}}; - - 
return std::make_unique( - catalogId, - icebergSplit->path, - toVeloxFileFormat(icebergSplit->fileFormat), - icebergSplit->start, - icebergSplit->length, - partitionKeys, - std::nullopt, - customSplitInfo, - nullptr, - deletes, - infoColumns); -} - -std::unique_ptr -IcebergPrestoToVeloxConnector::toVeloxColumnHandle( - const protocol::ColumnHandle* column, - const TypeParser& typeParser) const { - auto icebergColumn = - dynamic_cast(column); - VELOX_CHECK_NOT_NULL( - icebergColumn, "Unexpected column handle type {}", column->_type); - // TODO(imjalpreet): Modify 'hiveType' argument of the 'HiveColumnHandle' - // constructor similar to how Hive Connector is handling for bucketing - velox::type::fbhive::HiveTypeParser hiveTypeParser; - return std::make_unique( - icebergColumn->columnIdentity.name, - toHiveColumnType(icebergColumn->columnType), - stringToType(icebergColumn->type, typeParser), - stringToType(icebergColumn->type, typeParser), - toRequiredSubfields(icebergColumn->requiredSubfields)); -} - -std::unique_ptr -IcebergPrestoToVeloxConnector::toVeloxTableHandle( - const protocol::TableHandle& tableHandle, - const VeloxExprConverter& exprConverter, - const TypeParser& typeParser, - std::unordered_map< - std::string, - std::shared_ptr>& assignments) const { - auto addSynthesizedColumn = [&](const std::string& name, - protocol::hive::ColumnType columnType, - const protocol::ColumnHandle& column) { - if (toHiveColumnType(columnType) == - velox::connector::hive::HiveColumnHandle::ColumnType::kSynthesized) { - if (assignments.count(name) == 0) { - assignments.emplace(name, toVeloxColumnHandle(&column, typeParser)); - } - } - }; - - auto icebergLayout = std::dynamic_pointer_cast< - const protocol::iceberg::IcebergTableLayoutHandle>( - tableHandle.connectorTableLayout); - VELOX_CHECK_NOT_NULL( - icebergLayout, - "Unexpected layout type {}", - tableHandle.connectorTableLayout->_type); - - for (const auto& entry : icebergLayout->partitionColumns) { - 
assignments.emplace( - entry.columnIdentity.name, toVeloxColumnHandle(&entry, typeParser)); - } - - // Add synthesized columns to the TableScanNode columnHandles as well. - for (const auto& entry : icebergLayout->predicateColumns) { - addSynthesizedColumn(entry.first, entry.second.columnType, entry.second); - } - - auto icebergTableHandle = - std::dynamic_pointer_cast( - tableHandle.connectorHandle); - VELOX_CHECK_NOT_NULL( - icebergTableHandle, - "Unexpected table handle type {}", - tableHandle.connectorHandle->_type); - - // Use fully qualified name if available. - std::string tableName = icebergTableHandle->schemaName.empty() - ? icebergTableHandle->icebergTableName.tableName - : fmt::format( - "{}.{}", - icebergTableHandle->schemaName, - icebergTableHandle->icebergTableName.tableName); - - return toHiveTableHandle( - icebergLayout->domainPredicate, - icebergLayout->remainingPredicate, - icebergLayout->pushdownFilterEnabled, - tableName, - icebergLayout->dataColumns, - tableHandle, - {}, - exprConverter, - typeParser); -} - -std::unique_ptr -IcebergPrestoToVeloxConnector::createConnectorProtocol() const { - return std::make_unique(); -} - -std::unique_ptr -TpchPrestoToVeloxConnector::toVeloxSplit( - const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* const connectorSplit) const { - auto tpchSplit = - dynamic_cast(connectorSplit); - VELOX_CHECK_NOT_NULL( - tpchSplit, "Unexpected split type {}", connectorSplit->_type); - return std::make_unique( - catalogId, tpchSplit->totalParts, tpchSplit->partNumber); -} - -std::unique_ptr -TpchPrestoToVeloxConnector::toVeloxColumnHandle( - const protocol::ColumnHandle* column, - const TypeParser& typeParser) const { - auto tpchColumn = - dynamic_cast(column); - VELOX_CHECK_NOT_NULL( - tpchColumn, "Unexpected column handle type {}", column->_type); - return std::make_unique( - tpchColumn->columnName); -} - -std::unique_ptr -TpchPrestoToVeloxConnector::toVeloxTableHandle( - const protocol::TableHandle& 
tableHandle, - const VeloxExprConverter& exprConverter, - const TypeParser& typeParser, - std::unordered_map< - std::string, - std::shared_ptr>& assignments) const { - auto tpchLayout = - std::dynamic_pointer_cast( - tableHandle.connectorTableLayout); - VELOX_CHECK_NOT_NULL( - tpchLayout, - "Unexpected layout type {}", - tableHandle.connectorTableLayout->_type); - return std::make_unique( - tableHandle.connectorId, - tpch::fromTableName(tpchLayout->table.tableName), - tpchLayout->table.scaleFactor); -} - -std::unique_ptr -TpchPrestoToVeloxConnector::createConnectorProtocol() const { - return std::make_unique(); -} } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.h index 6d80751778e6a..655b5cac3c6c1 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.h +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.h @@ -15,8 +15,8 @@ #include "PrestoToVeloxExpr.h" #include "presto_cpp/main/types/TypeParser.h" -#include "presto_cpp/presto_protocol/connector/hive/presto_protocol_hive.h" #include "presto_cpp/presto_protocol/core/ConnectorProtocol.h" +#include "presto_cpp/presto_protocol/core/presto_protocol_core.h" #include "velox/connectors/Connector.h" #include "velox/connectors/hive/TableHandle.h" #include "velox/core/PlanNode.h" @@ -107,107 +107,4 @@ class PrestoToVeloxConnector { : connectorName_(std::move(connectorName)) {} const std::string connectorName_; }; - -class HivePrestoToVeloxConnector final : public PrestoToVeloxConnector { - public: - explicit HivePrestoToVeloxConnector(std::string connectorName) - : PrestoToVeloxConnector(std::move(connectorName)) {} - - std::unique_ptr toVeloxSplit( - const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* connectorSplit) const final; - - std::unique_ptr toVeloxColumnHandle( - const protocol::ColumnHandle* column, - const 
TypeParser& typeParser) const final; - - std::unique_ptr toVeloxTableHandle( - const protocol::TableHandle& tableHandle, - const VeloxExprConverter& exprConverter, - const TypeParser& typeParser, - std::unordered_map< - std::string, - std::shared_ptr>& assignments) - const final; - - std::unique_ptr - toVeloxInsertTableHandle( - const protocol::CreateHandle* createHandle, - const TypeParser& typeParser) const final; - - std::unique_ptr - toVeloxInsertTableHandle( - const protocol::InsertHandle* insertHandle, - const TypeParser& typeParser) const final; - - std::unique_ptr - createVeloxPartitionFunctionSpec( - const protocol::ConnectorPartitioningHandle* partitioningHandle, - const std::vector& bucketToPartition, - const std::vector& channels, - const std::vector& constValues, - bool& effectivelyGather) const final; - - std::unique_ptr createConnectorProtocol() - const final; - - private: - std::vector> - toHiveColumns( - const protocol::List& inputColumns, - const TypeParser& typeParser, - bool& hasPartitionColumn) const; -}; - -class IcebergPrestoToVeloxConnector final : public PrestoToVeloxConnector { - public: - explicit IcebergPrestoToVeloxConnector(std::string connectorName) - : PrestoToVeloxConnector(std::move(connectorName)) {} - - std::unique_ptr toVeloxSplit( - const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* connectorSplit) const final; - - std::unique_ptr toVeloxColumnHandle( - const protocol::ColumnHandle* column, - const TypeParser& typeParser) const final; - - std::unique_ptr toVeloxTableHandle( - const protocol::TableHandle& tableHandle, - const VeloxExprConverter& exprConverter, - const TypeParser& typeParser, - std::unordered_map< - std::string, - std::shared_ptr>& assignments) - const final; - - std::unique_ptr createConnectorProtocol() - const final; -}; - -class TpchPrestoToVeloxConnector final : public PrestoToVeloxConnector { - public: - explicit TpchPrestoToVeloxConnector(std::string connectorName) - : 
PrestoToVeloxConnector(std::move(connectorName)) {} - - std::unique_ptr toVeloxSplit( - const protocol::ConnectorId& catalogId, - const protocol::ConnectorSplit* connectorSplit) const final; - - std::unique_ptr toVeloxColumnHandle( - const protocol::ColumnHandle* column, - const TypeParser& typeParser) const final; - - std::unique_ptr toVeloxTableHandle( - const protocol::TableHandle& tableHandle, - const VeloxExprConverter& exprConverter, - const TypeParser& typeParser, - std::unordered_map< - std::string, - std::shared_ptr>& assignments) - const final; - - std::unique_ptr createConnectorProtocol() - const final; -}; } // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/TpchPrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/types/TpchPrestoToVeloxConnector.cpp new file mode 100644 index 0000000000000..5a86e99c2d6e1 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/types/TpchPrestoToVeloxConnector.cpp @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "presto_cpp/main/types/TpchPrestoToVeloxConnector.h" +#include "presto_cpp/presto_protocol/connector/tpch/TpchConnectorProtocol.h" +#include "velox/velox/connectors/tpch/TpchConnector.h" +#include "velox/velox/connectors/tpch/TpchConnectorSplit.h" + +namespace facebook::presto { + +using namespace velox; + +std::unique_ptr +TpchPrestoToVeloxConnector::toVeloxSplit( + const protocol::ConnectorId& catalogId, + const protocol::ConnectorSplit* const connectorSplit) const { + auto tpchSplit = + dynamic_cast(connectorSplit); + VELOX_CHECK_NOT_NULL( + tpchSplit, "Unexpected split type {}", connectorSplit->_type); + return std::make_unique( + catalogId, tpchSplit->totalParts, tpchSplit->partNumber); +} + +std::unique_ptr +TpchPrestoToVeloxConnector::toVeloxColumnHandle( + const protocol::ColumnHandle* column, + const TypeParser& typeParser) const { + auto tpchColumn = + dynamic_cast(column); + VELOX_CHECK_NOT_NULL( + tpchColumn, "Unexpected column handle type {}", column->_type); + return std::make_unique( + tpchColumn->columnName); +} + +std::unique_ptr +TpchPrestoToVeloxConnector::toVeloxTableHandle( + const protocol::TableHandle& tableHandle, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser, + std::unordered_map< + std::string, + std::shared_ptr>& assignments) const { + auto tpchLayout = + std::dynamic_pointer_cast( + tableHandle.connectorTableLayout); + VELOX_CHECK_NOT_NULL( + tpchLayout, + "Unexpected layout type {}", + tableHandle.connectorTableLayout->_type); + return std::make_unique( + tableHandle.connectorId, + tpch::fromTableName(tpchLayout->table.tableName), + tpchLayout->table.scaleFactor); +} + +std::unique_ptr +TpchPrestoToVeloxConnector::createConnectorProtocol() const { + return std::make_unique(); +} + +} // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/TpchPrestoToVeloxConnector.h b/presto-native-execution/presto_cpp/main/types/TpchPrestoToVeloxConnector.h new file mode 
100644 index 0000000000000..ba61aaf984a7b --- /dev/null +++ b/presto-native-execution/presto_cpp/main/types/TpchPrestoToVeloxConnector.h @@ -0,0 +1,42 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include "presto_cpp/main/types/PrestoToVeloxConnector.h" +namespace facebook::presto { +class TpchPrestoToVeloxConnector final : public PrestoToVeloxConnector { + public: + explicit TpchPrestoToVeloxConnector(std::string connectorName) + : PrestoToVeloxConnector(std::move(connectorName)) {} + + std::unique_ptr toVeloxSplit( + const protocol::ConnectorId& catalogId, + const protocol::ConnectorSplit* connectorSplit) const final; + + std::unique_ptr toVeloxColumnHandle( + const protocol::ColumnHandle* column, + const TypeParser& typeParser) const final; + + std::unique_ptr toVeloxTableHandle( + const protocol::TableHandle& tableHandle, + const VeloxExprConverter& exprConverter, + const TypeParser& typeParser, + std::unordered_map< + std::string, + std::shared_ptr>& assignments) + const final; + + std::unique_ptr createConnectorProtocol() + const final; +}; +} // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp b/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp index f6ec94bd1dd1b..2735544fac8cf 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp +++ 
b/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp @@ -18,7 +18,7 @@ #include "presto_cpp/main/operators/PartitionAndSerialize.h" #include "presto_cpp/main/operators/ShuffleRead.h" #include "presto_cpp/main/operators/ShuffleWrite.h" -#include "presto_cpp/main/types/PrestoToVeloxConnector.h" +#include "presto_cpp/main/types/HivePrestoToVeloxConnector.h" #include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h" #include "presto_cpp/main/types/tests/TestUtils.h" #include "velox/connectors/hive/TableHandle.h" diff --git a/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp b/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp index 932f48a611f73..1f2418a2650fd 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp +++ b/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxConnectorTest.cpp @@ -11,8 +11,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -#include "presto_cpp/main/types/PrestoToVeloxConnector.h" #include +#include "presto_cpp/main/types/HivePrestoToVeloxConnector.h" +#include "presto_cpp/main/types/IcebergPrestoToVeloxConnector.h" #include "velox/common/base/tests/GTestUtils.h" using namespace facebook::presto; diff --git a/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxSplitTest.cpp b/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxSplitTest.cpp index 5522684b262d2..4d5cee39e9a6d 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxSplitTest.cpp +++ b/presto-native-execution/presto_cpp/main/types/tests/PrestoToVeloxSplitTest.cpp @@ -13,7 +13,7 @@ */ #include "presto_cpp/main/types/PrestoToVeloxSplit.h" #include -#include "presto_cpp/main/types/PrestoToVeloxConnector.h" +#include "presto_cpp/main/types/HivePrestoToVeloxConnector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" using namespace facebook::velox;