Skip to content

Commit b004e27

Browse files
authored
QC-1132 Correct validity in PostProcessingTasks across run restart (#2190)
This makes sure that the SOR and EOR timestamps are retrieved from ECS and passed to the user task with the UserOrControl trigger, but they are not used to establish the validity of the produced objects. Also, we do not publish objects if their validity is invalid, since they would not be stored anyway. We produce appropriate warnings in such cases. The root cause of the EOR timestamp being earlier than the SOR timestamp for the 2nd run was found to be ECS sending incorrect values. However, with this commit, these values are no longer used to determine the validity of objects anyway.
1 parent bd7057c commit b004e27

7 files changed

+46
-22
lines changed

Framework/include/QualityControl/PostProcessingDevice.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class PostProcessingDevice : public framework::Task
7070
/// \brief Callback for CallbackService::Id::Start (DPL) a.k.a. RUN transition (FairMQ)
7171
void start(framework::ServiceRegistryRef services);
7272
/// \brief Callback for CallbackService::Id::Stop (DPL) a.k.a. STOP transition (FairMQ)
73-
void stop() override;
73+
void stop(framework::ServiceRegistryRef services);
7474
/// \brief Callback for CallbackService::Id::Reset (DPL) a.k.a. RESET DEVICE transition (FairMQ)
7575
void reset();
7676

Framework/include/QualityControl/PostProcessingRunner.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class PostProcessingRunner
7474
/// \brief Start transition. Throws on errors.
7575
void start(framework::ServiceRegistryRef dplServices);
7676
/// \brief Stop transition. Throws on errors.
77-
void stop();
77+
void stop(framework::ServiceRegistryRef dplServices);
7878
/// \brief Reset transition. Throws on errors.
7979
void reset();
8080
/// \brief Runs the task over selected timestamps, performing the full start, run, stop cycle.

Framework/src/PostProcessingDevice.cxx

+3-3
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ void PostProcessingDevice::init(framework::InitContext& ctx)
5959
// registering state machine callbacks
6060
ctx.services().get<CallbackService>().set<CallbackService::Id::Start>([this, services = ctx.services()]() mutable { start(services); });
6161
ctx.services().get<CallbackService>().set<CallbackService::Id::Reset>([this]() { reset(); });
62-
ctx.services().get<CallbackService>().set<CallbackService::Id::Stop>([this]() { stop(); });
62+
ctx.services().get<CallbackService>().set<CallbackService::Id::Stop>([this, services = ctx.services()]() mutable { stop(services); });
6363
}
6464

6565
void PostProcessingDevice::run(framework::ProcessingContext& ctx)
@@ -113,9 +113,9 @@ void PostProcessingDevice::start(ServiceRegistryRef services)
113113
mRunner->start(services);
114114
}
115115

116-
void PostProcessingDevice::stop()
116+
void PostProcessingDevice::stop(ServiceRegistryRef services)
117117
{
118-
mRunner->stop();
118+
mRunner->stop(services);
119119
}
120120

121121
void PostProcessingDevice::reset()

Framework/src/PostProcessingRunner.cxx

+34-11
Original file line numberDiff line numberDiff line change
@@ -168,20 +168,21 @@ void PostProcessingRunner::runOverTimestamps(const std::vector<uint64_t>& timest
168168

169169
void PostProcessingRunner::start(framework::ServiceRegistryRef dplServices)
170170
{
171+
Activity activityFromDriver = mTaskConfig.activity;
172+
activityFromDriver.mValidity.setMin(getCurrentTimestamp());
171173
if (dplServices.active<framework::RawDeviceService>()) {
172-
mTaskConfig.activity = computeActivity(dplServices, mTaskConfig.activity);
174+
activityFromDriver = computeActivity(dplServices, activityFromDriver);
173175
QcInfoLogger::setPartition(mTaskConfig.activity.mPartitionName);
174176
}
175-
mActivity = mTaskConfig.activity;
176-
mActivity.mValidity = gInvalidValidityInterval;
177-
QcInfoLogger::setRun(mTaskConfig.activity.mId);
178-
mObjectManager->setActivity(mActivity);
177+
mActivity = activityFromDriver;
178+
mActivity.mValidity = gInvalidValidityInterval; // object validity shall be based on input objects, not run duration
179+
QcInfoLogger::setRun(mActivity.mId);
179180

180181
// register ourselves to the BK
181182
if (gSystem->Getenv("O2_QC_REGISTER_IN_BK")) { // until we are sure it works, we have to turn it on
182183
ILOG(Debug, Devel) << "Registering pp task to BookKeeping" << ENDM;
183184
try {
184-
Bookkeeping::getInstance().registerProcess(mTaskConfig.activity.mId, mRunnerConfig.taskName, mRunnerConfig.detectorName, bookkeeping::DPL_PROCESS_TYPE_QC_POSTPROCESSING, "");
185+
Bookkeeping::getInstance().registerProcess(mActivity.mId, mRunnerConfig.taskName, mRunnerConfig.detectorName, bookkeeping::DPL_PROCESS_TYPE_QC_POSTPROCESSING, "");
185186
} catch (std::runtime_error& error) {
186187
ILOG(Warning, Devel) << "Failed registration to the BookKeeping: " << error.what() << ENDM;
187188
}
@@ -190,7 +191,7 @@ void PostProcessingRunner::start(framework::ServiceRegistryRef dplServices)
190191
if (mTaskState == TaskState::Created || mTaskState == TaskState::Finished) {
191192
mInitTriggers = trigger_helpers::createTriggers(mTaskConfig.initTriggers, mTaskConfig);
192193
if (trigger_helpers::hasUserOrControlTrigger(mTaskConfig.initTriggers)) {
193-
doInitialize({ TriggerType::UserOrControl, false, mTaskConfig.activity });
194+
doInitialize({ TriggerType::UserOrControl, false, activityFromDriver, activityFromDriver.mValidity.getMin() });
194195
}
195196
} else if (mTaskState == TaskState::Running) {
196197
ILOG(Debug, Devel) << "Requested start, but the user task is already running - doing nothing." << ENDM;
@@ -201,11 +202,17 @@ void PostProcessingRunner::start(framework::ServiceRegistryRef dplServices)
201202
}
202203
}
203204

204-
void PostProcessingRunner::stop()
205+
void PostProcessingRunner::stop(framework::ServiceRegistryRef dplServices)
205206
{
206207
if (mTaskState == TaskState::Created || mTaskState == TaskState::Running) {
207208
if (trigger_helpers::hasUserOrControlTrigger(mTaskConfig.stopTriggers)) {
208-
doFinalize({ TriggerType::UserOrControl, false, mTaskConfig.activity });
209+
// We try to get SOR and EOR times from ECS, which could be needed by the user code.
210+
auto activityFromDriver = mActivity;
211+
activityFromDriver.mValidity.setMax(getCurrentTimestamp());
212+
if (dplServices.active<framework::RawDeviceService>()) {
213+
activityFromDriver = computeActivity(dplServices, activityFromDriver);
214+
}
215+
doFinalize({ TriggerType::UserOrControl, false, activityFromDriver, activityFromDriver.mValidity.getMax() });
209216
}
210217
} else if (mTaskState == TaskState::Finished) {
211218
ILOG(Debug, Devel) << "Requested stop, but the user task is already finalized - doing nothing." << ENDM;
@@ -232,6 +239,13 @@ void PostProcessingRunner::reset()
232239

233240
void PostProcessingRunner::updateValidity(const Trigger& trigger)
234241
{
242+
if (trigger == TriggerType::UserOrControl) {
243+
// we ignore it, because it would not make sense to use current time in tracking objects from the past,
244+
// especially in asynchronous postprocessing
245+
ILOG(Debug, Trace) << "Ignoring UserOrControl trigger in tracking objects validity" << ENDM;
246+
mObjectManager->setValidity(mActivity.mValidity);
247+
return;
248+
}
235249
if (!trigger.activity.mValidity.isValid()) {
236250
ILOG(Warning, Devel) << "Not updating objects validity, because the provided trigger validity is invalid ("
237251
<< trigger.activity.mValidity.getMin() << ", " << trigger.activity.mValidity.getMax() << ")" << ENDM;
@@ -270,7 +284,11 @@ void PostProcessingRunner::doUpdate(const Trigger& trigger)
270284
mTask->update(trigger, mServices);
271285
updateValidity(trigger);
272286

273-
mPublicationCallback(mObjectManager->getNonOwningArray());
287+
if (mActivity.mValidity.isValid()) {
288+
mPublicationCallback(mObjectManager->getNonOwningArray());
289+
} else {
290+
ILOG(Warning, Support) << "Objects will not be published because their validity is invalid. This should not happen." << ENDM;
291+
}
274292
}
275293

276294
void PostProcessingRunner::doFinalize(const Trigger& trigger)
@@ -283,7 +301,12 @@ void PostProcessingRunner::doFinalize(const Trigger& trigger)
283301
mTask->finalize(trigger, mServices);
284302
updateValidity(trigger);
285303

286-
mPublicationCallback(mObjectManager->getNonOwningArray());
304+
if (mActivity.mValidity.isValid()) {
305+
mPublicationCallback(mObjectManager->getNonOwningArray());
306+
} else {
307+
// TODO: we could consider using SOR, EOR as validity in such case, so empty objects are still stored in the QCDB.
308+
ILOG(Warning, Devel) << "Objects will not be published because their validity is invalid. Most likely the task's update() method was never triggered." << ENDM;
309+
}
287310
mTaskState = TaskState::Finished;
288311
mObjectManager->stopPublishingAll();
289312
}

Framework/src/runPostProcessing.cxx

+2-2
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,12 @@ int main(int argc, const char* argv[])
7878

7979
PostProcessingRunner runner(taskID);
8080
runner.init(configTree, WorkflowType::Standalone);
81+
ServiceRegistry registry;
8182

8283
if (vm.count("timestamps")) {
8384
// running the PP task on a set of timestamps
8485
runner.runOverTimestamps(vm["timestamps"].as<std::vector<uint64_t>>());
8586
} else {
86-
ServiceRegistry registry;
8787
// running the PP task with an event loop
8888
runner.start({ registry });
8989

@@ -96,7 +96,7 @@ int main(int argc, const char* argv[])
9696
usleep(1000000.0 * timer.getRemainingTime());
9797
}
9898
}
99-
runner.stop();
99+
runner.stop({ registry });
100100
return 0;
101101
} catch (const bpo::error& ex) {
102102
ILOG(Error, Ops) << "Exception caught: " << ex.what() << ENDM;

Framework/src/runPostProcessingOCC.cxx

+2-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ class PostProcessingOCCStateMachine : public RuntimeControlledObject
118118

119119
bool success = true;
120120
try {
121-
mRunner->stop();
121+
o2::framework::ServiceRegistry registry;
122+
mRunner->stop(registry);
122123
} catch (const std::exception& ex) {
123124
ILOG(Error, Support) << "Exception caught: " << ex.what() << ENDM;
124125
success = false;

Framework/src/runnerUtils.cxx

+3-3
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ Activity computeActivity(framework::ServiceRegistryRef services, const Activity&
6565
{
6666
auto runNumber = computeActivityField<int>(services, "runNumber", fallbackActivity.mId);
6767
auto runType = computeActivityField<int>(services, "runType", fallbackActivity.mType);
68-
auto run_start_time_ms = computeActivityField<unsigned long>(services, "runStartTimeMs", fallbackActivity.mValidity.getMin());
69-
auto run_stop_time_ms = computeActivityField<unsigned long>(services, "runEndTimeMs", fallbackActivity.mValidity.getMax());
68+
auto runStartTimeMs = computeActivityField<unsigned long>(services, "runStartTimeMs", fallbackActivity.mValidity.getMin());
69+
auto runEndTimeMs = computeActivityField<unsigned long>(services, "runEndTimeMs", fallbackActivity.mValidity.getMax());
7070
auto partitionName = services.get<framework::RawDeviceService>().device()->fConfig->GetProperty<std::string>("environment_id", fallbackActivity.mPartitionName);
7171
auto periodName = services.get<framework::RawDeviceService>().device()->fConfig->GetProperty<std::string>("lhcPeriod", "");
7272
if (periodName.empty()) {
@@ -82,7 +82,7 @@ Activity computeActivity(framework::ServiceRegistryRef services, const Activity&
8282
periodName,
8383
fallbackActivity.mPassName,
8484
fallbackActivity.mProvenance,
85-
{ run_start_time_ms, run_stop_time_ms },
85+
{ runStartTimeMs, runEndTimeMs },
8686
beam_type,
8787
partitionName,
8888
fillNumber);

0 commit comments

Comments
 (0)