Skip to content

Commit 711c90b

Browse files
committed
DPL: allow determining the origin from user provide input
In order to support embedding, we need allow the user to provide a mapping between the desired origin and the level in the parent file chain where the table should be found.
1 parent 44b8e32 commit 711c90b

File tree

4 files changed

+114
-22
lines changed

4 files changed

+114
-22
lines changed

Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010
// or submit itself to any jurisdiction.
1111

1212
#include "AODJAlienReaderHelpers.h"
13+
#include <charconv>
1314
#include <memory>
15+
#include <ranges>
16+
#include <vector>
1417
#include "Framework/TableTreeHelpers.h"
1518
#include "Framework/AnalysisHelpers.h"
1619
#include "Framework/DataProcessingStats.h"
@@ -111,10 +114,31 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
111114
if (ctx.options().isSet("aod-parent-access-level")) {
112115
parentAccessLevel = ctx.options().get<int>("aod-parent-access-level");
113116
}
114-
auto callback = AlgorithmSpec{adaptStateful([parentFileReplacement, parentAccessLevel](ConfigParamRegistry const& options,
115-
DeviceSpec const& spec,
116-
Monitoring& monitoring,
117-
DataProcessingStats& stats) {
117+
std::vector<std::pair<std::string, int>> originLevelMapping;
118+
if (ctx.options().isSet("aod-origin-level-mapping")) {
119+
auto originLevelMappingStr = ctx.options().get<std::string>("aod-origin-level-mapping");
120+
for (auto pairRange : originLevelMappingStr | std::views::split(',')) {
121+
std::string_view pair{pairRange.begin(), pairRange.end()};
122+
auto colonPos = pair.find(':');
123+
if (colonPos == std::string_view::npos) {
124+
LOGP(fatal, "Badly formatted aod-origin-level-mapping entry: \"{}\"", pair);
125+
continue;
126+
}
127+
std::string key(pair.substr(0, colonPos));
128+
std::string_view valueStr = pair.substr(colonPos + 1);
129+
int value{};
130+
auto [ptr, ec] = std::from_chars(valueStr.data(), valueStr.data() + valueStr.size(), value);
131+
if (ec == std::errc{}) {
132+
originLevelMapping.emplace_back(std::move(key), value);
133+
} else {
134+
LOGP(fatal, "Unable to parse level in aod-origin-level-mapping entry: \"{}\"", pair);
135+
}
136+
}
137+
}
138+
auto callback = AlgorithmSpec{adaptStateful([parentFileReplacement, parentAccessLevel, originLevelMapping](ConfigParamRegistry const& options,
139+
DeviceSpec const& spec,
140+
Monitoring& monitoring,
141+
DataProcessingStats& stats) {
118142
// FIXME: not actually needed, since data processing stats can specify that we should
119143
// send the initial value.
120144
stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_CREATED), DataProcessingStats::Op::Set, 0});
@@ -134,7 +158,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
134158
auto maxRate = options.get<float>("aod-max-io-rate");
135159

136160
// create a DataInputDirector
137-
auto didir = std::make_shared<DataInputDirector>(std::vector<std::string>{filename}, DataInputDirectorContext{&monitoring, parentAccessLevel, parentFileReplacement});
161+
auto didir = std::make_shared<DataInputDirector>(std::vector<std::string>{filename}, DataInputDirectorContext{&monitoring, parentAccessLevel, parentFileReplacement, originLevelMapping});
138162
if (options.isSet("aod-reader-json")) {
139163
auto jsonFile = options.get<std::string>("aod-reader-json");
140164
if (!didir->readJson(jsonFile)) {

Framework/AnalysisSupport/src/DataInputDirector.cxx

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ void DataInputDescriptor::addFileNameHolder(FileNameHolder* fn)
122122
mfilenames.emplace_back(fn);
123123
}
124124

125-
bool DataInputDescriptor::setFile(int counter, std::string_view origin)
125+
bool DataInputDescriptor::setFile(int counter, int wantedParentLevel, std::string_view origin)
126126
{
127127
// no files left
128128
if (counter >= getNumberInputfiles()) {
@@ -133,7 +133,9 @@ bool DataInputDescriptor::setFile(int counter, std::string_view origin)
133133
// of the filename. In the future we might expand this for proper rewriting of the
134134
// filename based on the origin and the original file information.
135135
std::string filename = mfilenames[counter]->fileName;
136-
if (!origin.starts_with("AOD")) {
136+
// In case we do not need to remap parent levels, the requested origin is what
137+
// drives the filename.
138+
if (wantedParentLevel == -1 && !origin.starts_with("AOD")) {
137139
filename = std::regex_replace(filename, std::regex("[.]root$"), fmt::format("_{}.root", origin));
138140
}
139141

@@ -218,11 +220,11 @@ bool DataInputDescriptor::setFile(int counter, std::string_view origin)
218220
return true;
219221
}
220222

221-
uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF, std::string_view origin)
223+
uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
222224
{
223225

224226
// open file
225-
if (!setFile(counter, origin)) {
227+
if (!setFile(counter, wantedParentLevel, wantedOrigin)) {
226228
return 0ul;
227229
}
228230

@@ -234,10 +236,27 @@ uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF, std::st
234236
return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF];
235237
}
236238

237-
arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF, std::string_view origin)
239+
arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
238240
{
241+
// If mapped to a parent level deeper than current, skip directly to the right level.
242+
if (wantedParentLevel != -1 && mLevel < wantedParentLevel) {
243+
if (!setFile(counter, wantedParentLevel, wantedOrigin)) {
244+
return {};
245+
}
246+
auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]);
247+
auto parentFile = getParentFile(counter, numTF, "", wantedParentLevel, wantedOrigin);
248+
if (parentFile == nullptr) {
249+
return {};
250+
}
251+
int parentNumTF = parentFile->findDFNumber(0, folderName);
252+
if (parentNumTF == -1) {
253+
return {};
254+
}
255+
return parentFile->getFileFolder(0, parentNumTF, wantedParentLevel, wantedOrigin);
256+
}
257+
239258
// open file
240-
if (!setFile(counter, origin)) {
259+
if (!setFile(counter, wantedParentLevel, wantedOrigin)) {
241260
return {};
242261
}
243262

@@ -251,7 +270,7 @@ arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int n
251270
return {fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem};
252271
}
253272

254-
DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename, std::string_view origin)
273+
DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename, int wantedParentLevel, std::string_view wantedOrigin)
255274
{
256275
if (!mParentFileMap) {
257276
// This file has no parent map
@@ -288,7 +307,7 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF,
288307
mParentFile->mdefaultFilenamesPtr = new std::vector<FileNameHolder*>;
289308
mParentFile->mdefaultFilenamesPtr->emplace_back(makeFileNameHolder(parentFileName->GetString().Data()));
290309
mParentFile->fillInputfiles();
291-
mParentFile->setFile(0, origin);
310+
mParentFile->setFile(0, wantedParentLevel, wantedOrigin);
292311
return mParentFile;
293312
}
294313

@@ -446,8 +465,38 @@ struct CalculateDelta {
446465
bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, std::string treename, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
447466
{
448467
CalculateDelta t(mIOTime);
449-
std::string origin = dh.dataOrigin.as<std::string>();
450-
auto folder = getFileFolder(counter, numTF, origin);
468+
std::string wantedOrigin = dh.dataOrigin.as<std::string>();
469+
int wantedLevel = -1;
470+
for (auto& [origin, level] : mContext.parentLevelToOrigin) {
471+
if (origin == wantedOrigin) {
472+
wantedLevel = level;
473+
break;
474+
}
475+
}
476+
477+
// If this origin is mapped to a parent level deeper than current, skip directly without
478+
// attempting to read from this level.
479+
if (wantedLevel != -1 && mLevel < wantedLevel) {
480+
if (!setFile(counter, wantedLevel, wantedOrigin)) {
481+
t.deactivate();
482+
return false;
483+
}
484+
auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]);
485+
auto parentFile = getParentFile(counter, numTF, treename, wantedLevel, wantedOrigin);
486+
if (parentFile == nullptr) {
487+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
488+
throw std::runtime_error(fmt::format(R"(No parent file found for "{}" while looking for level {} in "{}")", treename, wantedLevel, rootFS->GetFile()->GetName()));
489+
}
490+
int parentNumTF = parentFile->findDFNumber(0, folderName);
491+
if (parentNumTF == -1) {
492+
auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
493+
throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", folderName, parentRootFS->GetFile()->GetName()));
494+
}
495+
t.deactivate();
496+
return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed);
497+
}
498+
499+
auto folder = getFileFolder(counter, numTF, wantedLevel, wantedOrigin);
451500
if (!folder.filesystem()) {
452501
t.deactivate();
453502
return false;
@@ -480,7 +529,7 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh
480529
if (!format) {
481530
t.deactivate();
482531
LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path());
483-
auto parentFile = getParentFile(counter, numTF, treename, origin);
532+
auto parentFile = getParentFile(counter, numTF, treename, wantedLevel, wantedOrigin);
484533
if (parentFile != nullptr) {
485534
int parentNumTF = parentFile->findDFNumber(0, folder.path());
486535
if (parentNumTF == -1) {
@@ -813,8 +862,15 @@ arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader d
813862
didesc = mdefaultDataInputDescriptor;
814863
}
815864
std::string origin = dh.dataOrigin.as<std::string>();
865+
int wantedLevel = -1;
866+
for (auto& [o, level] : mContext.parentLevelToOrigin) {
867+
if (o == origin) {
868+
wantedLevel = level;
869+
break;
870+
}
871+
}
816872

817-
return didesc->getFileFolder(counter, numTF, origin);
873+
return didesc->getFileFolder(counter, numTF, wantedLevel, origin);
818874
}
819875

820876
int DataInputDirector::getTimeFramesInFile(header::DataHeader dh, int counter)
@@ -836,8 +892,15 @@ uint64_t DataInputDirector::getTimeFrameNumber(header::DataHeader dh, int counte
836892
didesc = mdefaultDataInputDescriptor;
837893
}
838894
std::string origin = dh.dataOrigin.as<std::string>();
895+
int wantedLevel = -1;
896+
for (auto& [o, level] : mContext.parentLevelToOrigin) {
897+
if (o == origin) {
898+
wantedLevel = level;
899+
break;
900+
}
901+
}
839902

840-
return didesc->getTimeFrameNumber(counter, numTF, origin);
903+
return didesc->getTimeFrameNumber(counter, numTF, wantedLevel, origin);
841904
}
842905

843906
bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)

Framework/AnalysisSupport/src/DataInputDirector.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <arrow/dataset/dataset.h>
2222

2323
#include <regex>
24+
#include <vector>
2425
#include "rapidjson/fwd.h"
2526

2627
namespace o2::monitoring
@@ -44,6 +45,7 @@ struct DataInputDirectorContext {
4445
o2::monitoring::Monitoring* monitoring = nullptr;
4546
int allowedParentLevel = 0;
4647
std::string parentFileReplacement = "";
48+
std::vector<std::pair<std::string, int>> parentLevelToOrigin = {};
4749
};
4850

4951
class DataInputDescriptor
@@ -71,7 +73,7 @@ class DataInputDescriptor
7173

7274
void addFileNameHolder(FileNameHolder* fn);
7375
int fillInputfiles();
74-
bool setFile(int counter, std::string_view origin);
76+
bool setFile(int counter, int wantedParentLevel, std::string_view wantedOrigin);
7577

7678
// getters
7779
std::string getInputfilesFilename();
@@ -81,9 +83,9 @@ class DataInputDescriptor
8183
int getNumberTimeFrames() { return mtotalNumberTimeFrames; }
8284
int findDFNumber(int file, std::string dfName);
8385

84-
uint64_t getTimeFrameNumber(int counter, int numTF, std::string_view origin);
85-
arrow::dataset::FileSource getFileFolder(int counter, int numTF, std::string_view origin);
86-
DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename, std::string_view origin);
86+
uint64_t getTimeFrameNumber(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin);
87+
arrow::dataset::FileSource getFileFolder(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin);
88+
DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename, int wantedParentLevel, std::string_view wantedOrigin);
8789
int getTimeFramesInFile(int counter);
8890
int getReadTimeFramesInFile(int counter);
8991

Framework/Core/src/Plugin.cxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,9 @@ struct DiscoverAODOptionsInCommandLine : o2::framework::ConfigDiscoveryPlugin {
168168
if (key == "aod-parent-access-level") {
169169
results.push_back(ConfigParamSpec{"aod-parent-access-level", VariantType::String, value, {"Allow parent file access up to specified level. Default: no (0)"}});
170170
}
171+
if (key == "aod-origin-level-mapping") {
172+
results.push_back(ConfigParamSpec{"aod-origin-level-mapping", VariantType::String, value, {"Map origin to parent level for AOD reading. Syntax: ORIGIN:LEVEL[,ORIGIN2:LEVEL2,...]. E.g. \"DYN:1\"."}});
173+
}
171174
}
172175
if (injectOption) {
173176
results.push_back(ConfigParamSpec{"aod-writer-compression", VariantType::Int, 505, {"AOD Compression options"}});

0 commit comments

Comments
 (0)