Skip to content

Commit 6ada843

Browse files
committed
DPL: allow determining the origin from user provide input
In order to support embedding, we need allow the user to provide a mapping between the desired origin and the level in the parent file chain where the table should be found.
1 parent c9d07d8 commit 6ada843

File tree

6 files changed

+268
-23
lines changed

6 files changed

+268
-23
lines changed

Framework/AnalysisSupport/CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,15 @@ o2_add_test(DataInputDirector NAME test_Framework_test_DataInputDirector
4747
LABELS framework
4848
PUBLIC_LINK_LIBRARIES O2::FrameworkAnalysisSupport)
4949

50+
add_executable(o2-test-framework-analysis-support
51+
test/test_NavigateToLevel.cxx)
52+
target_link_libraries(o2-test-framework-analysis-support PRIVATE O2::FrameworkAnalysisSupport O2::Catch2)
53+
54+
get_filename_component(outdir ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/../tests ABSOLUTE)
55+
set_property(TARGET o2-test-framework-analysis-support PROPERTY RUNTIME_OUTPUT_DIRECTORY ${outdir})
56+
57+
add_test(NAME framework:analysis-support COMMAND o2-test-framework-analysis-support)
58+
5059
o2_add_test(TableToTree NAME benchmark_TableToTree
5160
SOURCES test/benchmark_TableToTree.cxx
5261
COMPONENT_NAME Framework

Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010
// or submit itself to any jurisdiction.
1111

1212
#include "AODJAlienReaderHelpers.h"
13+
#include <charconv>
1314
#include <memory>
15+
#include <ranges>
16+
#include <vector>
1417
#include "Framework/TableTreeHelpers.h"
1518
#include "Framework/AnalysisHelpers.h"
1619
#include "Framework/DataProcessingStats.h"
@@ -111,10 +114,31 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
111114
if (ctx.options().isSet("aod-parent-access-level")) {
112115
parentAccessLevel = ctx.options().get<int>("aod-parent-access-level");
113116
}
114-
auto callback = AlgorithmSpec{adaptStateful([parentFileReplacement, parentAccessLevel](ConfigParamRegistry const& options,
115-
DeviceSpec const& spec,
116-
Monitoring& monitoring,
117-
DataProcessingStats& stats) {
117+
std::vector<std::pair<std::string, int>> originLevelMapping;
118+
if (ctx.options().isSet("aod-origin-level-mapping")) {
119+
auto originLevelMappingStr = ctx.options().get<std::string>("aod-origin-level-mapping");
120+
for (auto pairRange : originLevelMappingStr | std::views::split(',')) {
121+
std::string_view pair{pairRange.begin(), pairRange.end()};
122+
auto colonPos = pair.find(':');
123+
if (colonPos == std::string_view::npos) {
124+
LOGP(fatal, "Badly formatted aod-origin-level-mapping entry: \"{}\"", pair);
125+
continue;
126+
}
127+
std::string key(pair.substr(0, colonPos));
128+
std::string_view valueStr = pair.substr(colonPos + 1);
129+
int value{};
130+
auto [ptr, ec] = std::from_chars(valueStr.data(), valueStr.data() + valueStr.size(), value);
131+
if (ec == std::errc{}) {
132+
originLevelMapping.emplace_back(std::move(key), value);
133+
} else {
134+
LOGP(fatal, "Unable to parse level in aod-origin-level-mapping entry: \"{}\"", pair);
135+
}
136+
}
137+
}
138+
auto callback = AlgorithmSpec{adaptStateful([parentFileReplacement, parentAccessLevel, originLevelMapping](ConfigParamRegistry const& options,
139+
DeviceSpec const& spec,
140+
Monitoring& monitoring,
141+
DataProcessingStats& stats) {
118142
// FIXME: not actually needed, since data processing stats can specify that we should
119143
// send the initial value.
120144
stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_CREATED), DataProcessingStats::Op::Set, 0});
@@ -134,7 +158,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
134158
auto maxRate = options.get<float>("aod-max-io-rate");
135159

136160
// create a DataInputDirector
137-
auto didir = std::make_shared<DataInputDirector>(std::vector<std::string>{filename}, DataInputDirectorContext{&monitoring, parentAccessLevel, parentFileReplacement});
161+
auto didir = std::make_shared<DataInputDirector>(std::vector<std::string>{filename}, DataInputDirectorContext{&monitoring, parentAccessLevel, parentFileReplacement, originLevelMapping});
138162
if (options.isSet("aod-reader-json")) {
139163
auto jsonFile = options.get<std::string>("aod-reader-json");
140164
if (!didir->readJson(jsonFile)) {

Framework/AnalysisSupport/src/DataInputDirector.cxx

Lines changed: 70 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ void DataInputDescriptor::addFileNameHolder(FileNameHolder* fn)
122122
mfilenames.emplace_back(fn);
123123
}
124124

125-
bool DataInputDescriptor::setFile(int counter, std::string_view origin)
125+
bool DataInputDescriptor::setFile(int counter, int wantedParentLevel, std::string_view origin)
126126
{
127127
// no files left
128128
if (counter >= getNumberInputfiles()) {
@@ -133,7 +133,9 @@ bool DataInputDescriptor::setFile(int counter, std::string_view origin)
133133
// of the filename. In the future we might expand this for proper rewriting of the
134134
// filename based on the origin and the original file information.
135135
std::string filename = mfilenames[counter]->fileName;
136-
if (!origin.starts_with("AOD")) {
136+
// In case we do not need to remap parent levels, the requested origin is what
137+
// drives the filename.
138+
if (wantedParentLevel == -1 && !origin.starts_with("AOD")) {
137139
filename = std::regex_replace(filename, std::regex("[.]root$"), fmt::format("_{}.root", origin));
138140
}
139141

@@ -146,7 +148,19 @@ bool DataInputDescriptor::setFile(int counter, std::string_view origin)
146148
closeInputFile();
147149
}
148150

149-
mCurrentFilesystem = std::make_shared<TFileFileSystem>(TFile::Open(filename.c_str()), 50 * 1024 * 1024, mFactory);
151+
TFile* tfile = nullptr;
152+
bool externalFile = false;
153+
for (auto& [name, f] : mContext.openFiles) {
154+
if (name == filename) {
155+
tfile = f;
156+
externalFile = true;
157+
break;
158+
}
159+
}
160+
if (tfile == nullptr) {
161+
tfile = TFile::Open(filename.c_str());
162+
}
163+
mCurrentFilesystem = std::make_shared<TFileFileSystem>(tfile, 50 * 1024 * 1024, mFactory, !externalFile);
150164
if (!mCurrentFilesystem.get()) {
151165
throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename));
152166
}
@@ -218,11 +232,11 @@ bool DataInputDescriptor::setFile(int counter, std::string_view origin)
218232
return true;
219233
}
220234

221-
uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF, std::string_view origin)
235+
uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
222236
{
223237

224238
// open file
225-
if (!setFile(counter, origin)) {
239+
if (!setFile(counter, wantedParentLevel, wantedOrigin)) {
226240
return 0ul;
227241
}
228242

@@ -234,10 +248,32 @@ uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF, std::st
234248
return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF];
235249
}
236250

237-
arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF, std::string_view origin)
251+
std::pair<DataInputDescriptor*, int> DataInputDescriptor::navigateToLevel(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
252+
{
253+
if (!setFile(counter, wantedParentLevel, wantedOrigin)) {
254+
return {nullptr, -1};
255+
}
256+
auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]);
257+
auto parentFile = getParentFile(counter, numTF, "", wantedParentLevel, wantedOrigin);
258+
if (parentFile == nullptr) {
259+
return {nullptr, -1};
260+
}
261+
return {parentFile, parentFile->findDFNumber(0, folderName)};
262+
}
263+
264+
arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
238265
{
266+
// If mapped to a parent level deeper than current, skip directly to the right level.
267+
if (wantedParentLevel != -1 && mLevel < wantedParentLevel) {
268+
auto [parentFile, parentNumTF] = navigateToLevel(counter, numTF, wantedParentLevel, wantedOrigin);
269+
if (parentFile == nullptr || parentNumTF == -1) {
270+
return {};
271+
}
272+
return parentFile->getFileFolder(0, parentNumTF, wantedParentLevel, wantedOrigin);
273+
}
274+
239275
// open file
240-
if (!setFile(counter, origin)) {
276+
if (!setFile(counter, wantedParentLevel, wantedOrigin)) {
241277
return {};
242278
}
243279

@@ -251,7 +287,7 @@ arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int n
251287
return {fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem};
252288
}
253289

254-
DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename, std::string_view origin)
290+
DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename, int wantedParentLevel, std::string_view wantedOrigin)
255291
{
256292
if (!mParentFileMap) {
257293
// This file has no parent map
@@ -288,7 +324,7 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF,
288324
mParentFile->mdefaultFilenamesPtr = new std::vector<FileNameHolder*>;
289325
mParentFile->mdefaultFilenamesPtr->emplace_back(makeFileNameHolder(parentFileName->GetString().Data()));
290326
mParentFile->fillInputfiles();
291-
mParentFile->setFile(0, origin);
327+
mParentFile->setFile(0, wantedParentLevel, wantedOrigin);
292328
return mParentFile;
293329
}
294330

@@ -450,8 +486,26 @@ struct CalculateDelta {
450486
bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, std::string treename, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
451487
{
452488
CalculateDelta t(mIOTime);
453-
std::string origin = dh.dataOrigin.as<std::string>();
454-
auto folder = getFileFolder(counter, numTF, origin);
489+
std::string wantedOrigin = dh.dataOrigin.as<std::string>();
490+
int wantedLevel = mContext.levelForOrigin(wantedOrigin);
491+
492+
// If this origin is mapped to a parent level deeper than current, skip directly without
493+
// attempting to read from this level.
494+
if (wantedLevel != -1 && mLevel < wantedLevel) {
495+
auto [parentFile, parentNumTF] = navigateToLevel(counter, numTF, wantedLevel, wantedOrigin);
496+
if (parentFile == nullptr) {
497+
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
498+
throw std::runtime_error(fmt::format(R"(No parent file found for "{}" while looking for level {} in "{}")", treename, wantedLevel, rootFS->GetFile()->GetName()));
499+
}
500+
if (parentNumTF == -1) {
501+
auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
502+
throw std::runtime_error(fmt::format(R"(DF not found in parent file "{}")", parentRootFS->GetFile()->GetName()));
503+
}
504+
t.deactivate();
505+
return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed);
506+
}
507+
508+
auto folder = getFileFolder(counter, numTF, wantedLevel, wantedOrigin);
455509
if (!folder.filesystem()) {
456510
t.deactivate();
457511
return false;
@@ -484,7 +538,7 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh
484538
if (!format) {
485539
t.deactivate();
486540
LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path());
487-
auto parentFile = getParentFile(counter, numTF, treename, origin);
541+
auto parentFile = getParentFile(counter, numTF, treename, wantedLevel, wantedOrigin);
488542
if (parentFile != nullptr) {
489543
int parentNumTF = parentFile->findDFNumber(0, folder.path());
490544
if (parentNumTF == -1) {
@@ -817,8 +871,9 @@ arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader d
817871
didesc = mdefaultDataInputDescriptor;
818872
}
819873
std::string origin = dh.dataOrigin.as<std::string>();
874+
int wantedLevel = mContext.levelForOrigin(origin);
820875

821-
return didesc->getFileFolder(counter, numTF, origin);
876+
return didesc->getFileFolder(counter, numTF, wantedLevel, origin);
822877
}
823878

824879
int DataInputDirector::getTimeFramesInFile(header::DataHeader dh, int counter)
@@ -840,8 +895,9 @@ uint64_t DataInputDirector::getTimeFrameNumber(header::DataHeader dh, int counte
840895
didesc = mdefaultDataInputDescriptor;
841896
}
842897
std::string origin = dh.dataOrigin.as<std::string>();
898+
int wantedLevel = mContext.levelForOrigin(origin);
843899

844-
return didesc->getTimeFrameNumber(counter, numTF, origin);
900+
return didesc->getTimeFrameNumber(counter, numTF, wantedLevel, origin);
845901
}
846902

847903
bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)

Framework/AnalysisSupport/src/DataInputDirector.h

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <arrow/dataset/dataset.h>
2222

2323
#include <regex>
24+
#include <vector>
2425
#include "rapidjson/fwd.h"
2526

2627
namespace o2::monitoring
@@ -44,6 +45,20 @@ struct DataInputDirectorContext {
4445
o2::monitoring::Monitoring* monitoring = nullptr;
4546
int allowedParentLevel = 0;
4647
std::string parentFileReplacement = "";
48+
std::vector<std::pair<std::string, int>> parentLevelToOrigin = {};
49+
// Optional registry of pre-opened TFiles (keyed by name) used to bypass
50+
// TFile::Open for testing with in-memory TMemFile instances.
51+
std::vector<std::pair<std::string, TFile*>> openFiles = {};
52+
53+
int levelForOrigin(std::string_view origin) const
54+
{
55+
for (auto& [o, level] : parentLevelToOrigin) {
56+
if (o == origin) {
57+
return level;
58+
}
59+
}
60+
return -1;
61+
}
4762
};
4863

4964
class DataInputDescriptor
@@ -71,7 +86,7 @@ class DataInputDescriptor
7186

7287
void addFileNameHolder(FileNameHolder* fn);
7388
int fillInputfiles();
74-
bool setFile(int counter, std::string_view origin);
89+
bool setFile(int counter, int wantedParentLevel, std::string_view wantedOrigin);
7590

7691
// getters
7792
std::string getInputfilesFilename();
@@ -81,9 +96,12 @@ class DataInputDescriptor
8196
int getNumberTimeFrames() { return mtotalNumberTimeFrames; }
8297
int findDFNumber(int file, std::string dfName);
8398

84-
uint64_t getTimeFrameNumber(int counter, int numTF, std::string_view origin);
85-
arrow::dataset::FileSource getFileFolder(int counter, int numTF, std::string_view origin);
86-
DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename, std::string_view origin);
99+
uint64_t getTimeFrameNumber(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin);
100+
arrow::dataset::FileSource getFileFolder(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin);
101+
// Open the current file to populate the parent map, then return the parent descriptor and
102+
// the TF index within it that corresponds to numTF at this level. Returns {nullptr, -1} on failure.
103+
std::pair<DataInputDescriptor*, int> navigateToLevel(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin);
104+
DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename, int wantedParentLevel, std::string_view wantedOrigin);
87105
int getTimeFramesInFile(int counter);
88106
int getReadTimeFramesInFile(int counter);
89107

0 commit comments

Comments
 (0)