forked from AliceO2Group/QualityControl
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtestWorkflow.cxx
99 lines (82 loc) · 3.34 KB
/
testWorkflow.cxx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
///
/// \file testWorkflow.cxx
/// \author Piotr Konopka
///
#include <DataSampling/DataSampling.h>
#include "QualityControl/InfrastructureGenerator.h"
using namespace o2;
using namespace o2::framework;
using namespace o2::utilities;
void customize(std::vector<CompletionPolicy>& policies)
{
DataSampling::CustomizeInfrastructure(policies);
quality_control::customizeInfrastructure(policies);
}
#include "getTestDataDirectory.h"
#include "QualityControl/CheckRunner.h"
#include "QualityControl/runnerUtils.h"
#include <Framework/runDataProcessing.h>
#include <Framework/ControlService.h>
#include <Configuration/ConfigurationFactory.h>
#include <Configuration/ConfigurationInterface.h>
#include <TH1F.h>
using namespace o2::quality_control::core;
using namespace o2::quality_control::checker;
using namespace o2::configuration;
/// \brief Assembles the test workflow.
///
/// The workflow consists of:
///  - a "producer" publishing an int message on the TST/DATA output,
///  - the Data Sampling infrastructure generated from the "dataSamplingPolicies"
///    subtree of testWorkflow.json,
///  - the standalone QC infrastructure generated from the same configuration file,
///  - a "receiver" which reads one checked QualityObject and asks the topology to quit.
///
/// The ConfigContext argument is not used.
///
/// \return The data processor specs making up the workflow.
WorkflowSpec defineDataProcessing(ConfigContext const&)
{
  WorkflowSpec specs;

  // The producer to generate some data in the workflow
  DataProcessorSpec producer{
    "producer",
    Inputs{},
    Outputs{
      { { "tst-data" }, "TST", "DATA" } },
    AlgorithmSpec{
      [](ProcessingContext& pctx) {
        // Pause for 100 ms (100000 us) on every invocation before creating the message.
        usleep(100000);
        pctx.outputs().make<int>(OutputRef{ "tst-data" }, 1);
      } }
  };
  specs.push_back(producer);

  // The QC configuration is read from a JSON file located in the test data directory.
  const std::string qcConfigurationSource = std::string("json://") + getTestDataDirectory() + "testWorkflow.json";
  ILOG(Info) << "Using config file '" << qcConfigurationSource << "'" << ENDM;

  // Generation of Data Sampling infrastructure
  auto configInterface = ConfigurationFactory::getConfiguration(qcConfigurationSource);
  auto dataSamplingTree = configInterface->getRecursive("dataSamplingPolicies");
  DataSampling::GenerateInfrastructure(specs, dataSamplingTree);

  // Generation of the QC topology (one task, one checker in this case)
  quality_control::generateStandaloneInfrastructure(specs, configInterface->getRecursive());

  // Finally the receiver
  DataProcessorSpec receiver{
    "receiver",
    Inputs{
      // The data description is built from the name returned by getFirstCheckName() for this config.
      { "checked-mo", "CTST", Check::createCheckDataDescription(getFirstCheckName(qcConfigurationSource)), 0, Lifetime::Sporadic } },
    Outputs{},
    AlgorithmSpec{
      [](ProcessingContext& pctx) {
        // If any message reaches this point, the QC workflow should work at least on a basic level.
        auto qo = pctx.inputs().get<QualityObject*>("checked-mo");
        if (!qo) {
          ILOG(Error, Devel) << "Quality Object is a NULL" << ENDM;
          pctx.services().get<ControlService>().readyToQuit(QuitRequest::All);
          return;
        }

        // The message must be terminated with ENDM, otherwise it is left pending in the
        // logger stream instead of being emitted.
        ILOG(Info) << qo->getName() << " - quality: " << qo->getQuality() << ENDM;

        // We ask to shut the topology down, returning 0 if there were no ERROR logs.
        pctx.services().get<ControlService>().readyToQuit(QuitRequest::All);
      } }
  };
  specs.push_back(receiver);

  return specs;
}