Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPL: Move DataInputDirector to arrow::Dataset API #13763

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
// or submit itself to any jurisdiction.

#include "AODJAlienReaderHelpers.h"
#include <memory>
#include "Framework/TableTreeHelpers.h"
#include "Framework/AnalysisHelpers.h"
#include "Framework/DataProcessingStats.h"
#include "Framework/RootTableBuilderHelpers.h"
#include "Framework/RootArrowFilesystem.h"
#include "Framework/AlgorithmSpec.h"
#include "Framework/ConfigParamRegistry.h"
#include "Framework/ControlService.h"
Expand Down Expand Up @@ -41,6 +43,8 @@
#include <arrow/io/interfaces.h>
#include <arrow/table.h>
#include <arrow/util/key_value_metadata.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/file_base.h>

using namespace o2;
using namespace o2::aod;
Expand Down Expand Up @@ -272,11 +276,13 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
// Origin file name for derived output map
auto o2 = Output(TFFileNameHeader);
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
std::string currentFilename(fileAndFolder.file->GetName());
if (strcmp(fileAndFolder.file->GetEndpointUrl()->GetProtocol(), "file") == 0 && fileAndFolder.file->GetEndpointUrl()->GetFile()[0] != '/') {
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(fileAndFolder.filesystem());
auto* f = dynamic_cast<TFile*>(rootFS->GetFile());
std::string currentFilename(f->GetFile()->GetName());
if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') {
// This is not an absolute local path. Make it absolute.
static std::string pwd = gSystem->pwd() + std::string("/");
currentFilename = pwd + std::string(fileAndFolder.file->GetName());
currentFilename = pwd + std::string(f->GetName());
}
outputs.make<std::string>(o2) = currentFilename;
}
Expand Down Expand Up @@ -312,7 +318,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher);
auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
if (!fileAndFolder.file) {

// In case the filesource is empty, move to the next one.
if (fileAndFolder.path().empty()) {
fcnt += 1;
ntf = 0;
if (didir->atEnd(fcnt)) {
Expand Down
184 changes: 120 additions & 64 deletions Framework/AnalysisSupport/src/DataInputDirector.cxx

Large diffs are not rendered by default.

20 changes: 9 additions & 11 deletions Framework/AnalysisSupport/src/DataInputDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

#include "Framework/DataDescriptorMatcher.h"
#include "Framework/DataAllocator.h"
#include "Framework/RootArrowFilesystem.h"

#include <arrow/filesystem/filesystem.h>
#include <arrow/dataset/dataset.h>

#include <regex>
#include "rapidjson/fwd.h"
Expand All @@ -31,16 +35,10 @@ struct FileNameHolder {
std::string fileName;
int numberOfTimeFrames = 0;
std::vector<uint64_t> listOfTimeFrameNumbers;
std::vector<std::string> listOfTimeFrameKeys;
std::vector<bool> alreadyRead;
};
FileNameHolder* makeFileNameHolder(std::string fileName);

struct FileAndFolder {
TFile* file = nullptr;
std::string folderName = "";
};

class DataInputDescriptor
{
/// Holds information concerning the reading of an aod table.
Expand All @@ -52,7 +50,6 @@ class DataInputDescriptor
std::string treename = "";
std::unique_ptr<data_matcher::DataDescriptorMatcher> matcher;

DataInputDescriptor() = default;
DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring = nullptr, int allowedParentLevel = 0, std::string parentFileReplacement = "");

void printOut();
Expand All @@ -78,7 +75,7 @@ class DataInputDescriptor
int findDFNumber(int file, std::string dfName);

uint64_t getTimeFrameNumber(int counter, int numTF);
FileAndFolder getFileFolder(int counter, int numTF);
arrow::dataset::FileSource getFileFolder(int counter, int numTF);
DataInputDescriptor* getParentFile(int counter, int numTF, std::string treename);
int getTimeFramesInFile(int counter);
int getReadTimeFramesInFile(int counter);
Expand All @@ -90,6 +87,7 @@ class DataInputDescriptor
bool isAlienSupportOn() { return mAlienSupport; }

private:
o2::framework::RootObjectReadingFactory mFactory;
std::string minputfilesFile = "";
std::string* minputfilesFilePtr = nullptr;
std::string mFilenameRegex = "";
Expand All @@ -98,7 +96,7 @@ class DataInputDescriptor
std::string mParentFileReplacement;
std::vector<FileNameHolder*> mfilenames;
std::vector<FileNameHolder*>* mdefaultFilenamesPtr = nullptr;
TFile* mcurrentFile = nullptr;
std::shared_ptr<arrow::fs::FileSystem> mCurrentFilesystem;
int mCurrentFileID = -1;
bool mAlienSupport = false;

Expand Down Expand Up @@ -127,7 +125,6 @@ class DataInputDirector
~DataInputDirector();

void reset();
void createDefaultDataInputDescriptor();
void printOut();
bool atEnd(int counter);

Expand All @@ -140,10 +137,11 @@ class DataInputDirector
// getters
DataInputDescriptor* getDataInputDescriptor(header::DataHeader dh);
int getNumberInputDescriptors() { return mdataInputDescriptors.size(); }
void createDefaultDataInputDescriptor();

bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed);
uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF);
FileAndFolder getFileFolder(header::DataHeader dh, int counter, int numTF);
arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF);
int getTimeFramesInFile(header::DataHeader dh, int counter);

uint64_t getTotalSizeCompressed();
Expand Down
40 changes: 34 additions & 6 deletions Framework/AnalysisSupport/src/Plugin.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,48 @@ std::vector<std::string> getListOfTables(std::unique_ptr<TFile>& f)
{
std::vector<std::string> r;
TList* keyList = f->GetListOfKeys();
// We should handle two cases, one where the list of tables in a TDirectory,
// the other one where the dataframe number is just a prefix
std::string first = "";

for (auto key : *keyList) {
if (!std::string_view(key->GetName()).starts_with("DF_")) {
if (!std::string_view(key->GetName()).starts_with("DF_") && !std::string_view(key->GetName()).starts_with("/DF_")) {
continue;
}
auto* d = (TDirectory*)f->Get(key->GetName());
TList* branchList = d->GetListOfKeys();
for (auto b : *branchList) {
r.emplace_back(b->GetName());
auto* d = (TDirectory*)f->GetObjectChecked(key->GetName(), TClass::GetClass("TDirectory"));
// Objects are in a folder, list it.
if (d) {
TList* branchList = d->GetListOfKeys();
for (auto b : *branchList) {
r.emplace_back(b->GetName());
}
break;
}

void* v = f->GetObjectChecked(key->GetName(), TClass::GetClass("ROOT::Experimental::RNTuple"));
if (v) {
std::string s = key->GetName();
size_t pos = s.find('-');
// Check if '-' is found
// Skip metaData and parentFiles
if (pos == std::string::npos) {
continue;
}
std::string t = s.substr(pos + 1);
// If we find a duplicate table name, it means we are in the next DF and we can stop.
if (t == first) {
break;
}
if (first.empty()) {
first = t;
}
// Create a new string starting after the '-'
r.emplace_back(t);
}
break;
}
return r;
}

auto readMetadata(std::unique_ptr<TFile>& currentFile) -> std::vector<ConfigParamSpec>
{
// Get the metadata, if any
Expand Down
28 changes: 23 additions & 5 deletions Framework/AnalysisSupport/src/TTreePlugin.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "Framework/Plugins.h"
#include "Framework/Signpost.h"
#include "Framework/Endian.h"
#include <arrow/buffer.h>
#include <arrow/dataset/file_base.h>
#include <arrow/util/key_value_metadata.h>
#include <arrow/array/array_nested.h>
Expand Down Expand Up @@ -158,7 +159,7 @@
class TTreeOutputStream : public arrow::io::OutputStream
{
public:
// Using a pointer means that the tree itself is owned by another

Check failure on line 162 in Framework/AnalysisSupport/src/TTreePlugin.cxx

View workflow job for this annotation

GitHub Actions / PR formatting / whitespace

Trailing spaces

Remove the trailing spaces at the end of the line.
// class
TTreeOutputStream(TTree *, std::string branchPrefix);

Expand Down Expand Up @@ -245,6 +246,8 @@
}
};

O2_DECLARE_DYNAMIC_LOG(ttree_format);

arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
const std::shared_ptr<arrow::dataset::ScanOptions>& options,
const std::shared_ptr<arrow::dataset::FileFragment>& fragment) const
Expand All @@ -268,13 +271,17 @@

int64_t rows = -1;
auto& tree = fs->GetTree(treeFragment->source());
O2_SIGNPOST_ID_FROM_POINTER(sid, ttree_format, tree.get());
O2_SIGNPOST_START(ttree_format, sid, "TTreeFormat", "Scanning Batches for %{public}s (cache %zu)", tree->GetName(), (size_t)tree->GetCacheSize());
for (auto& field : fields) {
// The field actually on disk
auto physicalField = physical_schema->GetFieldByName(field->name());
TBranch* branch = tree->GetBranch(physicalField->name().c_str());
assert(branch);
buffer.Reset();
auto totalEntries = branch->GetEntries();
size_t totalEntries = branch->GetEntries();
O2_SIGNPOST_EVENT_EMIT(ttree_format, sid, "TTreeFormat", "Reading %zu entries from branch %{public}s to field %{public}s",
totalEntries, branch->GetName(), field->name().c_str());
if (rows == -1) {
rows = totalEntries;
}
Expand Down Expand Up @@ -344,12 +351,13 @@
auto typeSize = physicalField->type()->byte_width();
// This is needed for branches which have not been persisted.
auto bytes = branch->GetTotBytes();
auto branchSize = bytes ? bytes : 1000000;
auto&& result = arrow::AllocateResizableBuffer(branchSize, pool);
size_t branchSize = bytes ? bytes : 1000000;
O2_SIGNPOST_EVENT_EMIT(ttree_format, sid, "TTreeFormat", "Allocating buffer for branch %{public}s %zu.", physicalField->name().c_str(), branchSize);
arrow::Result<std::shared_ptr<arrow::ResizableBuffer>>&& result = arrow::AllocateResizableBuffer(branchSize, pool);
if (!result.ok()) {
throw runtime_error("Cannot allocate values buffer");
}
std::shared_ptr<arrow::Buffer> arrowValuesBuffer = std::move(result).ValueUnsafe();
std::shared_ptr<arrow::Buffer> arrowValuesBuffer = result.MoveValueUnsafe();
auto ptr = arrowValuesBuffer->mutable_data();
if (ptr == nullptr) {
throw runtime_error("Invalid buffer");
Expand Down Expand Up @@ -379,7 +387,7 @@
if (!result.ok()) {
throw runtime_error("Cannot allocate offset buffer");
}
arrowOffsetBuffer = std::move(result).ValueUnsafe();
arrowOffsetBuffer = result.MoveValueUnsafe();
unsigned char* ptrOffset = arrowOffsetBuffer->mutable_data();
auto* tPtrOffset = reinterpret_cast<int*>(ptrOffset);
offsets = std::span<int>{tPtrOffset, tPtrOffset + totalEntries + 1};
Expand Down Expand Up @@ -434,6 +442,7 @@

columns.push_back(array);
}
O2_SIGNPOST_END(ttree_format, sid, "TTreeFormat", "Done creating batches %{public}s.", tree->GetName());
auto batch = arrow::RecordBatch::Make(dataset_schema, rows, columns);
totalCompressedSize += tree->GetZipBytes();
totalUncompressedSize += tree->GetTotBytes();
Expand Down Expand Up @@ -545,6 +554,9 @@
}
auto& tree = treeFs->GetTree(source);

O2_SIGNPOST_ID_FROM_POINTER(sid, ttree_format, tree.get());
O2_SIGNPOST_START(ttree_format, sid, "TTreeFileFormat::Inspect", "Starting inspection of source %{public}s", source.path().c_str());

auto branches = tree->GetListOfBranches();
auto n = branches->GetEntries();

Expand Down Expand Up @@ -581,18 +593,24 @@
auto field = std::make_shared<arrow::Field>(bi.ptr->GetName(), arrowTypeFromROOT(type, listSize));
fields.push_back(field);

O2_SIGNPOST_EVENT_EMIT(ttree_format, sid, "TTreeFileFormat::Inspect", "Adding branch %{public}s to cache", bi.ptr->GetName());
tree->AddBranchToCache(bi.ptr);
if (strncmp(bi.ptr->GetName(), "fIndexArray", strlen("fIndexArray")) == 0) {
std::string sizeBranchName = bi.ptr->GetName();
sizeBranchName += "_size";
auto* sizeBranch = (TBranch*)tree->GetBranch(sizeBranchName.c_str());
if (sizeBranch) {
O2_SIGNPOST_EVENT_EMIT(ttree_format, sid, "TTreeFileFormat::Inspect", "Adding branch %{public}s to cache", sizeBranch->GetName());
tree->AddBranchToCache(sizeBranch);
} else {
O2_SIGNPOST_EVENT_EMIT(ttree_format, sid, "TTreeFileFormat::Inspect", "Branch %{public}s not added to cache", sizeBranchName.c_str());
}
}
}
tree->StopCacheLearningPhase();

O2_SIGNPOST_END(ttree_format, sid, "TTreeFileFormat::Inspect", "Done inspection %zu fields found. Using %zu bytes for cache", fields.size(),
(size_t)tree->GetCacheSize());
return std::make_shared<arrow::Schema>(fields);
}

Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/include/Framework/RootArrowFilesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ class TFileFileSystem : public VirtualRootFileSystemBase
private:
TDirectoryFile* mFile;
RootObjectReadingFactory& mObjectFactory;
arrow::dataset::FileSource mCachedSource;
std::shared_ptr<VirtualRootFileSystemBase> mCachedFS;
};

class TBufferFileFS : public VirtualRootFileSystemBase
Expand Down
13 changes: 11 additions & 2 deletions Framework/Core/src/RootArrowFilesystem.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ TFileFileSystem::TFileFileSystem(TDirectoryFile* f, size_t readahead, RootObject

std::shared_ptr<VirtualRootFileSystemBase> TFileFileSystem::GetSubFilesystem(arrow::dataset::FileSource source)
{
// If the filesystem was already found last time we got called, lets return a copy
// cached version of it.
if (mCachedSource.path() == source.path()) {
return mCachedFS;
}
// We use a plugin to create the actual objects inside the
// file, so that we can support TTree and RNTuple at the same time
// without having to depend on both.
Expand All @@ -53,13 +58,17 @@ std::shared_ptr<VirtualRootFileSystemBase> TFileFileSystem::GetSubFilesystem(arr
continue;
}
if (handle) {
return capability.factory().getSubFilesystem(handle);
mCachedFS = capability.factory().getSubFilesystem(handle);
mCachedSource = source;
return mCachedFS;
}
}

auto directory = (TDirectoryFile*)mFile->GetObjectChecked(source.path().c_str(), TClass::GetClass<TDirectory>());
if (directory) {
return std::shared_ptr<VirtualRootFileSystemBase>(new TFileFileSystem(directory, 50 * 1024 * 1024, mObjectFactory));
mCachedFS = std::shared_ptr<VirtualRootFileSystemBase>(new TFileFileSystem(directory, 50 * 1024 * 1024, mObjectFactory));
mCachedSource = source;
return mCachedFS;
}
throw runtime_error_f("Unsupported file layout");
}
Expand Down
12 changes: 11 additions & 1 deletion Framework/Core/src/TableTreeHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "Framework/TableTreeHelpers.h"
#include "Framework/Logger.h"
#include "Framework/Endian.h"
#include "Framework/Signpost.h"

#include "arrow/type_traits.h"
#include <arrow/dataset/file_base.h>
Expand All @@ -21,6 +22,9 @@

#include <memory>
#include <utility>

O2_DECLARE_DYNAMIC_LOG(tabletree_helpers);

namespace TableTreeHelpers
{
static constexpr char const* sizeBranchSuffix = "_size";
Expand Down Expand Up @@ -134,6 +138,7 @@ BranchToColumn::BranchToColumn(TBranch* branch, bool VLA, std::string name, EDat

std::pair<std::shared_ptr<arrow::ChunkedArray>, std::shared_ptr<arrow::Field>> BranchToColumn::read(TBuffer* buffer)
{
O2_SIGNPOST_ID_FROM_POINTER(sid, tabletree_helpers, buffer);
auto totalEntries = mBranch->GetEntries();
arrow::Status status;
int readEntries = 0;
Expand Down Expand Up @@ -170,7 +175,9 @@ std::pair<std::shared_ptr<arrow::ChunkedArray>, std::shared_ptr<arrow::Field>> B
}
} else {
// other types: use serialized read to build arrays directly
size_t branchSize = mBranch->GetTotBytes();
auto&& result = arrow::AllocateResizableBuffer(mBranch->GetTotBytes(), mPool);
O2_SIGNPOST_EVENT_EMIT(tabletree_helpers, sid, "BranchToColumn", "Allocating %ld bytes for %{public}s", branchSize, mBranch->GetName());
if (!result.ok()) {
throw runtime_error("Cannot allocate values buffer");
}
Expand Down Expand Up @@ -526,17 +533,20 @@ void TreeToTable::setLabel(const char* label)
mTableLabel = label;
}

void TreeToTable::fill(TTree*)
void TreeToTable::fill(TTree*tree)
{
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
std::vector<std::shared_ptr<arrow::Field>> fields;
static TBufferFile buffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024};
O2_SIGNPOST_ID_FROM_POINTER(sid, tabletree_helpers, &buffer);
O2_SIGNPOST_START(tabletree_helpers, sid, "TreeToTable", "Filling %{public}s", tree->GetName());
for (auto& reader : mBranchReaders) {
buffer.Reset();
auto arrayAndField = reader->read(&buffer);
columns.push_back(arrayAndField.first);
fields.push_back(arrayAndField.second);
}
O2_SIGNPOST_END(tabletree_helpers, sid, "TreeToTable", "Done filling.");

auto schema = std::make_shared<arrow::Schema>(fields, std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{mTableLabel}));
mTable = arrow::Table::Make(schema, columns);
Expand Down
Loading