From 8c955dd74747abf9fb152a13dbad6895d0d450fe Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Wed, 4 Dec 2024 15:00:22 +0100 Subject: [PATCH] DPL: Move DataInputDirector to arrow::Dataset API --- .../src/AODJAlienReaderHelpers.cxx | 16 ++- .../AnalysisSupport/src/DataInputDirector.cxx | 105 ++++++++++-------- .../AnalysisSupport/src/DataInputDirector.h | 14 +-- 3 files changed, 77 insertions(+), 58 deletions(-) diff --git a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx index 9c19de85739ce..f8a9705e4eb62 100644 --- a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx @@ -10,10 +10,12 @@ // or submit itself to any jurisdiction. #include "AODJAlienReaderHelpers.h" +#include #include "Framework/TableTreeHelpers.h" #include "Framework/AnalysisHelpers.h" #include "Framework/DataProcessingStats.h" #include "Framework/RootTableBuilderHelpers.h" +#include "Framework/RootArrowFilesystem.h" #include "Framework/AlgorithmSpec.h" #include "Framework/ConfigParamRegistry.h" #include "Framework/ControlService.h" @@ -41,6 +43,8 @@ #include #include #include +#include +#include using namespace o2; using namespace o2::aod; @@ -272,11 +276,13 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const // Origin file name for derived output map auto o2 = Output(TFFileNameHeader); auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf); - std::string currentFilename(fileAndFolder.file->GetName()); - if (strcmp(fileAndFolder.file->GetEndpointUrl()->GetProtocol(), "file") == 0 && fileAndFolder.file->GetEndpointUrl()->GetFile()[0] != '/') { + auto rootFS = std::dynamic_pointer_cast(fileAndFolder.filesystem()); + auto* f = dynamic_cast(rootFS->GetFile()); + std::string currentFilename(f->GetFile()->GetName()); + if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') { // This is not an absolute local path. Make it absolute. static std::string pwd = gSystem->pwd() + std::string("/"); - currentFilename = pwd + std::string(fileAndFolder.file->GetName()); + currentFilename = pwd + std::string(f->GetName()); } outputs.make(o2) = currentFilename; } @@ -312,7 +318,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher); auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec); auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf); - if (!fileAndFolder.file) { + + // In case the filesource is empty, move to the next one. + if (fileAndFolder.path().empty()) { fcnt += 1; ntf = 0; if (didir->atEnd(fcnt)) { diff --git a/Framework/AnalysisSupport/src/DataInputDirector.cxx b/Framework/AnalysisSupport/src/DataInputDirector.cxx index 172ecd66c0e64..85ae51a68f574 100644 --- a/Framework/AnalysisSupport/src/DataInputDirector.cxx +++ b/Framework/AnalysisSupport/src/DataInputDirector.cxx @@ -11,6 +11,7 @@ #include "DataInputDirector.h" #include "Framework/DataDescriptorQueryBuilder.h" #include "Framework/Logger.h" +#include "Framework/RootArrowFilesystem.h" #include "Framework/AnalysisDataModelHelpers.h" #include "Framework/Output.h" #include "Headers/DataHeader.h" @@ -26,8 +27,12 @@ #include "TGrid.h" #include "TObjString.h" #include "TMap.h" +#include "TFile.h" +#include +#include #include +#include #if __has_include() #include @@ -108,20 +113,22 @@ bool DataInputDescriptor::setFile(int counter) // open file auto filename = mfilenames[counter]->fileName; - if (mcurrentFile) { - if (mcurrentFile->GetName() == filename) { + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); + if (rootFS.get()) { + if (rootFS->GetFile()->GetName() == filename) { return true; } closeInputFile(); } - mcurrentFile = TFile::Open(filename.c_str()); - if (!mcurrentFile) { + + mCurrentFilesystem = std::make_shared(TFile::Open(filename.c_str()), 50 * 1024 * 1024); + if (!mCurrentFilesystem.get()) { throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename)); } - mcurrentFile->SetReadaheadSize(50 * 1024 * 1024); + rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); // get the parent file map if exists - mParentFileMap = (TMap*)mcurrentFile->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path) + mParentFileMap = (TMap*)rootFS->GetFile()->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path) if (mParentFileMap && !mParentFileReplacement.empty()) { auto pos = mParentFileReplacement.find(';'); if (pos == std::string::npos) { @@ -141,7 +148,7 @@ bool DataInputDescriptor::setFile(int counter) // get the directory names if (mfilenames[counter]->numberOfTimeFrames <= 0) { std::regex TFRegex = std::regex("DF_[0-9]+"); - TList* keyList = mcurrentFile->GetListOfKeys(); + TList* keyList = rootFS->GetFile()->GetListOfKeys(); // extract TF numbers and sort accordingly for (auto key : *keyList) { @@ -193,26 +200,21 @@ uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF) return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF]; } -FileAndFolder DataInputDescriptor::getFileFolder(int counter, int numTF) +arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF) { - FileAndFolder fileAndFolder; - // open file if (!setFile(counter)) { - return fileAndFolder; + return {}; } // no TF left if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) { - return fileAndFolder; + return {}; } - fileAndFolder.file = mcurrentFile; - fileAndFolder.folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF]; - mfilenames[counter]->alreadyRead[numTF] = true; - return fileAndFolder; + return {(mfilenames[counter]->listOfTimeFrameKeys)[numTF], mCurrentFilesystem}; } DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename) @@ -223,15 +225,17 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, } auto folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF]; auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str()); + // The current DF is not found in the parent map (this should not happen and is a fatal error) + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); if (!parentFileName) { - // The current DF is not found in the parent map (this should not happen and is a fatal error) - throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), mcurrentFile->GetName())); + throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName())); return nullptr; } if (mParentFile) { // Is this still the corresponding to the correct file? - if (parentFileName->GetString().CompareTo(mParentFile->mcurrentFile->GetName()) == 0) { + auto parentRootFS = std::dynamic_pointer_cast(mParentFile->mCurrentFilesystem); + if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) { return mParentFile; } else { mParentFile->closeInputFile(); @@ -241,7 +245,8 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, } if (mLevel == mAllowedParentLevel) { - throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(), mcurrentFile->GetName())); + throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(), + rootFS->GetFile()->GetName())); } LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str()); @@ -270,11 +275,13 @@ void DataInputDescriptor::printFileStatistics() if (wait_time < 0) { wait_time = 0; } - std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", mcurrentFile->GetName(), - mcurrentFile->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), mcurrentFile->GetBytesRead(), mcurrentFile->GetReadCalls(), + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); + auto f = dynamic_cast(rootFS->GetFile()); + std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", f->GetName(), + f->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), f->GetBytesRead(), f->GetReadCalls(), ((float)mIOTime / 1e9), ((float)wait_time / 1e9), mLevel)); #if __has_include() - auto alienFile = dynamic_cast(mcurrentFile); + auto alienFile = dynamic_cast(f); if (alienFile) { monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed()); } @@ -285,7 +292,7 @@ void DataInputDescriptor::printFileStatistics() void DataInputDescriptor::closeInputFile() { - if (mcurrentFile) { + if (mCurrentFilesystem.get()) { if (mParentFile) { mParentFile->closeInputFile(); delete mParentFile; @@ -296,9 +303,9 @@ void DataInputDescriptor::closeInputFile() mParentFileMap = nullptr; printFileStatistics(); - mcurrentFile->Close(); - delete mcurrentFile; - mcurrentFile = nullptr; + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); + rootFS->GetFile()->Close(); + mCurrentFilesystem.reset(); } } @@ -358,40 +365,46 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh { auto ioStart = uv_hrtime(); - auto fileAndFolder = getFileFolder(counter, numTF); - if (!fileAndFolder.file) { + auto folder = getFileFolder(counter, numTF); + if (!folder.filesystem()) { return false; } - auto fullpath = fileAndFolder.folderName + "/" + treename; - auto tree = (TTree*)fileAndFolder.file->Get(fullpath.c_str()); + auto format = std::make_shared(totalSizeCompressed, totalSizeUncompressed); + auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()}; + auto schemaOpt = format->Inspect(fullpath); + auto schema = *schemaOpt; - if (!tree) { - LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.c_str()); + auto fragment = format->MakeFragment(fullpath, {}, schema); + + if (!fragment.ok()) { + LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path()); auto parentFile = getParentFile(counter, numTF, treename); if (parentFile != nullptr) { - int parentNumTF = parentFile->findDFNumber(0, fileAndFolder.folderName); + int parentNumTF = parentFile->findDFNumber(0, folder.path()); if (parentNumTF == -1) { - throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", fileAndFolder.folderName, parentFile->mcurrentFile->GetName())); + auto parentRootFS = std::dynamic_pointer_cast(parentFile->mCurrentFilesystem); + throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName())); } // first argument is 0 as the parent file object contains only 1 file return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed); } - throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fileAndFolder.folderName + "/" + treename, fileAndFolder.file->GetName())); + auto rootFS = std::dynamic_pointer_cast(mCurrentFilesystem); + throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName())); } // create table output auto o = Output(dh); - auto t2t = outputs.make(o); - // add branches to read - // fill the table - t2t->setLabel(tree->GetName()); - totalSizeCompressed += tree->GetZipBytes(); - totalSizeUncompressed += tree->GetTotBytes(); - t2t->addAllColumns(tree); - t2t->fill(tree); - delete tree; + // FIXME: This should allow me to create a memory pool + // which I can then use to scan the dataset. + // + auto f2b = outputs.make(o); + + //// add branches to read + //// fill the table + f2b->setLabel(treename.c_str()); + f2b->fill(*fragment, schema, format); mIOTime += (uv_hrtime() - ioStart); @@ -693,7 +706,7 @@ DataInputDescriptor* DataInputDirector::getDataInputDescriptor(header::DataHeade return result; } -FileAndFolder DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF) +arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF) { auto didesc = getDataInputDescriptor(dh); // if NOT match then use defaultDataInputDescriptor diff --git a/Framework/AnalysisSupport/src/DataInputDirector.h b/Framework/AnalysisSupport/src/DataInputDirector.h index eca0ef195d111..c03104be4ffba 100644 --- a/Framework/AnalysisSupport/src/DataInputDirector.h +++ b/Framework/AnalysisSupport/src/DataInputDirector.h @@ -16,6 +16,9 @@ #include "Framework/DataDescriptorMatcher.h" #include "Framework/DataAllocator.h" +#include +#include + #include #include "rapidjson/fwd.h" @@ -36,11 +39,6 @@ struct FileNameHolder { }; FileNameHolder* makeFileNameHolder(std::string fileName); -struct FileAndFolder { - TFile* file = nullptr; - std::string folderName = ""; -}; - class DataInputDescriptor { /// Holds information concerning the reading of an aod table. @@ -78,7 +76,7 @@ class DataInputDescriptor int findDFNumber(int file, std::string dfName); uint64_t getTimeFrameNumber(int counter, int numTF); - FileAndFolder getFileFolder(int counter, int numTF); + arrow::dataset::FileSource getFileFolder(int counter, int numTF); DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename); int getTimeFramesInFile(int counter); int getReadTimeFramesInFile(int counter); @@ -98,7 +96,7 @@ class DataInputDescriptor std::string mParentFileReplacement; std::vector mfilenames; std::vector* mdefaultFilenamesPtr = nullptr; - TFile* mcurrentFile = nullptr; + std::shared_ptr mCurrentFilesystem; int mCurrentFileID = -1; bool mAlienSupport = false; @@ -143,7 +141,7 @@ class DataInputDirector bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed); uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF); - FileAndFolder getFileFolder(header::DataHeader dh, int counter, int numTF); + arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF); int getTimeFramesInFile(header::DataHeader dh, int counter); uint64_t getTotalSizeCompressed();