Skip to content

Commit 3837f97

Browse files
authored
refactor: Advance velox and update task creation to use disk … (#26232)
``` == NO RELEASE NOTE == ```
1 parent 5e22b9a commit 3837f97

File tree

2 files changed

+44
-44
lines changed

2 files changed

+44
-44
lines changed

presto-native-execution/presto_cpp/main/TaskManager.cpp

Lines changed: 43 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -55,53 +55,53 @@ void cancelAbandonedTasksInternal(const TaskMap& taskMap, int32_t abandonedMs) {
5555
}
5656
}
5757

58-
// If spilling is enabled and the given Task can spill, then this helper
59-
// generates the spilling directory path for the Task, and sets the path to it
60-
// in the Task.
61-
static void maybeSetupTaskSpillDirectory(
58+
// If spilling is enabled and the task plan fragment can spill, then this helper
59+
// generates the disk spilling options for the task.
60+
std::optional<common::SpillDiskOptions> getTaskSpillOptions(
61+
const TaskId& taskId,
6262
const core::PlanFragment& planFragment,
63-
exec::Task& execTask,
64-
const std::string& baseSpillDirectory) {
65-
if (baseSpillDirectory.empty() ||
66-
!planFragment.canSpill(execTask.queryCtx()->queryConfig())) {
67-
return;
63+
const std::shared_ptr<core::QueryCtx>& queryCtx,
64+
const std::string& baseSpillDir) {
65+
if (baseSpillDir.empty() || !planFragment.canSpill(queryCtx->queryConfig())) {
66+
return std::nullopt;
6867
}
6968

70-
const auto includeNodeInSpillPath =
69+
common::SpillDiskOptions spillDiskOpts;
70+
const bool includeNodeInSpillPath =
7171
SystemConfig::instance()->includeNodeInSpillPath();
7272
auto nodeConfig = NodeConfig::instance();
7373
const auto [taskSpillDirPath, dateSpillDirPath] =
7474
TaskManager::buildTaskSpillDirectoryPath(
75-
baseSpillDirectory,
75+
baseSpillDir,
7676
nodeConfig->nodeInternalAddress(),
7777
nodeConfig->nodeId(),
78-
execTask.queryCtx()->queryId(),
79-
execTask.taskId(),
78+
queryCtx->queryId(),
79+
taskId,
8080
includeNodeInSpillPath);
81-
execTask.setSpillDirectory(taskSpillDirPath, /*alreadyCreated=*/false);
82-
83-
execTask.setCreateSpillDirectoryCb(
84-
[spillDir = taskSpillDirPath, dateStrDir = dateSpillDirPath]() {
85-
auto fs = filesystems::getFileSystem(dateStrDir, nullptr);
86-
// First create the top level directory (date string of the query) with
87-
// TTL or other configs if set.
88-
filesystems::DirectoryOptions options;
89-
// Do not fail if the directory already exist because another process
90-
// may have already created the dateStrDir.
91-
options.failIfExists = false;
92-
auto config = SystemConfig::instance()->spillerDirectoryCreateConfig();
93-
if (!config.empty()) {
94-
options.values.emplace(
95-
filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(),
96-
config);
97-
}
98-
fs->mkdir(dateStrDir, options);
99-
100-
// After the parent directory is created,
101-
// then create the spill directory for the actual task.
102-
fs->mkdir(spillDir);
103-
return spillDir;
104-
});
81+
spillDiskOpts.spillDirPath = taskSpillDirPath;
82+
spillDiskOpts.spillDirCreated = false;
83+
spillDiskOpts.spillDirCreateCb = [spillDir = taskSpillDirPath,
84+
dateDir = dateSpillDirPath]() {
85+
auto fs = filesystems::getFileSystem(dateDir, nullptr);
86+
// First create the top level directory (date string of the query) with
87+
// TTL or other configs if set.
88+
filesystems::DirectoryOptions options;
89+
// Do not fail if the directory already exist because another process
90+
// may have already created the dateStrDir.
91+
options.failIfExists = false;
92+
auto config = SystemConfig::instance()->spillerDirectoryCreateConfig();
93+
if (!config.empty()) {
94+
options.values.emplace(
95+
filesystems::DirectoryOptions::kMakeDirectoryConfig.toString(),
96+
config);
97+
}
98+
fs->mkdir(dateDir, options);
99+
// After the parent directory is created,
100+
// then create the spill directory for the actual task.
101+
fs->mkdir(spillDir);
102+
return spillDir;
103+
};
104+
return spillDiskOpts;
105105
}
106106

107107
// Keep outstanding Promises in RequestHandler's state itself.
@@ -554,6 +554,10 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTaskImpl(
554554
prestoTask->updateInfoLocked(summarize));
555555
}
556556

557+
const auto baseSpillDir = *(baseSpillDir_.rlock());
558+
auto spillDiskOpts =
559+
getTaskSpillOptions(taskId, planFragment, queryCtx, baseSpillDir);
560+
557561
// Uses a temp variable to store the created velox task to destroy it
558562
// under presto task lock if spill directory setup fails. Otherwise, the
559563
// concurrent task creation retry from the coordinator might see the
@@ -567,12 +571,8 @@ std::unique_ptr<TaskInfo> TaskManager::createOrUpdateTaskImpl(
567571
std::move(queryCtx),
568572
exec::Task::ExecutionMode::kParallel,
569573
static_cast<exec::Consumer>(nullptr),
570-
prestoTask->id.stageId());
571-
// TODO: move spill directory creation inside velox task execution
572-
// whenever spilling is triggered. It will reduce the unnecessary file
573-
// operations on remote storage.
574-
const auto baseSpillDir = *(baseSpillDir_.rlock());
575-
maybeSetupTaskSpillDirectory(planFragment, *newExecTask, baseSpillDir);
574+
prestoTask->id.stageId(),
575+
spillDiskOpts);
576576

577577
prestoTask->task = std::move(newExecTask);
578578
prestoTask->info.needsPlan = false;

0 commit comments

Comments
 (0)