From af70596f6d244fb1412a3e8712ce959e3b6f8523 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 30 May 2023 13:36:42 -0400 Subject: [PATCH] * Add a new functionality to an Engine, to serialize metadata in memory after Open(ReadRandomAccess), to send over to another programs that can then use this metadata to open the same dataset with an accelerated function (using metadata from memory instead of retrieving from disk). BP4 and BP5 engine supports this new function. Program 1: reader = io.Open(fname, adios2::Mode::ReadRandomAccess); char *md; size_t mdsize; reader.GetMetadata(&md, &mdsize); Program 2: reader = io.Open(fname, md, mdsize); * Added Transport::CurrentPos() to get the current seek position --- bindings/C/adios2/c/adios2_c_engine.cpp | 21 ++ bindings/C/adios2/c/adios2_c_engine.h | 9 + bindings/C/adios2/c/adios2_c_io.cpp | 18 ++ bindings/C/adios2/c/adios2_c_io.h | 14 ++ bindings/CXX11/adios2/cxx11/Engine.cpp | 6 + bindings/CXX11/adios2/cxx11/Engine.h | 8 + bindings/CXX11/adios2/cxx11/IO.cpp | 8 + bindings/CXX11/adios2/cxx11/IO.h | 15 ++ examples/useCases/CMakeLists.txt | 1 + examples/useCases/ensembleRead/CMakeLists.txt | 9 + .../useCases/ensembleRead/ensembleRead.cpp | 237 ++++++++++++++++++ source/adios2/core/Engine.cpp | 16 ++ source/adios2/core/Engine.h | 21 ++ source/adios2/core/IO.cpp | 25 +- source/adios2/core/IO.h | 34 ++- source/adios2/engine/bp4/BP4Reader.cpp | 103 ++++++-- source/adios2/engine/bp4/BP4Reader.h | 8 + source/adios2/engine/bp5/BP5Reader.cpp | 133 +++++++++- source/adios2/engine/bp5/BP5Reader.h | 8 + source/adios2/toolkit/transport/Transport.h | 2 + .../toolkit/transport/file/FileAWSSDK.h | 2 + .../adios2/toolkit/transport/file/FileDaos.h | 2 + .../toolkit/transport/file/FileFStream.cpp | 12 + .../toolkit/transport/file/FileFStream.h | 2 + .../adios2/toolkit/transport/file/FileIME.cpp | 6 + .../adios2/toolkit/transport/file/FileIME.h | 2 + .../toolkit/transport/file/FilePOSIX.cpp | 5 + .../adios2/toolkit/transport/file/FilePOSIX.h | 2 + .../toolkit/transport/file/FileStdio.cpp | 5 + .../adios2/toolkit/transport/file/FileStdio.h | 2 + .../toolkit/transport/null/NullTransport.h | 2 + .../adios2/toolkit/transport/shm/ShmSystemV.h | 2 + .../toolkit/transportman/TransportMan.cpp | 8 + .../toolkit/transportman/TransportMan.h | 2 + testing/adios2/engine/bp/CMakeLists.txt | 1 + .../engine/bp/TestBPOpenWithMetadata.cpp | 203 +++++++++++++++ 36 files changed, 921 insertions(+), 33 deletions(-) create mode 100644 examples/useCases/ensembleRead/CMakeLists.txt create mode 100644 examples/useCases/ensembleRead/ensembleRead.cpp create mode 100644 testing/adios2/engine/bp/TestBPOpenWithMetadata.cpp diff --git a/bindings/C/adios2/c/adios2_c_engine.cpp b/bindings/C/adios2/c/adios2_c_engine.cpp index c933c96a41..46eb42422b 100644 --- a/bindings/C/adios2/c/adios2_c_engine.cpp +++ b/bindings/C/adios2/c/adios2_c_engine.cpp @@ -199,6 +199,27 @@ adios2_error adios2_engine_openmode(adios2_mode *mode, } } +adios2_error adios2_engine_get_metadata(adios2_engine *engine, char **md, + size_t *size) +{ + try + { + adios2::helper::CheckForNullptr( + engine, "for const adios2_engine, in call to adios2_get_metadata"); + + adios2::core::Engine *engineCpp = + reinterpret_cast(engine); + + engineCpp->GetMetadata(md, size); + return adios2_error_none; + } + catch (...) + { + return static_cast( + adios2::helper::ExceptionToError("adios2_get_metadata")); + } +} + adios2_error adios2_begin_step(adios2_engine *engine, const adios2_step_mode mode, const float timeout_seconds, diff --git a/bindings/C/adios2/c/adios2_c_engine.h b/bindings/C/adios2/c/adios2_c_engine.h index 984591dc7f..e4b018f326 100644 --- a/bindings/C/adios2/c/adios2_c_engine.h +++ b/bindings/C/adios2/c/adios2_c_engine.h @@ -56,6 +56,15 @@ adios2_error adios2_engine_get_type(char *type, size_t *size, adios2_error adios2_engine_openmode(adios2_mode *mode, const adios2_engine *engine); +/** Serialize all metadata right after engine is created, which can be + * delivered to other processes to open the same file for reading without + * opening and reading in metadata again. + * @return metadata (pointer to allocated memory) and size of metadata + * the pointer must be deallocated by user using free() + */ +adios2_error adios2_engine_get_metadata(adios2_engine *engine, char **md, + size_t *size); + /** * @brief Begin a logical adios2 step stream * Check each engine documentation for MPI collective/non-collective diff --git a/bindings/C/adios2/c/adios2_c_io.cpp b/bindings/C/adios2/c/adios2_c_io.cpp index 839f6a3a86..05aa5c5d88 100644 --- a/bindings/C/adios2/c/adios2_c_io.cpp +++ b/bindings/C/adios2/c/adios2_c_io.cpp @@ -947,6 +947,24 @@ adios2_engine *adios2_open(adios2_io *io, const char *name, return engine; } +adios2_engine *adios2_open_with_metadata(adios2_io *io, const char *name, + const char *md, const size_t mdsize) +{ + adios2_engine *engine = nullptr; + try + { + adios2::helper::CheckForNullptr( + io, "for adios2_io, in call to adios2_open_with_metadata"); + engine = reinterpret_cast( + &reinterpret_cast(io)->Open(name, md, mdsize)); + } + catch (...) + { + adios2::helper::ExceptionToError("adios2_open_with_metadata"); + } + return engine; +} + adios2_error adios2_flush_all_engines(adios2_io *io) { try diff --git a/bindings/C/adios2/c/adios2_c_io.h b/bindings/C/adios2/c/adios2_c_io.h index 271c2d87c6..18e2f0df22 100644 --- a/bindings/C/adios2/c/adios2_c_io.h +++ b/bindings/C/adios2/c/adios2_c_io.h @@ -325,6 +325,20 @@ adios2_error adios2_remove_all_attributes(adios2_io *io); adios2_engine *adios2_open(adios2_io *io, const char *name, const adios2_mode mode); +/** + * Open an Engine to start heavy-weight input/output operations. + * This function is for opening a file (not stream) with ReadRandomAccess mode + * and supplying the metadata already in memory. The metadata should be + * retrieved by another program calling adios2_engine_get_metadata() after + * opening the file. + * @param io engine owner + * @param name unique engine identifier + * @param md file metadata residing in memory + * @return success: handler, failure: NULL + */ +adios2_engine *adios2_open_with_metadata(adios2_io *io, const char *name, + const char *md, const size_t mdsize); + #if ADIOS2_USE_MPI /** * Open an Engine to start heavy-weight input/output operations. diff --git a/bindings/CXX11/adios2/cxx11/Engine.cpp b/bindings/CXX11/adios2/cxx11/Engine.cpp index 748af4e041..e3f3eb690e 100644 --- a/bindings/CXX11/adios2/cxx11/Engine.cpp +++ b/bindings/CXX11/adios2/cxx11/Engine.cpp @@ -54,6 +54,12 @@ Mode Engine::OpenMode() const return m_Engine->OpenMode(); } +void Engine::GetMetadata(char **md, size_t *size) const +{ + helper::CheckForNullptr(m_Engine, "in call to Engine::GetMetadata"); + m_Engine->GetMetadata(md, size); +} + StepStatus Engine::BeginStep() { helper::CheckForNullptr(m_Engine, "in call to Engine::BeginStep"); diff --git a/bindings/CXX11/adios2/cxx11/Engine.h b/bindings/CXX11/adios2/cxx11/Engine.h index 1043315e91..24b173d3d2 100644 --- a/bindings/CXX11/adios2/cxx11/Engine.h +++ b/bindings/CXX11/adios2/cxx11/Engine.h @@ -75,6 +75,14 @@ class Engine */ Mode OpenMode() const; + /** Serialize all metadata right after engine is created, which can be + * delivered to other processes to open the same file for reading without + * opening and reading in metadata again. + * @return metadata (pointer to allocated memory) and size of metadata + * the pointer must be deallocated by user using free() + */ + void GetMetadata(char **md, size_t *size) const; + /** * Begin a logical adios2 step, overloaded version with timeoutSeconds = 0 * and mode = Read diff --git a/bindings/CXX11/adios2/cxx11/IO.cpp b/bindings/CXX11/adios2/cxx11/IO.cpp index e1eee598b6..42bad31918 100644 --- a/bindings/CXX11/adios2/cxx11/IO.cpp +++ b/bindings/CXX11/adios2/cxx11/IO.cpp @@ -109,6 +109,14 @@ Engine IO::Open(const std::string &name, const Mode mode) "for engine " + name + ", in call to IO::Open"); return Engine(&m_IO->Open(name, mode)); } + +Engine IO::Open(const std::string &name, const char *md, const size_t mdsize) +{ + helper::CheckForNullptr(m_IO, + "for engine " + name + ", in call to IO::Open"); + return Engine(&m_IO->Open(name, md, mdsize)); +} + Group IO::InquireGroup(char delimiter) { return Group(&m_IO->CreateGroup(delimiter)); diff --git a/bindings/CXX11/adios2/cxx11/IO.h b/bindings/CXX11/adios2/cxx11/IO.h index cb8a44a2e9..16d162c43b 100644 --- a/bindings/CXX11/adios2/cxx11/IO.h +++ b/bindings/CXX11/adios2/cxx11/IO.h @@ -307,6 +307,21 @@ class IO Engine Open(const std::string &name, const Mode mode, MPI_Comm comm); #endif + /** + * Overloaded version that is specifically for a serial program + * opening a file (not stream) with ReadRandomAccess mode and + * supplying the metadata already in memory. The metadata + * should be retrieved by another program calling engine.GetMetadata() + * after opening the file. + * @param name unique engine identifier within IO object + * (file name in case of File transports) + * @param md file metadata residing in memory + * @return a reference to a derived object of the Engine class + * @exception std::invalid_argument if Engine with unique name is already + * created with another Open + */ + Engine Open(const std::string &name, const char *md, const size_t mdsize); + /** Flushes all engines created with this IO with the Open function */ void FlushAll(); diff --git a/examples/useCases/CMakeLists.txt b/examples/useCases/CMakeLists.txt index f5de8aeacf..645d0a7954 100644 --- a/examples/useCases/CMakeLists.txt +++ b/examples/useCases/CMakeLists.txt @@ -4,4 +4,5 @@ #------------------------------------------------------------------------------# add_subdirectory(insituGlobalArrays) +add_subdirectory(ensembleRead) diff --git a/examples/useCases/ensembleRead/CMakeLists.txt b/examples/useCases/ensembleRead/CMakeLists.txt new file mode 100644 index 0000000000..35d25d482d --- /dev/null +++ b/examples/useCases/ensembleRead/CMakeLists.txt @@ -0,0 +1,9 @@ +#------------------------------------------------------------------------------# +# Distributed under the OSI-approved Apache License, Version 2.0. See +# accompanying file Copyright.txt for details. +#------------------------------------------------------------------------------# + +if(ADIOS2_HAVE_MPI) + add_executable(ensembleRead ensembleRead.cpp) + target_link_libraries(ensembleRead adios2::cxx11_mpi MPI::MPI_C) +endif() diff --git a/examples/useCases/ensembleRead/ensembleRead.cpp b/examples/useCases/ensembleRead/ensembleRead.cpp new file mode 100644 index 0000000000..6fdc66afd1 --- /dev/null +++ b/examples/useCases/ensembleRead/ensembleRead.cpp @@ -0,0 +1,237 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * A Use Case for In Situ visulization frameworks (Conduit, SENSEI) + * + * Read in the variables that the Writer wrote. + * Every process should read only what the corresponding Writer wrote + * This is an N to N case + * + * Created on: Jul 11, 2017 + * Author: pnorbert + */ + +#include // std::transform +#include +#include +#include +#include +#include +#include //std::accumulate +#include // sleep_for +#include + +#include + +#include + +typedef std::chrono::duration Seconds; +typedef std::chrono::time_point< + std::chrono::steady_clock, + std::chrono::duration> + TimePoint; + +inline TimePoint Now() { return std::chrono::steady_clock::now(); } + +struct VarInfo +{ + std::string varName; + std::string type; + adios2::Dims shape; + adios2::ShapeID shapeID; + size_t nSteps; + std::vector data; + VarInfo(const std::string &name, const std::string &type, + const adios2::Dims shape, const adios2::ShapeID shapeID, + const size_t nsteps) + : varName(name), type(type), shape(shape), shapeID(shapeID), + nSteps(nsteps){}; +}; + +std::string DimsToString(adios2::Dims &dims) +{ + std::string s = ""; + for (size_t i = 0; i < dims.size(); i++) + { + if (i > 0) + { + s += "x"; + } + s += std::to_string(dims[i]); + } + s += ""; + return s; +} + +size_t GetTotalSize(adios2::Dims &dimensions, size_t elementSize = 1) +{ + return std::accumulate(dimensions.begin(), dimensions.end(), elementSize, + std::multiplies()); +} + +template +void ReadVariable(int rank, const std::string &name, const std::string &type, + adios2::Engine &reader, adios2::IO &io, + std::vector &varinfos) +{ + adios2::Variable variable = io.InquireVariable(name); + varinfos.push_back( + VarInfo(name, type, variable.Shape(), variable.ShapeID(), 1)); + auto vit = varinfos.rbegin(); + vit->nSteps = variable.Steps(); + if (vit->shapeID == adios2::ShapeID::GlobalArray) + { + size_t n = vit->nSteps * GetTotalSize(vit->shape, sizeof(T)); + vit->data.resize(n); + adios2::Dims start(vit->shape.size()); + variable.SetSelection({start, vit->shape}); + variable.SetStepSelection({0, vit->nSteps}); + T *dataptr = reinterpret_cast(vit->data.data()); + reader.Get(variable, dataptr); + } + else if (vit->shapeID == adios2::ShapeID::GlobalValue) + { + size_t n = vit->nSteps * sizeof(T); + vit->data.resize(n); + variable.SetStepSelection({0, vit->nSteps}); + T *dataptr = reinterpret_cast(vit->data.data()); + reader.Get(variable, dataptr); + } +} + +std::vector ReadFileContent(int rank, adios2::Engine &reader, + adios2::IO &io) +{ + std::map varNameList = io.AvailableVariables(); + std::vector varinfos; + for (auto &var : varNameList) + { + const std::string &name(var.first); + auto it = var.second.find("Type"); + const std::string &type = it->second; + if (type == "struct") + { + // not supported + } +#define declare_template_instantiation(T) \ + else if (type == adios2::GetType()) \ + { \ + ReadVariable(rank, name, type, reader, io, varinfos); \ + } + ADIOS2_FOREACH_STDTYPE_1ARG(declare_template_instantiation) +#undef declare_template_instantiation + } + + reader.PerformGets(); + return varinfos; +} + +void ProcessFile(int rank, adios2::Engine &reader, adios2::IO &io, + Seconds opentime) +{ + auto now = Now(); + std::vector varinfos = ReadFileContent(rank, reader, io); + Seconds readtime = Now() - now; + + std::cout << "File info on rank " << rank << ":" << std::endl; + std::cout << " Open time: " << opentime.count() << "s" << std::endl; + std::cout << " Read time: " << readtime.count() << "s" << std::endl; + std::cout << " Steps in file: " << reader.Steps() << std::endl; + std::cout << " Total number of variables = " << varinfos.size() + << std::endl; + for (auto &vi : varinfos) + { + std::cout << " Name: " << vi.varName + << " dimensions = " << DimsToString(vi.shape) + << " steps = " << vi.nSteps << " size = " << vi.data.size() + << " bytes" << std::endl; + } +} + +int main(int argc, char *argv[]) +{ + if (argc < 2) + { + std::cout << "Usage: " << argv[0] << " BP-file" << std::endl; + return -1; + } + std::string fname = argv[1]; + + int rank = 0, nproc = 1; + MPI_Init(&argc, &argv); + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &nproc); + + /* Each process is acting as a serial program */ + adios2::ADIOS adios; // independent ADIOS object for each single process + + adios2::IO io = adios.DeclareIO("Input"); + char *fileMetadata; + size_t fileMetadataSize; + + if (!rank) + { + std::cout << "First process opens file " << fname << std::endl; + adios2::Engine reader; + auto now = Now(); + reader = io.Open(fname, adios2::Mode::ReadRandomAccess); + Seconds opentime = Now() - now; + reader.GetMetadata(&fileMetadata, &fileMetadataSize); + std::cout << "Serialized metadata size = " << fileMetadataSize + << std::endl; + ProcessFile(rank, reader, io, opentime); + reader.Close(); + std::cout << "\n===== End of first process file processing =====\n" + << std::endl; + } + + /* Send metadata to all processes via MPI + * (Note limitation to 2GB due MPI int) + */ + MPI_Bcast(&fileMetadataSize, 1, MPI_INT64_T, 0, MPI_COMM_WORLD); + if (fileMetadataSize > (size_t)std::numeric_limits::max()) + { + if (!rank) + std::cout << "ERROR: metadata size is >2GB, not supported by " + "MPI_BCast" + << std::endl; + MPI_Abort(MPI_COMM_WORLD, 1); + } + + if (rank) + { + fileMetadata = (char *)malloc(fileMetadataSize); + } + int mdsize = (int)fileMetadataSize; + MPI_Bcast(fileMetadata, mdsize, MPI_CHAR, 0, MPI_COMM_WORLD); + + /* "Open" data by passing metadata to the adios engine */ + auto now = Now(); + adios2::Engine reader = io.Open(fname, fileMetadata, fileMetadataSize); + Seconds opentime = Now() - now; + free(fileMetadata); + + /* Process file in a sequentialized order only for pretty printing */ + MPI_Status status; + int token = 0; + if (rank > 0) + { + MPI_Recv(&token, 1, MPI_INT, rank - 1, 0, MPI_COMM_WORLD, &status); + } + + ProcessFile(rank, reader, io, opentime); + + if (rank < nproc - 1) + { + std::chrono::milliseconds timespan(100); + std::this_thread::sleep_for(timespan); + MPI_Send(&token, 1, MPI_INT, rank + 1, 0, MPI_COMM_WORLD); + } + + // Called once: indicate that we are done with this output for the run + reader.Close(); + + MPI_Finalize(); + return 0; +} diff --git a/source/adios2/core/Engine.cpp b/source/adios2/core/Engine.cpp index ab9c1706d8..bb0ea33012 100644 --- a/source/adios2/core/Engine.cpp +++ b/source/adios2/core/Engine.cpp @@ -29,6 +29,15 @@ Engine::Engine(const std::string engineType, IO &io, const std::string &name, m_FailVerbose = (m_Comm.Rank() == 0); } +Engine::Engine(const std::string engineType, IO &io, const std::string &name, + const Mode openMode, helper::Comm comm, const char *md, + const size_t mdsize) +: m_EngineType(engineType), m_IO(io), m_Name(name), m_OpenMode(openMode), + m_Comm(std::move(comm)) +{ + ThrowUp("Engine with metadata in memory"); +} + Engine::~Engine() { if (m_IsOpen) @@ -43,6 +52,13 @@ IO &Engine::GetIO() noexcept { return m_IO; } Mode Engine::OpenMode() const noexcept { return m_OpenMode; } +void Engine::GetMetadata(char **md, size_t *size) +{ + ThrowUp("GetMetadata"); + *md = nullptr; + *size = 0; +} + StepStatus Engine::BeginStep() { if (m_OpenMode == Mode::Read) diff --git a/source/adios2/core/Engine.h b/source/adios2/core/Engine.h index f91e264b68..635e088260 100644 --- a/source/adios2/core/Engine.h +++ b/source/adios2/core/Engine.h @@ -71,6 +71,19 @@ class Engine Engine(const std::string engineType, IO &io, const std::string &name, const Mode mode, helper::Comm comm); + /** + * Unique Base class constructor + * @param engineType derived class identifier + * @param io object that generates this Engine + * @param name unique engine name within IO class object + * @param mode open mode from ADIOSTypes.h Mode + * @param comm communicator passed at Open or from ADIOS class + * @param md Metadata already in memory + */ + Engine(const std::string engineType, IO &io, const std::string &name, + const Mode mode, helper::Comm comm, const char *md, + const size_t mdsize); + virtual ~Engine(); explicit operator bool() const noexcept; @@ -87,6 +100,14 @@ class Engine */ Mode OpenMode() const noexcept; + /** Serialize all metadata right after engine is created, which can be + * delivered to other processes to open the same file for reading without + * opening and reading in metadata again. + * @return metadata (pointer to allocated memory) and size of metadata + * the pointer must be deallocated by user using free() + */ + virtual void GetMetadata(char **md, size_t *size); + StepStatus BeginStep(); /** diff --git a/source/adios2/core/IO.cpp b/source/adios2/core/IO.cpp index 47d7cd0f88..50acff2b13 100644 --- a/source/adios2/core/IO.cpp +++ b/source/adios2/core/IO.cpp @@ -37,6 +37,7 @@ #include "adios2/engine/skeleton/SkeletonWriter.h" #include "adios2/helper/adiosComm.h" +#include "adios2/helper/adiosCommDummy.h" #include "adios2/helper/adiosFunctions.h" //BuildParametersMap #include "adios2/helper/adiosString.h" #include // FileIsDirectory() @@ -72,7 +73,8 @@ std::unordered_map Factory = { {IO::MakeEngine, IO::MakeEngine}}, {"bp5", #ifdef ADIOS2_HAVE_BP5 - {IO::MakeEngine, IO::MakeEngine} + {IO::MakeEngine, IO::MakeEngine, + IO::MakeEngineWithMD} #else IO::NoEngineEntry("ERROR: this version didn't compile with " "BP5 library, can't use BP5 engine\n") @@ -516,7 +518,8 @@ void IO::AddOperation(const std::string &variable, m_VarOpsPlaceholder[variable].push_back({operatorType, parameters}); } -Engine &IO::Open(const std::string &name, const Mode mode, helper::Comm comm) +Engine &IO::Open(const std::string &name, const Mode mode, helper::Comm comm, + const char *md, const size_t mdsize) { PERFSTUBS_SCOPED_TIMER("IO::Open"); auto itEngineFound = m_Engines.find(name); @@ -684,8 +687,13 @@ Engine &IO::Open(const std::string &name, const Mode mode, helper::Comm comm) auto f = FactoryLookup(engineTypeLC); if (f != Factory.end()) { - if ((mode_to_use == Mode::Read) || - (mode_to_use == Mode::ReadRandomAccess)) + if (md && mode_to_use == Mode::ReadRandomAccess) + { + engine = f->second.MakeReaderWithMD(*this, name, mode_to_use, + std::move(comm), md, mdsize); + } + else if ((mode_to_use == Mode::Read) || + (mode_to_use == Mode::ReadRandomAccess)) { engine = f->second.MakeReader(*this, name, mode_to_use, std::move(comm)); @@ -718,6 +726,15 @@ Engine &IO::Open(const std::string &name, const Mode mode) { return Open(name, mode, m_ADIOS.GetComm().Duplicate()); } + +Engine &IO::Open(const std::string &name, const char *md, const size_t mdsize) +{ + const Mode mode = Mode::ReadRandomAccess; + // helper::Comm comm; + // std::cout << "Open comm rank = " << comm.Rank(); + return Open(name, mode, helper::CommDummy(), md, mdsize); +} + Group &IO::CreateGroup(char delimiter) { m_Gr = std::make_shared("", delimiter, *this); diff --git a/source/adios2/core/IO.h b/source/adios2/core/IO.h index 8a0acac023..3db93c421d 100644 --- a/source/adios2/core/IO.h +++ b/source/adios2/core/IO.h @@ -395,7 +395,8 @@ class IO * @exception std::invalid_argument if Engine with unique name is already * created with another Open */ - Engine &Open(const std::string &name, const Mode mode, helper::Comm comm); + Engine &Open(const std::string &name, const Mode mode, helper::Comm comm, + const char *md = nullptr, const size_t mdsize = 0); /** * Overloaded version that reuses the MPI_Comm object passed @@ -409,6 +410,21 @@ class IO */ Engine &Open(const std::string &name, const Mode mode); + /** + * Overloaded version that is specifically for a serial program + * opening a file (not stream) with ReadRandomAccess mode and + * supplying the metadata already in memory. The metadata + * should be retrieved by another program calling engine.GetMetadata() + * after opening the file. + * @param name unique engine identifier within IO object + * (file name in case of File transports) + * @param md file metadata residing in memory + * @return a reference to a derived object of the Engine class + * @exception std::invalid_argument if Engine with unique name is already + * created with another Open + */ + Engine &Open(const std::string &name, const char *md, const size_t mdsize); + /** * Retrieve an engine by name */ @@ -455,10 +471,14 @@ class IO using MakeEngineFunc = std::function( IO &, const std::string &, const Mode, helper::Comm)>; + using MakeEngineWithMDFunc = std::function( + IO &, const std::string &, const Mode, helper::Comm, const char *, + const size_t)>; struct EngineFactoryEntry { MakeEngineFunc MakeReader; MakeEngineFunc MakeWriter; + MakeEngineWithMDFunc MakeReaderWithMD; }; /** @@ -487,6 +507,18 @@ class IO return std::make_shared(io, name, mode, std::move(comm)); } + /** + * Create an engine of type T. This is intended to be used when + * creating instances of EngineFactoryEntry for RegisterEngine. + */ + template + static std::shared_ptr + MakeEngineWithMD(IO &io, const std::string &name, const Mode mode, + helper::Comm comm, const char *md, const size_t mdsize) + { + return std::make_shared(io, name, mode, std::move(comm), md, mdsize); + } + /** * Register an engine factory entry to create a reader or writer * for an engine of the given engine type (named in lower case). diff --git a/source/adios2/engine/bp4/BP4Reader.cpp b/source/adios2/engine/bp4/BP4Reader.cpp index f4e1f5c939..0ff729ba19 100644 --- a/source/adios2/engine/bp4/BP4Reader.cpp +++ b/source/adios2/engine/bp4/BP4Reader.cpp @@ -15,6 +15,7 @@ #include #include +#include namespace adios2 { @@ -38,6 +39,20 @@ BP4Reader::BP4Reader(IO &io, const std::string &name, const Mode mode, m_IsOpen = true; } +BP4Reader::BP4Reader(IO &io, const std::string &name, const Mode mode, + helper::Comm comm, const char *md, const size_t mdsize) +: Engine("BP4Reader", io, name, mode, std::move(comm)), + m_BP4Deserializer(m_Comm), m_MDFileManager(io, m_Comm), + m_DataFileManager(io, m_Comm), m_MDIndexFileManager(io, m_Comm), + m_ActiveFlagFileManager(io, m_Comm) +{ + PERFSTUBS_SCOPED_TIMER("BP4Reader::Open"); + readMetadataFromFile = false; + Init(); + // ProcessMetadataFromMemory(md); + m_IsOpen = true; +} + BP4Reader::~BP4Reader() { if (m_IsOpen) @@ -47,6 +62,23 @@ BP4Reader::~BP4Reader() m_IsOpen = false; } +void BP4Reader::GetMetadata(char **md, size_t *size) +{ + uint64_t sizes[2] = {m_BP4Deserializer.m_Metadata.m_Buffer.size(), + m_BP4Deserializer.m_MetadataIndex.m_Buffer.size()}; + + size_t mdsize = sizes[0] + sizes[1] + 2 * sizeof(uint64_t); + *md = (char *)malloc(mdsize); + *size = mdsize; + char *p = *md; + memcpy(p, sizes, sizeof(sizes)); + p += sizeof(sizes); + memcpy(p, m_BP4Deserializer.m_Metadata.m_Buffer.data(), sizes[0]); + p += sizes[0]; + memcpy(p, m_BP4Deserializer.m_MetadataIndex.m_Buffer.data(), sizes[1]); + p += sizes[1]; +} + StepStatus BP4Reader::BeginStep(StepMode mode, const float timeoutSeconds) { PERFSTUBS_SCOPED_TIMER("BP4Reader::BeginStep"); @@ -195,26 +227,28 @@ void BP4Reader::Init() InitTransports(); helper::RaiseLimitNoFile(); - - /* Do a collective wait for the file(s) to appear within timeout. - Make sure every process comes to the same conclusion */ - const Seconds timeoutSeconds( - m_BP4Deserializer.m_Parameters.OpenTimeoutSecs); - - Seconds pollSeconds( - m_BP4Deserializer.m_Parameters.BeginStepPollingFrequencySecs); - if (pollSeconds > timeoutSeconds) + if (readMetadataFromFile) { - pollSeconds = timeoutSeconds; - } + /* Do a collective wait for the file(s) to appear within timeout. + Make sure every process comes to the same conclusion */ + const Seconds timeoutSeconds( + m_BP4Deserializer.m_Parameters.OpenTimeoutSecs); + + Seconds pollSeconds( + m_BP4Deserializer.m_Parameters.BeginStepPollingFrequencySecs); + if (pollSeconds > timeoutSeconds) + { + pollSeconds = timeoutSeconds; + } - TimePoint timeoutInstant = Now() + timeoutSeconds; + TimePoint timeoutInstant = Now() + timeoutSeconds; - OpenFiles(timeoutInstant, pollSeconds, timeoutSeconds); - if (!m_BP4Deserializer.m_Parameters.StreamReader) - { - /* non-stream reader gets as much steps as available now */ - InitBuffer(timeoutInstant, pollSeconds / 10, timeoutSeconds); + OpenFiles(timeoutInstant, pollSeconds, timeoutSeconds); + if (!m_BP4Deserializer.m_Parameters.StreamReader) + { + /* non-stream reader gets as much steps as available now */ + InitBuffer(timeoutInstant, pollSeconds / 10, timeoutSeconds); + } } } @@ -561,6 +595,41 @@ void BP4Reader::InitBuffer(const TimePoint &timeoutInstant, } } +void BP4Reader::ProcessMetadataFromMemory(const char *md) +{ + uint64_t size_mdidx, size_md; + const char *p = md; + memcpy(&size_md, p, sizeof(uint64_t)); + p = p + sizeof(uint64_t); + memcpy(&size_mdidx, p, sizeof(uint64_t)); + p = p + sizeof(uint64_t); + + std::string hint("when processing metadata from memory"); + size_t pos = 0; + + m_BP4Deserializer.m_Metadata.Resize(size_md, hint); + helper::CopyToBuffer(m_BP4Deserializer.m_Metadata.m_Buffer, pos, p, + size_md); + p = p + size_md; + + pos = 0; + m_BP4Deserializer.m_MetadataIndex.Resize(size_mdidx, hint); + helper::CopyToBuffer(m_BP4Deserializer.m_MetadataIndex.m_Buffer, pos, p, + size_mdidx); + p = p + size_mdidx; + + /* Parse metadata index table */ + m_BP4Deserializer.ParseMetadataIndex(m_BP4Deserializer.m_MetadataIndex, 0, + true, false); + // now we are sure the index header has been parsed, first step parsing + // done + m_IdxHeaderParsed = true; + + // fills IO with Variables and Attributes + m_MDFileProcessedSize = m_BP4Deserializer.ParseMetadata( + m_BP4Deserializer.m_Metadata, *this, true); +} + size_t BP4Reader::UpdateBuffer(const TimePoint &timeoutInstant, const Seconds &pollSeconds) { diff --git a/source/adios2/engine/bp4/BP4Reader.h b/source/adios2/engine/bp4/BP4Reader.h index 0d89668168..39858e07cc 100644 --- a/source/adios2/engine/bp4/BP4Reader.h +++ b/source/adios2/engine/bp4/BP4Reader.h @@ -39,8 +39,13 @@ class BP4Reader : public Engine BP4Reader(IO &io, const std::string &name, const Mode mode, helper::Comm comm); + BP4Reader(IO &io, const std::string &name, const Mode mode, + helper::Comm comm, const char *md, const size_t mdsize); + virtual ~BP4Reader(); + void GetMetadata(char **md, size_t *size) final; + StepStatus BeginStep(StepMode mode = StepMode::Read, const float timeoutSeconds = -1.0) final; @@ -89,8 +94,11 @@ class BP4Reader : public Engine int m_Verbosity = 0; + bool readMetadataFromFile = true; + void Init(); void InitTransports(); + void ProcessMetadataFromMemory(const char *md); /* Sleep up to pollSeconds time if we have not reached timeoutInstant. * Return true if slept diff --git a/source/adios2/engine/bp5/BP5Reader.cpp b/source/adios2/engine/bp5/BP5Reader.cpp index 1e8b6abf00..2afd0b85ce 100644 --- a/source/adios2/engine/bp5/BP5Reader.cpp +++ b/source/adios2/engine/bp5/BP5Reader.cpp @@ -40,6 +40,20 @@ BP5Reader::BP5Reader(IO &io, const std::string &name, const Mode mode, m_IsOpen = true; } +BP5Reader::BP5Reader(IO &io, const std::string &name, const Mode mode, + helper::Comm comm, const char *md, const size_t mdsize) +: Engine("BP5Reader", io, name, mode, std::move(comm)), + m_MDFileManager(io, m_Comm), m_DataFileManager(io, m_Comm), + m_MDIndexFileManager(io, m_Comm), m_FileMetaMetadataManager(io, m_Comm), + m_ActiveFlagFileManager(io, m_Comm) +{ + PERFSTUBS_SCOPED_TIMER("BP5Reader::Open"); + readMetadataFromFile = false; + Init(); + ProcessMetadataFromMemory(md); + m_IsOpen = true; +} + BP5Reader::~BP5Reader() { if (m_BP5Deserializer) @@ -58,6 +72,99 @@ void BP5Reader::DestructorClose(bool Verbose) noexcept m_IsOpen = false; } +void BP5Reader::GetMetadata(char **md, size_t *size) +{ + uint64_t sizes[3] = {m_Metadata.m_Buffer.size(), + m_MetaMetadata.m_Buffer.size(), + m_MetadataIndex.m_Buffer.size()}; + + /* BP5 modifies the metadata block in memory during processing + so we have to read it from file again + */ + auto currentPos = m_MDFileManager.CurrentPos(0); + std::vector mdbuf(sizes[0]); + m_MDFileManager.ReadFile(mdbuf.data(), sizes[0], 0); + m_MDFileManager.SeekTo(currentPos, 0); + + size_t mdsize = sizes[0] + sizes[1] + sizes[2] + 3 * sizeof(uint64_t); + *md = (char *)malloc(mdsize); + *size = mdsize; + char *p = *md; + memcpy(p, sizes, sizeof(sizes)); + p += sizeof(sizes); + memcpy(p, mdbuf.data(), sizes[0]); + p += sizes[0]; + memcpy(p, m_MetaMetadata.m_Buffer.data(), sizes[1]); + p += sizes[1]; + memcpy(p, m_MetadataIndex.m_Buffer.data(), sizes[2]); + p += sizes[2]; +} + +void BP5Reader::ProcessMetadataFromMemory(const char *md) +{ + uint64_t size_mdidx, size_md, size_mmd; + const char *p = md; + memcpy(&size_md, p, sizeof(uint64_t)); + p = p + sizeof(uint64_t); + memcpy(&size_mmd, p, sizeof(uint64_t)); + p = p + sizeof(uint64_t); + memcpy(&size_mdidx, p, sizeof(uint64_t)); + p = p + sizeof(uint64_t); + + std::string hint("when processing metadata from memory"); + size_t pos = 0; + + m_Metadata.Resize(size_md, hint); + helper::CopyToBuffer(m_Metadata.m_Buffer, pos, p, size_md); + p = p + size_md; + + pos = 0; + m_MetaMetadata.Resize(size_mmd, hint); + helper::CopyToBuffer(m_MetaMetadata.m_Buffer, pos, p, size_mmd); + p = p + size_mmd; + + pos = 0; + m_MetadataIndex.Resize(size_mdidx, hint); + helper::CopyToBuffer(m_MetadataIndex.m_Buffer, pos, p, size_mdidx); + p = p + size_mdidx; + + size_t parsedIdxSize = 0; + const auto stepsBefore = m_StepsCount; + + parsedIdxSize = ParseMetadataIndex(m_MetadataIndex, 0, true); + + // cut down the index buffer by throwing away the read but unprocessed + // steps + m_MetadataIndex.m_Buffer.resize(parsedIdxSize); + // next time read index file from this position + m_MDIndexFileAlreadyReadSize += parsedIdxSize; + + // At this point first in time we learned the writer's major and we can + // create the serializer object + if (!m_BP5Deserializer) + { + m_BP5Deserializer = + new format::BP5Deserializer(m_WriterIsRowMajor, m_ReaderIsRowMajor, + (m_OpenMode == Mode::ReadRandomAccess)); + m_BP5Deserializer->m_Engine = this; + } + + if (m_StepsCount > stepsBefore) + { + InstallMetaMetaData(m_MetaMetadata); + + if (m_OpenMode == Mode::ReadRandomAccess) + { + for (size_t Step = 0; Step < m_MetadataIndexTable.size(); Step++) + { + m_BP5Deserializer->SetupForStep( + Step, m_WriterMap[m_WriterMapIndex[Step]].WriterCount); + InstallMetadataForTimestep(Step); + } + } + } +} + void BP5Reader::InstallMetadataForTimestep(size_t Step) { size_t pgstart = m_MetadataIndexTable[Step][0]; @@ -455,19 +562,23 @@ void BP5Reader::Init() m_SelectedSteps.ParseSelection(m_Parameters.SelectSteps); } - /* Do a collective wait for the file(s) to appear within timeout. - Make sure every process comes to the same conclusion */ - const Seconds timeoutSeconds = Seconds(m_Parameters.OpenTimeoutSecs); - - Seconds pollSeconds = Seconds(m_Parameters.BeginStepPollingFrequencySecs); - if (pollSeconds > timeoutSeconds) + if (readMetadataFromFile) { - pollSeconds = timeoutSeconds; - } + /* Do a collective wait for the file(s) to appear within timeout. + Make sure every process comes to the same conclusion */ + const Seconds timeoutSeconds = Seconds(m_Parameters.OpenTimeoutSecs); + + Seconds pollSeconds = + Seconds(m_Parameters.BeginStepPollingFrequencySecs); + if (pollSeconds > timeoutSeconds) + { + pollSeconds = timeoutSeconds; + } - TimePoint timeoutInstant = Now() + timeoutSeconds; - OpenFiles(timeoutInstant, pollSeconds, timeoutSeconds); - UpdateBuffer(timeoutInstant, pollSeconds / 10, timeoutSeconds); + TimePoint timeoutInstant = Now() + timeoutSeconds; + OpenFiles(timeoutInstant, pollSeconds, timeoutSeconds); + UpdateBuffer(timeoutInstant, pollSeconds / 10, timeoutSeconds); + } } void BP5Reader::InitParameters() diff --git a/source/adios2/engine/bp5/BP5Reader.h b/source/adios2/engine/bp5/BP5Reader.h index d159ce83d5..45d803f208 100644 --- a/source/adios2/engine/bp5/BP5Reader.h +++ b/source/adios2/engine/bp5/BP5Reader.h @@ -45,8 +45,13 @@ class BP5Reader : public BP5Engine, public Engine BP5Reader(IO &io, const std::string &name, const Mode mode, helper::Comm comm); + BP5Reader(IO &io, const std::string &name, const Mode mode, + helper::Comm comm, const char *md, const size_t mdsize); + ~BP5Reader(); + void GetMetadata(char **md, size_t *size) final; + StepStatus BeginStep(StepMode mode = StepMode::Read, const float timeoutSeconds = -1.0) final; @@ -114,9 +119,12 @@ class BP5Reader : public BP5Engine, public Engine Minifooter m_Minifooter; + bool readMetadataFromFile = true; + void Init(); void InitParameters(); void InitTransports(); + void ProcessMetadataFromMemory(const char *md); /* Sleep up to pollSeconds time if we have not reached timeoutInstant. * Return true if slept diff --git a/source/adios2/toolkit/transport/Transport.h b/source/adios2/toolkit/transport/Transport.h index 4511155355..524ebfe216 100644 --- a/source/adios2/toolkit/transport/Transport.h +++ b/source/adios2/toolkit/transport/Transport.h @@ -150,6 +150,8 @@ class Transport virtual void Seek(const size_t start = MaxSizeT) = 0; + virtual size_t CurrentPos() = 0; + virtual void Truncate(const size_t length) = 0; virtual void MkDir(const std::string &fileName) = 0; diff --git a/source/adios2/toolkit/transport/file/FileAWSSDK.h b/source/adios2/toolkit/transport/file/FileAWSSDK.h index 552251738d..96f38be8c3 100644 --- a/source/adios2/toolkit/transport/file/FileAWSSDK.h +++ b/source/adios2/toolkit/transport/file/FileAWSSDK.h @@ -76,6 +76,8 @@ class FileAWSSDK : public Transport void Seek(const size_t start = MaxSizeT) final; + size_t CurrentPos() final { return m_SeekPos; }; + void Truncate(const size_t length) final; void MkDir(const std::string &fileName) final; diff --git a/source/adios2/toolkit/transport/file/FileDaos.h b/source/adios2/toolkit/transport/file/FileDaos.h index 411fb55def..f78592a6c1 100644 --- a/source/adios2/toolkit/transport/file/FileDaos.h +++ b/source/adios2/toolkit/transport/file/FileDaos.h @@ -58,6 +58,8 @@ class FileDaos : public Transport void Seek(const size_t start = MaxSizeT) final; + size_t CurrentPos() final { return m_GlobalOffset; }; + void Truncate(const size_t length) final; void MkDir(const std::string &fileName) final; diff --git a/source/adios2/toolkit/transport/file/FileFStream.cpp b/source/adios2/toolkit/transport/file/FileFStream.cpp index aa64cf1115..ad342ec032 100644 --- a/source/adios2/toolkit/transport/file/FileFStream.cpp +++ b/source/adios2/toolkit/transport/file/FileFStream.cpp @@ -361,6 +361,18 @@ void FileFStream::Seek(const size_t start) } } +size_t FileFStream::CurrentPos() +{ + if (m_OpenMode == Mode::Write || m_OpenMode == Mode::Append) + { + return static_cast(m_FileStream.tellp()); + } + else + { + return static_cast(m_FileStream.tellg()); + } +} + void FileFStream::Truncate(const size_t length) { #if __cplusplus >= 201703L diff --git a/source/adios2/toolkit/transport/file/FileFStream.h b/source/adios2/toolkit/transport/file/FileFStream.h index 3f2e022474..22009c8136 100644 --- a/source/adios2/toolkit/transport/file/FileFStream.h +++ b/source/adios2/toolkit/transport/file/FileFStream.h @@ -60,6 +60,8 @@ class FileFStream : public Transport void Seek(const size_t start = MaxSizeT) final; + size_t CurrentPos() final; + void Truncate(const size_t length) final; void MkDir(const std::string &fileName) final; diff --git a/source/adios2/toolkit/transport/file/FileIME.cpp b/source/adios2/toolkit/transport/file/FileIME.cpp index 5d91b67eca..976bf2a681 100644 --- a/source/adios2/toolkit/transport/file/FileIME.cpp +++ b/source/adios2/toolkit/transport/file/FileIME.cpp @@ -354,6 +354,12 @@ void FileIME::Seek(const size_t start) } } +size_t FileIME::CurrentPos() +{ + return static_cast( + ime_client_native2_lseek(m_FileDescriptor, 0, SEEK_CUR)); +} + void FileIME::Truncate(const size_t length) { helper::Throw( diff --git a/source/adios2/toolkit/transport/file/FileIME.h b/source/adios2/toolkit/transport/file/FileIME.h index 92532714c7..0ebc98f13a 100644 --- a/source/adios2/toolkit/transport/file/FileIME.h +++ b/source/adios2/toolkit/transport/file/FileIME.h @@ -59,6 +59,8 @@ class FileIME : public Transport void Seek(const size_t start = MaxSizeT) final; + size_t CurrentPos() final; + void Truncate(const size_t length) final; void MkDir(const std::string &fileName) final; diff --git a/source/adios2/toolkit/transport/file/FilePOSIX.cpp b/source/adios2/toolkit/transport/file/FilePOSIX.cpp index bf41198508..a845e07df3 100644 --- a/source/adios2/toolkit/transport/file/FilePOSIX.cpp +++ b/source/adios2/toolkit/transport/file/FilePOSIX.cpp @@ -600,6 +600,11 @@ void FilePOSIX::Seek(const size_t start) } } +size_t FilePOSIX::CurrentPos() +{ + return static_cast(lseek(m_FileDescriptor, 0, SEEK_CUR)); +} + void FilePOSIX::Truncate(const size_t length) { WaitForOpen(); diff --git a/source/adios2/toolkit/transport/file/FilePOSIX.h b/source/adios2/toolkit/transport/file/FilePOSIX.h index 4f352b833c..774e699277 100644 --- a/source/adios2/toolkit/transport/file/FilePOSIX.h +++ b/source/adios2/toolkit/transport/file/FilePOSIX.h @@ -66,6 +66,8 @@ class FilePOSIX : public Transport void Seek(const size_t start = MaxSizeT) final; + size_t CurrentPos() final; + void Truncate(const size_t length) final; void MkDir(const std::string &fileName) final; diff --git a/source/adios2/toolkit/transport/file/FileStdio.cpp b/source/adios2/toolkit/transport/file/FileStdio.cpp index 24fab067fa..b7ee29593e 100644 --- a/source/adios2/toolkit/transport/file/FileStdio.cpp +++ b/source/adios2/toolkit/transport/file/FileStdio.cpp @@ -452,6 +452,11 @@ void FileStdio::Seek(const size_t start) } } +size_t FileStdio::CurrentPos() +{ + return static_cast(std::ftell(m_File)); +} + #ifdef _WIN32 void FileStdio::Truncate(const size_t length) { diff --git a/source/adios2/toolkit/transport/file/FileStdio.h b/source/adios2/toolkit/transport/file/FileStdio.h index 7882004626..13e71de161 100644 --- a/source/adios2/toolkit/transport/file/FileStdio.h +++ b/source/adios2/toolkit/transport/file/FileStdio.h @@ -62,6 +62,8 @@ class FileStdio : public Transport void Seek(const size_t start) final; + size_t CurrentPos() final; + void Truncate(const size_t length) final; void MkDir(const std::string &fileName) final; diff --git a/source/adios2/toolkit/transport/null/NullTransport.h b/source/adios2/toolkit/transport/null/NullTransport.h index 1595ce793a..fa85b2710e 100644 --- a/source/adios2/toolkit/transport/null/NullTransport.h +++ b/source/adios2/toolkit/transport/null/NullTransport.h @@ -58,6 +58,8 @@ class NullTransport : public Transport void Seek(const size_t start = MaxSizeT) override; + size_t CurrentPos() override { return 0; }; + void Truncate(const size_t length) override; protected: diff --git a/source/adios2/toolkit/transport/shm/ShmSystemV.h b/source/adios2/toolkit/transport/shm/ShmSystemV.h index fb8838448e..eb4c1d5020 100644 --- a/source/adios2/toolkit/transport/shm/ShmSystemV.h +++ b/source/adios2/toolkit/transport/shm/ShmSystemV.h @@ -53,6 +53,8 @@ class ShmSystemV : public Transport void Seek(const size_t start = MaxSizeT) final; + size_t CurrentPos() final { return 0; }; + void MkDir(const std::string &fileName) final; private: diff --git a/source/adios2/toolkit/transportman/TransportMan.cpp b/source/adios2/toolkit/transportman/TransportMan.cpp index c2e21b6aec..c2f41001ea 100644 --- a/source/adios2/toolkit/transportman/TransportMan.cpp +++ b/source/adios2/toolkit/transportman/TransportMan.cpp @@ -384,6 +384,14 @@ void TransportMan::SeekTo(const size_t start, const int transportIndex) } } +size_t TransportMan::CurrentPos(const int transportIndex) +{ + auto itTransport = m_Transports.find(transportIndex); + CheckFile(itTransport, ", in call to CurrentPos with index " + + std::to_string(transportIndex)); + return itTransport->second->CurrentPos(); +} + void TransportMan::Truncate(const size_t length, const int transportIndex) { if (transportIndex == -1) diff --git a/source/adios2/toolkit/transportman/TransportMan.h b/source/adios2/toolkit/transportman/TransportMan.h index 32ae4b1343..4b46b98381 100644 --- a/source/adios2/toolkit/transportman/TransportMan.h +++ b/source/adios2/toolkit/transportman/TransportMan.h @@ -207,6 +207,8 @@ class TransportMan void SeekTo(const size_t start, const int transportIndex = -1); + size_t CurrentPos(const int transportIndex); + void Truncate(const size_t length, const int transportIndex = -1); /** diff --git a/testing/adios2/engine/bp/CMakeLists.txt b/testing/adios2/engine/bp/CMakeLists.txt index 5237281f43..18ac338fb1 100644 --- a/testing/adios2/engine/bp/CMakeLists.txt +++ b/testing/adios2/engine/bp/CMakeLists.txt @@ -180,6 +180,7 @@ gtest_add_tests_helper(WriteNull MPI_ALLOW BP Engine.BP. .BP3 # BP4 and BP5 but NOT BP3 bp4_bp5_gtest_add_tests_helper(WriteAppendReadADIOS2 MPI_ALLOW) +bp4_bp5_gtest_add_tests_helper(OpenWithMetadata MPI_NONE) # BP4 only for now # gtest_add_tests_helper(WriteAppendReadADIOS2 MPI_ALLOW BP Engine.BP. .BP4 diff --git a/testing/adios2/engine/bp/TestBPOpenWithMetadata.cpp b/testing/adios2/engine/bp/TestBPOpenWithMetadata.cpp new file mode 100644 index 0000000000..31f227b80b --- /dev/null +++ b/testing/adios2/engine/bp/TestBPOpenWithMetadata.cpp @@ -0,0 +1,203 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + */ +#include +#include + +#include +#include //std::iota +#include + +#include + +#include + +#include "../SmallTestData.h" + +std::string engineName; // comes from command line +std::string engineParameters; // comes from command line + +class BPOpenWithMetadata : public ::testing::Test +{ +public: + BPOpenWithMetadata() = default; + + SmallTestData m_TestData; +}; + +//****************************************************************************** +// Create an output +// Open normally +// Get metadata +// Open again with metadata +//****************************************************************************** + +// ADIOS2 BP write and read 1D arrays +TEST_F(BPOpenWithMetadata, ADIOS2BPOpenWithMetadata) +{ + const std::string fname("ADIOS2BPOpenWithMetadata.bp"); + const size_t Nx = 6; + const size_t NSteps = 3; + + adios2::ADIOS adios; + { + adios2::IO io = adios.DeclareIO("TestIO"); + const adios2::Dims shape{Nx}; + const adios2::Dims start{0}; + const adios2::Dims count{Nx}; + auto v = io.DefineVariable("r64", shape, start, count); + + if (!engineName.empty()) + { + io.SetEngine(engineName); + } + else + { + // Create the BP Engine + io.SetEngine("BPFile"); + } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } + + adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write); + EXPECT_EQ(bpWriter.OpenMode(), adios2::Mode::Write); + for (size_t step = 0; step < NSteps; ++step) + { + // Generate test data for each process uniquely + SmallTestData currentTestData = generateNewSmallTestData( + m_TestData, static_cast(step), 0, 1); + + bpWriter.BeginStep(); + bpWriter.Put(v, currentTestData.R64.data()); + bpWriter.EndStep(); + } + bpWriter.Close(); + } + + char *md; + size_t mdsize; + + { + adios2::IO io = adios.DeclareIO("ReadIO"); + if (!engineName.empty()) + { + io.SetEngine(engineName); + } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } + + adios2::Engine bpReader = + io.Open(fname, adios2::Mode::ReadRandomAccess); + + bpReader.GetMetadata(&md, &mdsize); + + auto var_r64 = io.InquireVariable("r64"); + EXPECT_TRUE(var_r64); + ASSERT_EQ(var_r64.ShapeID(), adios2::ShapeID::GlobalArray); + ASSERT_EQ(var_r64.Steps(), NSteps); + ASSERT_EQ(var_r64.Shape()[0], Nx); + + SmallTestData testData; + std::array R64; + + const adios2::Dims start{0}; + const adios2::Dims count{Nx}; + const adios2::Box sel(start, count); + + var_r64.SetSelection(sel); + + for (size_t t = 0; t < NSteps; ++t) + { + var_r64.SetStepSelection({t, 1}); + + // Generate test data for each rank uniquely + SmallTestData currentTestData = + generateNewSmallTestData(m_TestData, static_cast(t), 0, 1); + + bpReader.Get(var_r64, R64.data(), adios2::Mode::Sync); + for (size_t i = 0; i < Nx; ++i) + { + std::stringstream ss; + ss << "t=" << t << " i=" << i; + std::string msg = ss.str(); + EXPECT_EQ(R64[i], currentTestData.R64[i]) << msg; + } + } + bpReader.Close(); + } + + /* Open again with metadata */ + { + adios2::IO io = adios.DeclareIO("ReadIOMD"); + if (!engineName.empty()) + { + io.SetEngine(engineName); + } + if (!engineParameters.empty()) + { + io.SetParameters(engineParameters); + } + + adios2::Engine bpReader = io.Open(fname, md, mdsize); + + auto var_r64 = io.InquireVariable("r64"); + EXPECT_TRUE(var_r64); + ASSERT_EQ(var_r64.ShapeID(), adios2::ShapeID::GlobalArray); + ASSERT_EQ(var_r64.Steps(), NSteps); + ASSERT_EQ(var_r64.Shape()[0], Nx); + + SmallTestData testData; + std::array R64; + + const adios2::Dims start{0}; + const adios2::Dims count{Nx}; + const adios2::Box sel(start, count); + + var_r64.SetSelection(sel); + + for (size_t t = 0; t < NSteps; ++t) + { + var_r64.SetStepSelection({t, 1}); + + // Generate test data for each rank uniquely + SmallTestData currentTestData = + generateNewSmallTestData(m_TestData, static_cast(t), 0, 1); + + bpReader.Get(var_r64, R64.data(), adios2::Mode::Sync); + for (size_t i = 0; i < Nx; ++i) + { + std::stringstream ss; + ss << "t=" << t << " i=" << i; + std::string msg = ss.str(); + EXPECT_EQ(R64[i], currentTestData.R64[i]) << msg; + } + } + bpReader.Close(); + } +} + +//****************************************************************************** +// main +//****************************************************************************** + +int main(int argc, char **argv) +{ + int result; + ::testing::InitGoogleTest(&argc, argv); + + if (argc > 1) + { + engineName = std::string(argv[1]); + } + if (argc > 2) + { + engineParameters = std::string(argv[2]); + } + result = RUN_ALL_TESTS(); + return result; +}