Skip to content

Commit c8e3ab4

Browse files
authored
Merge branch 'dev' into cleanups-misc-more
2 parents af9cb29 + 0cf7ec2 commit c8e3ab4

File tree

10 files changed

+113
-120
lines changed

10 files changed

+113
-120
lines changed

Framework/AnalysisSupport/src/AODWriterHelpers.cxx

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,13 @@ const static std::unordered_map<OutputObjHandlingPolicy, std::string> ROOTfileNa
6262

6363
AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
6464
{
65-
auto& ac = ctx.services().get<DanglingEdgesContext>();
6665
auto dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
6766
int compressionLevel = 505;
6867
if (ctx.options().hasOption("aod-writer-compression")) {
6968
compressionLevel = ctx.options().get<int>("aod-writer-compression");
7069
}
71-
return AlgorithmSpec{[dod, outputInputs = ac.outputsInputsAOD, compressionLevel](InitContext& ic) -> std::function<void(ProcessingContext&)> {
70+
return AlgorithmSpec{[dod, compressionLevel](InitContext& ic) -> std::function<void(ProcessingContext&)> {
71+
auto outputInputs = ic.services().get<DanglingEdgesContext>().outputsInputsAOD;
7272
LOGP(debug, "======== getGlobalAODSink::Init ==========");
7373

7474
// find out if any table needs to be saved
@@ -241,14 +241,13 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
241241
};
242242
}
243243

244-
AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
244+
AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& /*ctx*/)
245245
{
246-
using namespace monitoring;
247-
auto& ac = ctx.services().get<DanglingEdgesContext>();
248-
auto tskmap = ac.outTskMap;
249-
auto objmap = ac.outObjHistMap;
250-
251-
return AlgorithmSpec{[objmap, tskmap](InitContext& ic) -> std::function<void(ProcessingContext&)> {
246+
return AlgorithmSpec{[](InitContext& ic) -> std::function<void(ProcessingContext&)> {
247+
using namespace monitoring;
248+
auto& dec = ic.services().get<DanglingEdgesContext>();
249+
auto tskmap = dec.outTskMap;
250+
auto objmap = dec.outObjHistMap;
252251
auto& callbacks = ic.services().get<CallbackService>();
253252
auto inputObjects = std::make_shared<std::vector<std::pair<InputObjectRoute, InputObject>>>();
254253

@@ -278,7 +277,7 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
278277

279278
callbacks.set<CallbackService::Id::EndOfStream>(endofdatacb);
280279
return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void {
281-
auto mergePart = [&inputObjects, &objmap, &tskmap, &pc](DataRef const& ref) {
280+
auto mergePart = [&inputObjects, &objmap, &tskmap](DataRef const& ref) {
282281
O2_SIGNPOST_ID_GENERATE(hid, histogram_registry);
283282
O2_SIGNPOST_START(histogram_registry, hid, "mergePart", "Merging histogram");
284283
if (!ref.header) {
@@ -474,7 +473,7 @@ AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const& ctx)
474473
};
475474
O2_SIGNPOST_ID_GENERATE(rid, histogram_registry);
476475
O2_SIGNPOST_START(histogram_registry, rid, "processParts", "Start merging %zu parts received together.", pc.inputs().getNofParts(0));
477-
for (int pi = 0; pi < pc.inputs().getNofParts(0); ++pi) {
476+
for (auto pi = 0U; pi < pc.inputs().getNofParts(0); ++pi) {
478477
mergePart(pc.inputs().get("x", pi));
479478
}
480479
O2_SIGNPOST_END(histogram_registry, rid, "processParts", "Done histograms in multipart message.");

Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -67,38 +67,38 @@ void fillValidRoutes(CCDBFetcherHelper& helper, std::vector<o2::framework::Outpu
6767
}
6868
} // namespace
6969

70-
AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx)
70+
AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
7171
{
72-
auto& ac = ctx.services().get<DanglingEdgesContext>();
73-
std::vector<std::shared_ptr<arrow::Schema>> schemas;
74-
auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
72+
return adaptStateful([](ConfigParamRegistry const& options, DeviceSpec const& spec, InitContext& ic) {
73+
auto& dec = ic.services().get<DanglingEdgesContext>();
74+
std::vector<std::shared_ptr<arrow::Schema>> schemas;
75+
auto schemaMetadata = std::make_shared<arrow::KeyValueMetadata>();
7576

76-
for (auto& input : ac.analysisCCDBInputs) {
77-
std::vector<std::shared_ptr<arrow::Field>> fields;
78-
schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input));
79-
schemaMetadata->Append("outputBinding", input.binding);
77+
for (auto& input : dec.analysisCCDBInputs) {
78+
std::vector<std::shared_ptr<arrow::Field>> fields;
79+
schemaMetadata->Append("outputRoute", DataSpecUtils::describe(input));
80+
schemaMetadata->Append("outputBinding", input.binding);
8081

81-
for (auto& m : input.metadata) {
82-
// Save the list of input tables
83-
if (m.name.starts_with("input:")) {
84-
auto name = m.name.substr(6);
85-
schemaMetadata->Append("sourceTable", name);
86-
schemaMetadata->Append("sourceMatcher", DataSpecUtils::describe(std::get<ConcreteDataMatcher>(DataSpecUtils::fromMetadataString(m.defaultValue.get<std::string>()).matcher)));
87-
continue;
88-
}
89-
// Ignore the non ccdb: entries
90-
if (!m.name.starts_with("ccdb:")) {
91-
continue;
82+
for (auto& m : input.metadata) {
83+
// Save the list of input tables
84+
if (m.name.starts_with("input:")) {
85+
auto name = m.name.substr(6);
86+
schemaMetadata->Append("sourceTable", name);
87+
continue;
88+
}
89+
// Ignore the non ccdb: entries
90+
if (!m.name.starts_with("ccdb:")) {
91+
continue;
92+
}
93+
// Create the schema of the output
94+
auto metadata = std::make_shared<arrow::KeyValueMetadata>();
95+
metadata->Append("url", m.defaultValue.asString());
96+
auto columnName = m.name.substr(strlen("ccdb:"));
97+
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, metadata));
9298
}
93-
// Create the schema of the output
94-
auto metadata = std::make_shared<arrow::KeyValueMetadata>();
95-
metadata->Append("url", m.defaultValue.asString());
96-
auto columnName = m.name.substr(strlen("ccdb:"));
97-
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, metadata));
99+
schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
98100
}
99-
schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
100-
}
101-
return adaptStateful([schemas](CallbackService& callbacks, ConfigParamRegistry const& options, DeviceSpec const& spec) {
101+
102102
std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
103103
CCDBFetcherHelper::initialiseHelper(*helper, options);
104104
std::unordered_map<std::string, int> bindings;
@@ -129,11 +129,11 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& ctx)
129129
int outputRouteIndex = bindings.at(outRouteDesc);
130130
auto& spec = helper->routes[outputRouteIndex].matcher;
131131
std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
132-
for (auto& _ : schema->fields()) {
132+
for (auto const& _ : schema->fields()) {
133133
builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
134134
}
135135

136-
for (size_t ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
136+
for (auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
137137
std::shared_ptr<arrow::Array> chunk = timestampColumn->chunk(ci);
138138
auto const* timestamps = chunk->data()->GetValuesSafe<size_t>(1);
139139

Framework/CCDBSupport/src/AnalysisCCDBHelpers.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ namespace o2::framework
1717
{
1818

1919
struct AnalysisCCDBHelpers {
20-
static AlgorithmSpec fetchFromCCDB(ConfigContext const& ctx);
20+
static AlgorithmSpec fetchFromCCDB(ConfigContext const&);
2121
};
2222

2323
} // namespace o2::framework

Framework/Core/include/Framework/AnalysisTask.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
527527
constexpr const int numElements = nested_brace_constructible_size<false, std::decay_t<T>>() / 10;
528528

529529
/// make sure options and configurables are set before expression infos are created
530-
homogeneous_apply_refs_sized<numElements>([&options, &hash](auto& element) { return analysis_task_parsers::appendOption(options, element); }, *task.get());
530+
homogeneous_apply_refs_sized<numElements>([&options](auto& element) { return analysis_task_parsers::appendOption(options, element); }, *task.get());
531531
/// extract conditions and append them as inputs
532532
homogeneous_apply_refs_sized<numElements>([&inputs](auto& element) { return analysis_task_parsers::appendCondition(inputs, element); }, *task.get());
533533

@@ -619,7 +619,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
619619
std::ranges::for_each(expressionInfos, [](auto& info) { info.resetSelection = true; });
620620
// reset pre-slice for the next dataframe
621621
auto slices = pc.services().get<ArrowTableSlicingCache>();
622-
homogeneous_apply_refs_sized<numElements>([&pc, &slices](auto& element) {
622+
homogeneous_apply_refs_sized<numElements>([&slices](auto& element) {
623623
return analysis_task_parsers::updateSliceInfo(element, slices);
624624
},
625625
*(task.get()));

Framework/Core/src/AnalysisSupportHelpers.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ std::shared_ptr<DataOutputDirector> AnalysisSupportHelpers::getDataOutputDirecto
9898
if (!keepString.empty()) {
9999
dod->reset();
100100
std::string d("dangling");
101-
if (d.find(keepString) == 0) {
101+
if (keepString.starts_with(d)) {
102102
// use the dangling outputs
103103
std::vector<InputSpec> danglingOutputs;
104104
for (auto ii = 0u; ii < OutputsInputs.size(); ii++) {
@@ -144,7 +144,7 @@ void AnalysisSupportHelpers::addMissingOutputsToSpawner(std::vector<OutputSpec>
144144
sinks::append_to{publisher.outputs}; // append them to the publisher outputs
145145

146146
std::vector<InputSpec> additionalInputs;
147-
for (auto& input : requestedSpecials | views::filter_not_matching(providedSpecials)) {
147+
for (auto const& input : requestedSpecials | views::filter_not_matching(providedSpecials)) {
148148
input.metadata |
149149
views::filter_string_params_with("input:") |
150150
views::params_to_input_specs() |

Framework/Core/src/ArrowSupport.cxx

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -685,33 +685,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
685685
}
686686
}
687687

688-
// replace writer as some outputs may have become dangling and some are now consumed
689-
auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow);
690-
691-
// create DataOutputDescriptor
692-
std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
693-
694-
// select outputs of type AOD which need to be saved
695-
// ATTENTION: if there are dangling outputs the getGlobalAODSink
696-
// has to be created in any case!
697-
dec.outputsInputsAOD.clear();
698-
699-
for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
700-
if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) {
701-
auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
702-
if (!ds.empty() || isDangling[ii]) {
703-
dec.outputsInputsAOD.emplace_back(outputsInputs[ii]);
704-
}
705-
}
706-
}
688+
WorkflowHelpers::injectAODWriter(workflow, ctx);
707689

708-
// file sink for any AOD output
709-
if (!dec.outputsInputsAOD.empty()) {
710-
// add TFNumber and TFFilename as input to the writer
711-
dec.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
712-
dec.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
713-
workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx));
714-
}
715690
// Move the dummy sink at the end, if needed
716691
for (size_t i = 0; i < workflow.size(); ++i) {
717692
if (workflow[i].name == "internal-dpl-injected-dummy-sink") {

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 39 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -420,10 +420,10 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
420420
// AODs are being injected on-the-fly, add error-handler reader
421421
aodReader.algorithm = AlgorithmSpec{
422422
adaptStateful(
423-
[outputs = aodReader.outputs](DeviceSpec const&) {
423+
[](DeviceSpec const& spec) {
424424
LOGP(warn, "Workflow with injected AODs has unsatisfied inputs:");
425-
for (auto const& output : outputs) {
426-
LOGP(warn, " {}", DataSpecUtils::describe(output));
425+
for (auto const& output : spec.outputs) {
426+
LOGP(warn, " {}", DataSpecUtils::describe(output.matcher));
427427
}
428428
LOGP(fatal, "Stopping.");
429429
// to ensure the output type for adaptStateful
@@ -531,43 +531,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
531531
workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
532532
extraSpecs.clear();
533533

534-
/// Analyze all ouputs
535-
auto [outputsInputsTmp, isDanglingTmp] = analyzeOutputs(workflow);
536-
dec.isDangling = isDanglingTmp;
537-
dec.outputsInputs = outputsInputsTmp;
538-
539-
// create DataOutputDescriptor
540-
std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
541-
542-
// select outputs of type AOD which need to be saved
543-
// ATTENTION: if there are dangling outputs the getGlobalAODSink
544-
// has to be created in any case!
545-
for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
546-
if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], extendedAODOrigins)) {
547-
auto ds = dod->getDataOutputDescriptors(dec.outputsInputs[ii]);
548-
if (ds.size() > 0 || dec.isDangling[ii]) {
549-
dec.outputsInputsAOD.emplace_back(dec.outputsInputs[ii]);
550-
}
551-
}
552-
}
553-
554-
// file sink for any AOD output
555-
if (dec.outputsInputsAOD.size() > 0) {
556-
// add TFNumber and TFFilename as input to the writer
557-
dec.outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"});
558-
dec.outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"});
559-
auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx);
560-
extraSpecs.push_back(fileSink);
561-
562-
auto it = std::ranges::find_if(dec.outputsInputs, [](InputSpec& spec) -> bool {
563-
return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
564-
});
565-
size_t ii = std::distance(dec.outputsInputs.begin(), it);
566-
dec.isDangling[ii] = false;
567-
}
568-
569-
workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end());
570-
extraSpecs.clear();
534+
injectAODWriter(workflow, ctx);
571535

572536
// Select dangling outputs which are not of type AOD
573537
std::vector<InputSpec> redirectedOutputsInputs;
@@ -704,6 +668,41 @@ void WorkflowHelpers::adjustTopology(WorkflowSpec& workflow, ConfigContext const
704668
}
705669
}
706670

671+
void WorkflowHelpers::injectAODWriter(WorkflowSpec& workflow, ConfigContext const& ctx)
672+
{
673+
auto& dec = ctx.services().get<DanglingEdgesContext>();
674+
/// Analyze all outputs
675+
std::tie(dec.outputsInputs, dec.isDangling) = analyzeOutputs(workflow);
676+
677+
// create DataOutputDescriptor
678+
std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
679+
680+
// select outputs of type AOD which need to be saved
681+
dec.outputsInputsAOD.clear();
682+
for (auto ii = 0u; ii < dec.outputsInputs.size(); ii++) {
683+
if (DataSpecUtils::partialMatch(dec.outputsInputs[ii], extendedAODOrigins)) {
684+
auto ds = dod->getDataOutputDescriptors(dec.outputsInputs[ii]);
685+
if (ds.size() > 0 || dec.isDangling[ii]) {
686+
dec.outputsInputsAOD.emplace_back(dec.outputsInputs[ii]);
687+
}
688+
}
689+
}
690+
691+
// file sink for any AOD output
692+
if (dec.outputsInputsAOD.size() > 0) {
693+
// add TFNumber and TFFilename as input to the writer
694+
DataSpecUtils::updateInputList(dec.outputsInputsAOD, InputSpec{"tfn", "TFN", "TFNumber"});
695+
DataSpecUtils::updateInputList(dec.outputsInputsAOD, InputSpec{"tff", "TFF", "TFFilename"});
696+
auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(ctx);
697+
workflow.push_back(fileSink);
698+
699+
auto it = std::find_if(dec.outputsInputs.begin(), dec.outputsInputs.end(), [](InputSpec const& spec) -> bool {
700+
return DataSpecUtils::partialMatch(spec, o2::header::DataOrigin("TFN"));
701+
});
702+
dec.isDangling[std::distance(dec.outputsInputs.begin(), it)] = false;
703+
}
704+
}
705+
707706
void WorkflowHelpers::constructGraph(const WorkflowSpec& workflow,
708707
std::vector<DeviceConnectionEdge>& logicalEdges,
709708
std::vector<OutputSpec>& outputs,

Framework/Core/src/WorkflowHelpers.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,9 @@ struct WorkflowHelpers {
182182
// @a ctx the context for the configuration phase
183183
static void injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx);
184184

185+
// Function to correctly add AOD writer
186+
static void injectAODWriter(WorkflowSpec& workflow, ConfigContext const& ctx);
187+
185188
// Final adjustments to @a workflow after service devices have been injected.
186189
static void adjustTopology(WorkflowSpec& workflow, ConfigContext const& ctx);
187190

Steer/DigitizerWorkflow/src/ITSMFTDigitizerSpec.cxx

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,10 @@ class ITSMFTDPLDigitizerTask : BaseDPLDigitizer
184184
// it can happen that in the digitization rofs without contributing hits are skipped
185185
// however downstream consumers of the clusters cannot know apriori the time structure
186186
// the cluster rofs do not account for the bias so it will start always at BC=0
187-
std::vector<o2::itsmft::ROFRecord> expDigitRofVec(nROFsTF);
188-
for (int iROF{0}; iROF < nROFsTF; ++iROF) {
187+
// also have to account for spillage into next TF
188+
const size_t nROFsLayer = std::max((size_t)nROFsTF, mROFRecordsAccum[iLayer].size());
189+
std::vector<o2::itsmft::ROFRecord> expDigitRofVec(nROFsLayer);
190+
for (int iROF{0}; iROF < nROFsLayer; ++iROF) {
189191
auto& rof = expDigitRofVec[iROF];
190192
int orb = iROF * DPLAlpideParam<N>::Instance().getROFLengthInBC(iLayer) / o2::constants::lhc::LHCMaxBunches + mFirstOrbitTF;
191193
int bc = iROF * DPLAlpideParam<N>::Instance().getROFLengthInBC(iLayer) % o2::constants::lhc::LHCMaxBunches;
@@ -204,7 +206,7 @@ class ITSMFTDPLDigitizerTask : BaseDPLDigitizer
204206
expROF.setFirstEntry(rof.getFirstEntry());
205207
expROF.setNEntries(rof.getNEntries());
206208
if (expROF.getBCData() != rof.getBCData()) {
207-
LOGP(fatal, "detected mismatch between expected ROF:{} and received ROF:{}", expROF.asString(), rof.asString());
209+
LOGP(fatal, "detected mismatch between expected {} and received {}", expROF.asString(), rof.asString());
208210
}
209211
}
210212
int prevFirst{0};
@@ -214,6 +216,9 @@ class ITSMFTDPLDigitizerTask : BaseDPLDigitizer
214216
}
215217
prevFirst = rof.getFirstEntry();
216218
}
219+
// if more ROFs were accumulated than ROFs possible in the TF, cut them away
220+
// by construction expDigitRofVec is at least nROFsTF long
221+
expDigitRofVec.resize(nROFsTF);
217222
pc.outputs().snapshot(Output{Origin, "DIGITSROF", iLayer}, expDigitRofVec);
218223
} else {
219224
pc.outputs().snapshot(Output{Origin, "DIGITSROF", iLayer}, mROFRecordsAccum[iLayer]);

0 commit comments

Comments
 (0)