From 07423dc878a812477fc6db2cbefa38940543495a Mon Sep 17 00:00:00 2001 From: pnorbert Date: Fri, 30 Aug 2024 13:37:31 -0400 Subject: [PATCH] Fix an issue when reading blocks in streaming mode, when some producers did not write any block to output. (#4332) (cherry picked from commit 75d801f4a1ce5dbbebe5843b19f7e8df47978240) --- .../toolkit/format/bp5/BP5Deserializer.cpp | 2 +- .../bp/TestBPWriteReadLocalVariables.cpp | 194 ++++++++++++++++++ 2 files changed, 195 insertions(+), 1 deletion(-) diff --git a/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp b/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp index 22fef0463a..1e5ee4d16b 100644 --- a/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp +++ b/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp @@ -1580,7 +1580,7 @@ BP5Deserializer::GenerateReadRequests(const bool doAllocTempBuffers, size_t *max MetaArrayRecOperator *writer_meta_base = (MetaArrayRecOperator *)GetMetadataBase((struct BP5VarRec *)Req->VarRec, Step, WriterRank); - if (!writer_meta_base) + if (!writer_meta_base || !writer_meta_base->BlockCount) { continue; // Not writen on this step } diff --git a/testing/adios2/engine/bp/TestBPWriteReadLocalVariables.cpp b/testing/adios2/engine/bp/TestBPWriteReadLocalVariables.cpp index 05c23ccbfc..3adc967110 100644 --- a/testing/adios2/engine/bp/TestBPWriteReadLocalVariables.cpp +++ b/testing/adios2/engine/bp/TestBPWriteReadLocalVariables.cpp @@ -1798,6 +1798,200 @@ TEST_F(BPWriteReadLocalVariables, ADIOS2BPWriteReadLocal2DChangeCount) } } +template +void print_vector(std::vector &v) +{ + std::cout << "{ "; + for (auto &e : v) + { + std::cout << e << " "; + } + std::cout << "}" << std::endl; +} + +template +void print_array(std::array &a) +{ + std::cout << "{ "; + for (auto &e : a) + { + std::cout << e << " "; + } + std::cout << "}" << std::endl; +} + +TEST_F(BPWriteReadLocalVariables, ADIOS2BPWriteReadLocalVaryingNumberOfBlocks) +{ + /* Write different number of blocks per step, skipping some rank in some steps. + Each block size is different too. Test reading each block properly. + Only the first 8 MPI ranks do work, the others just loop empty. + */ + constexpr int NSTEPS = 3; + constexpr int MAXMPISIZE = 8; + const int blocks[NSTEPS][MAXMPISIZE] = { + {1, 1, 1, 1, 1, 1, 1, 1}, + {1, 0, 1, 0, 1, 0, 1, 0}, + {0, 1, 0, 1, 0, 1, 0, 1}, + }; + const size_t blocksizes[NSTEPS][MAXMPISIZE] = { + {9, 8, 7, 6, 5, 6, 7, 3}, + {2, 0, 4, 0, 6, 0, 9, 0}, + {0, 1, 0, 4, 0, 7, 0, 5}, + }; + + int mpiRank = 0; + int mpiSize = 1; +#if ADIOS2_USE_MPI + MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank); + MPI_Comm_size(MPI_COMM_WORLD, &mpiSize); + const std::string fname("BPWRLocalVaryingNumberOfBlocks_" + engineName + "_MPI.bp"); +#else + const std::string fname("BPWRLocalVaryingNumberOfBlocks_" + engineName + ".bp"); +#endif + +#if ADIOS2_USE_MPI + adios2::ADIOS adios(MPI_COMM_WORLD); +#else + adios2::ADIOS adios; +#endif + { + adios2::IO io = adios.DeclareIO("TestIO"); + const adios2::Dims shape{}; + const adios2::Dims start{}; + const adios2::Dims count{(size_t)1}; + + auto var_r32 = io.DefineVariable("r32", shape, start, count); + + adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write); + + for (int step = 0; step < NSTEPS; ++step) + { + std::cout << "--- generateNewSmallTestData Step " << step << " rank " << mpiRank + << " nprocs " << mpiSize << " ---" << std::endl; + SmallTestData currentTestData = + generateNewSmallTestData(m_TestData, static_cast(step), mpiRank, mpiSize); + print_array(currentTestData.R32); + + int bpos = (mpiRank < MAXMPISIZE ? mpiRank : -1); + + bpWriter.BeginStep(); + if (bpos >= 0 && blocks[step][bpos]) + { + size_t nx = blocksizes[step][bpos]; + var_r32.SetSelection({{}, {nx}}); + bpWriter.Put(var_r32, currentTestData.R32.data()); + } + bpWriter.EndStep(); + } + bpWriter.Close(); + } +#if ADIOS2_USE_MPI + MPI_Barrier(MPI_COMM_WORLD); +#endif + + if (!mpiRank) + { + adios2::IO io = adios.DeclareIO("ReaderIO"); +#if ADIOS2_USE_MPI + adios2::Engine bpReader = io.Open(fname, adios2::Mode::Read, MPI_COMM_SELF); +#else + adios2::Engine bpReader = io.Open(fname, adios2::Mode::Read); +#endif + + for (int step = 0; step < NSTEPS; ++step) + { + std::cout << "====== Read Step " << step << " ==========" << std::endl; + bpReader.BeginStep(); + auto var_r32 = io.InquireVariable("r32"); + + size_t nblocks = 0; + int nproc = (mpiSize < MAXMPISIZE ? mpiSize : MAXMPISIZE); + for (size_t b = 0; b < static_cast(nproc); ++b) + { + if (blocks[step][b] == 0) + { + continue; + } + + std::cout << " --- generateNewSmallTestData Step " << step << " rank " << mpiRank + << " nprocs " << mpiSize << " b=" << b << " nblocks=" << nblocks << " ---" + << std::endl; + SmallTestData currentTestData = generateNewSmallTestData( + m_TestData, static_cast(step), static_cast(b), mpiSize); + print_array(currentTestData.R32); + + var_r32.SetBlockSelection(nblocks); + std::vector dataIn; + bpReader.Get(var_r32, dataIn, adios2::Mode::Sync); + EXPECT_EQ(dataIn.size(), blocksizes[step][b]); + + for (size_t i = 0; i < dataIn.size(); ++i) + { + EXPECT_EQ(dataIn[i], currentTestData.R32[i]); + } + + ++nblocks; + } + bpReader.EndStep(); + } + bpReader.Close(); + } +#if ADIOS2_USE_MPI + MPI_Barrier(MPI_COMM_WORLD); +#endif + + if (!mpiRank) + { + adios2::IO io = adios.DeclareIO("ReaderIORRA"); +#if ADIOS2_USE_MPI + adios2::Engine bpReader = io.Open(fname, adios2::Mode::ReadRandomAccess, MPI_COMM_SELF); +#else + adios2::Engine bpReader = io.Open(fname, adios2::Mode::ReadRandomAccess); +#endif + + auto var_r32 = io.InquireVariable("r32"); + + for (int step = 0; step < NSTEPS; ++step) + { + std::cout << "====== Read Step " << step << " ==========" << std::endl; + var_r32.SetStepSelection({step, 1}); + + size_t nblocks = 0; + int nproc = (mpiSize < MAXMPISIZE ? mpiSize : MAXMPISIZE); + for (size_t b = 0; b < static_cast(nproc); ++b) + { + if (blocks[step][b] == 0) + { + continue; + } + + std::cout << " --- generateNewSmallTestData Step " << step << " rank " << mpiRank + << " nprocs " << mpiSize << " b=" << b << " nblocks=" << nblocks << " ---" + << std::endl; + SmallTestData currentTestData = generateNewSmallTestData( + m_TestData, static_cast(step), static_cast(b), mpiSize); + print_array(currentTestData.R32); + + var_r32.SetBlockSelection(nblocks); + std::vector dataIn; + bpReader.Get(var_r32, dataIn, adios2::Mode::Sync); + EXPECT_EQ(dataIn.size(), blocksizes[step][b]); + + for (size_t i = 0; i < dataIn.size(); ++i) + { + EXPECT_EQ(dataIn[i], currentTestData.R32[i]); + } + + ++nblocks; + } + } + bpReader.Close(); + } +#if ADIOS2_USE_MPI + MPI_Barrier(MPI_COMM_WORLD); +#endif +} + int main(int argc, char **argv) { #if ADIOS2_USE_MPI