Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QC-1075] Allow for multiple data sources in QC tasks #2098

Merged
merged 3 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Framework/include/QualityControl/TaskSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ struct TaskSpec {

// minimal valid spec
TaskSpec(std::string taskName, std::string className, std::string moduleName, std::string detectorName,
int cycleDurationSeconds, DataSourceSpec dataSource)
int cycleDurationSeconds, std::vector<DataSourceSpec> dataSources)
: taskName(std::move(taskName)),
className(std::move(className)),
moduleName(std::move(moduleName)),
detectorName(std::move(detectorName)),
cycleDurationSeconds(cycleDurationSeconds),
dataSource(std::move(dataSource))
dataSources(std::move(dataSources))
{
}

Expand All @@ -56,7 +56,7 @@ struct TaskSpec {
std::string detectorName = "Invalid";
int cycleDurationSeconds = -1; // simple syntax
std::vector<std::pair<size_t, size_t>> multipleCycleDurations = {}; // complex syntax: multiple durations can be set for different intervals
DataSourceSpec dataSource;
std::vector<DataSourceSpec> dataSources;
// advanced
bool active = true;
bool critical = true;
Expand Down
26 changes: 16 additions & 10 deletions Framework/src/InfrastructureGenerator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,14 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(const boost::p
} else // TaskLocationSpec::Remote
{
// Collecting Data Sampling Policies
if (taskSpec.dataSource.isOneOf(DataSourceType::DataSamplingPolicy)) {
samplingPoliciesForRemoteTasks.insert({ taskSpec.dataSource.name, taskSpec.localControl, taskSpec.remoteMachine });
} else {
throw std::runtime_error(
"Configuration error: unsupported dataSource '" + taskSpec.dataSource.name + "' for a remote QC Task '" + taskSpec.taskName + "'");
for (const auto& dataSource : taskSpec.dataSources) {
if (dataSource.isOneOf(DataSourceType::DataSamplingPolicy)) {
samplingPoliciesForRemoteTasks.insert({ dataSource.name, taskSpec.localControl, taskSpec.remoteMachine });
} else {
throw std::runtime_error(
"Configuration error: unsupported dataSource '" + dataSource.name + "' for a remote QC Task '" +
taskSpec.taskName + "'");
}
}
}
}
Expand Down Expand Up @@ -286,11 +289,14 @@ o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructur
// (for the time being we don't foresee parallel tasks on QC servers, so no mergers here)

// Collecting Data Sampling Policies
if (taskSpec.dataSource.isOneOf(DataSourceType::DataSamplingPolicy)) {
samplingPoliciesForRemoteTasks.insert({ taskSpec.dataSource.name, taskSpec.localControl, taskSpec.remoteMachine });
} else {
throw std::runtime_error(
"Configuration error: unsupported dataSource '" + taskSpec.dataSource.name + "' for a remote QC Task '" + taskSpec.taskName + "'");
for (const auto& dataSource : taskSpec.dataSources) {
if (dataSource.isOneOf(DataSourceType::DataSamplingPolicy)) {
samplingPoliciesForRemoteTasks.insert({ dataSource.name, taskSpec.localControl, taskSpec.remoteMachine });
} else {
throw std::runtime_error(
"Configuration error: unsupported dataSource '" + dataSource.name + "' for a remote QC Task '" +
taskSpec.taskName + "'");
}
}

// Creating the remote task
Expand Down
10 changes: 9 additions & 1 deletion Framework/src/InfrastructureSpecReader.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,15 @@ TaskSpec InfrastructureSpecReader::readSpecEntry<TaskSpec>(const std::string& ta
ts.multipleCycleDurations.push_back(std::pair{ cycleDuration, validity });
}
}
ts.dataSource = readSpecEntry<DataSourceSpec>(taskID, taskTree.get_child("dataSource"), wholeTree);
if (taskTree.count("dataSources") > 0) {
for (const auto& [_key, dataSourceTree] : taskTree.get_child("dataSources")) {
(void)_key;
ts.dataSources.push_back(readSpecEntry<DataSourceSpec>(taskID, dataSourceTree, wholeTree));
}
} else {
ts.dataSources = { readSpecEntry<DataSourceSpec>(taskID, taskTree.get_child("dataSource"), wholeTree) };
}

ts.active = taskTree.get<bool>("active", ts.active);
ts.critical = taskTree.get<bool>("critical", ts.critical);
ts.maxNumberCycles = taskTree.get<int>("maxNumberCycles", ts.maxNumberCycles);
Expand Down
9 changes: 6 additions & 3 deletions Framework/src/TaskRunnerFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,12 @@ TaskRunnerConfig TaskRunnerFactory::extractConfig(const CommonSpec& globalConfig

int parallelTaskID = id.value_or(0);

if (!taskSpec.dataSource.isOneOf(DataSourceType::DataSamplingPolicy, DataSourceType::Direct)) {
throw std::runtime_error("This data source of the task '" + taskSpec.taskName + "' is not supported.");
std::vector<InputSpec> inputs;
for (const auto& ds : taskSpec.dataSources) {
if (!ds.isOneOf(DataSourceType::DataSamplingPolicy, DataSourceType::Direct)) {
throw std::runtime_error("This data source of the task '" + taskSpec.taskName + "' is not supported.");
}
inputs.insert(inputs.end(), ds.inputs.begin(), ds.inputs.end());
}

// cycle duration
Expand All @@ -82,7 +86,6 @@ TaskRunnerConfig TaskRunnerFactory::extractConfig(const CommonSpec& globalConfig
if (taskSpec.cycleDurationSeconds > 0) { // if it was actually the old style, then we convert it to the new style
multipleCycleDurations = { { taskSpec.cycleDurationSeconds, 1 } };
}
auto inputs = taskSpec.dataSource.inputs;
inputs.emplace_back(createTimerInputSpec(globalConfig, multipleCycleDurations, taskSpec.detectorName, taskSpec.taskName));

static std::unordered_map<std::string, o2::base::GRPGeomRequest::GeomRequest> const geomRequestFromString = {
Expand Down
12 changes: 7 additions & 5 deletions doc/Advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -1368,11 +1368,13 @@ the "tasks" path.
],
"maxNumberCycles": "-1", "": "Number of cycles to perform. Use -1 for infinite.",
"disableLastCycle": "true", "": "Last cycle, upon EndOfStream, is not published. (default: false)",
"dataSource": { "": "Data source of the QC Task.",
"type": "dataSamplingPolicy", "": "Type of the data source, \"dataSamplingPolicy\" or \"direct\".",
"name": "tst-raw", "": "Name of Data Sampling Policy. Only for \"dataSamplingPolicy\" source.",
"query" : "raw:TST/RAWDATA/0", "": "Query of the data source. Only for \"direct\" source."
},
"dataSources": [{ "": "Data sources of the QC Task. The following are supported",
"type": "dataSamplingPolicy", "": "Type of the data source",
"name": "tst-raw", "": "Name of Data Sampling Policy"
}, {
"type": "direct", "": "connects directly to another output",
"query": "raw:TST/RAWDATA/0", "": "input spec query, as expected by DataDescriptorQueryBuilder"
}],
"taskParameters": { "": "User Task parameters which are then accessible as a key-value map.",
"myOwnKey": "myOwnValue", "": "An example of a key and a value. Nested structures are not supported"
},
Expand Down
Loading