Skip to content

Commit

Permalink
[native] Convert Presto's groupedExecutionScanNodes to Velox.
Browse files Browse the repository at this point in the history
  • Loading branch information
spershin committed Feb 18, 2023
1 parent 15a496c commit d4bba71
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
10 changes: 7 additions & 3 deletions presto-native-execution/presto_cpp/main/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ using facebook::presto::protocol::TaskInfo;

namespace facebook::presto {

// Unlimited concurrent lifespans is translated to this limit.
constexpr uint32_t kMaxConcurrentLifespans{16};

namespace {

// If spilling is enabled and the given Task can spill, then this helper
Expand Down Expand Up @@ -283,9 +286,10 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTask(
queryCtx->get<int32_t>(kMaxDriversPerTask.data(), maxDriversPerTask_);
concurrentLifespans = queryCtx->get<int32_t>(
kConcurrentLifespansPerTask.data(), concurrentLifespansPerTask_);
// Zero concurrent lifespans means 'unlimited'.
// Zero concurrent lifespans means 'unlimited', but we still limit the
// number to some reasonable one.
if (concurrentLifespans == 0) {
concurrentLifespans = std::numeric_limits<uint32_t>::max();
concurrentLifespans = kMaxConcurrentLifespans;
}

execTask = std::make_shared<exec::Task>(
Expand Down Expand Up @@ -318,7 +322,7 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTask(
if (execTask->isGroupedExecution()) {
LOG(INFO) << "Starting task " << taskId << " with " << maxDrivers
<< " max drivers and " << concurrentLifespans
<< " concurrent lifespans.";
<< " concurrent lifespans (grouped execution).";
} else {
LOG(INFO) << "Starting task " << taskId << " with " << maxDrivers
<< " max drivers.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,7 @@ core::PlanNodePtr VeloxQueryPlanConverterBase::toVeloxQueryPlan(
const bool constantValue =
joinType.value() == core::JoinType::kLeftSemiFilter;
projections.emplace_back(
std::make_shared<core::ConstantTypedExpr>(constantValue));
std::make_shared<core::ConstantTypedExpr>(BOOLEAN(), constantValue));

return std::make_shared<core::ProjectNode>(
node->id,
Expand Down Expand Up @@ -1985,6 +1985,9 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan(
planFragment.executionStrategy =
toStrategy(fragment.stageExecutionDescriptor.stageExecutionStrategy);
planFragment.numSplitGroups = descriptor.totalLifespans;
for (const auto& planNodeId : descriptor.groupedExecutionScanNodes) {
planFragment.groupedExecutionLeafNodeIds.emplace(planNodeId);
}

if (auto output = std::dynamic_pointer_cast<const protocol::OutputNode>(
fragment.root)) {
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 141 files

0 comments on commit d4bba71

Please sign in to comment.