diff --git a/source/adios2/engine/bp5/BP5Engine.h b/source/adios2/engine/bp5/BP5Engine.h index cf37c301a5..4c995087c1 100644 --- a/source/adios2/engine/bp5/BP5Engine.h +++ b/source/adios2/engine/bp5/BP5Engine.h @@ -113,8 +113,8 @@ class BP5Engine }; #define BP5_FOREACH_PARAMETER_TYPE_4ARGS(MACRO) \ - MACRO(OpenTimeoutSecs, Int, int, 3600) \ - MACRO(BeginStepPollingFrequencySecs, Int, int, 0) \ + MACRO(OpenTimeoutSecs, Float, float, -1.0f) \ + MACRO(BeginStepPollingFrequencySecs, Float, float, 1.0f) \ MACRO(StreamReader, Bool, bool, false) \ MACRO(BurstBufferDrain, Bool, bool, true) \ MACRO(BurstBufferPath, String, std::string, (char *)(intptr_t)0) \ diff --git a/source/adios2/engine/bp5/BP5Reader.cpp b/source/adios2/engine/bp5/BP5Reader.cpp index 736f0bf157..1a3819f63c 100644 --- a/source/adios2/engine/bp5/BP5Reader.cpp +++ b/source/adios2/engine/bp5/BP5Reader.cpp @@ -100,19 +100,21 @@ StepStatus BP5Reader::BeginStep(StepMode mode, const float timeoutSeconds) StepStatus status = StepStatus::OK; if (m_FirstStep) { - if (m_StepsCount == 0) + if (!m_StepsCount) { - // status = CheckForNewSteps(Seconds(timeoutSeconds)); + // not steps was found in Open/Init, check for new steps now + status = CheckForNewSteps(Seconds(timeoutSeconds)); } } else { if (m_CurrentStep + 1 >= m_StepsCount) { - // status = CheckForNewSteps(Seconds(timeoutSeconds)); - status = StepStatus::EndOfStream; + // we processed steps in memory, check for new steps now + status = CheckForNewSteps(Seconds(timeoutSeconds)); } } + if (status == StepStatus::OK) { m_BetweenStepPairs = true; @@ -142,6 +144,10 @@ StepStatus BP5Reader::BeginStep(StepMode mode, const float timeoutSeconds) m_CurrentStep, m_WriterMap[m_WriterMapIndex[m_CurrentStep]].WriterCount); + /* Remove all existing variables from previous steps + It seems easier than trying to update them */ + // m_IO.RemoveAllVariables(); + InstallMetadataForTimestep(m_CurrentStep); m_IO.ResetVariablesStepSelection(false, "in call to BP5 Reader BeginStep"); @@ -253,9 +259,8 @@ void BP5Reader::Init() // if IO was involved in reading before this flag may be true now m_IO.m_ReadStreaming = false; - - ParseParams(m_IO, m_Parameters); m_ReaderIsRowMajor = (m_IO.m_ArrayOrder == ArrayOrdering::RowMajor); + InitParameters(); InitTransports(); if (!m_Parameters.SelectSteps.empty()) { @@ -273,18 +278,31 @@ void BP5Reader::Init() } TimePoint timeoutInstant = Now() + timeoutSeconds; - OpenFiles(timeoutInstant, pollSeconds, timeoutSeconds); + UpdateBuffer(timeoutInstant, pollSeconds / 10, timeoutSeconds); +} - /* non-stream reader gets as much steps as available now */ - InitBuffer(timeoutInstant, pollSeconds / 10, timeoutSeconds); +void BP5Reader::InitParameters() +{ + ParseParams(m_IO, m_Parameters); + if (m_Parameters.OpenTimeoutSecs < 0.0f) + { + if (m_OpenMode == Mode::ReadRandomAccess) + { + m_Parameters.OpenTimeoutSecs = 0.0f; + } + else + { + m_Parameters.OpenTimeoutSecs = 3600.0f; + } + } } bool BP5Reader::SleepOrQuit(const TimePoint &timeoutInstant, const Seconds &pollSeconds) { auto now = Now(); - if (now + pollSeconds >= timeoutInstant) + if (now >= timeoutInstant) { return false; } @@ -484,7 +502,7 @@ uint64_t BP5Reader::MetadataExpectedMinFileSize(const std::string &IdxFileName, void BP5Reader::InstallMetaMetaData(format::BufferSTL buffer) { - size_t Position = 0; + size_t Position = m_MetaMetaDataFileAlreadyProcessedSize; while (Position < buffer.m_Buffer.size()) { format::BP5Base::MetaMetaInfoBlock MMI; @@ -498,59 +516,73 @@ void BP5Reader::InstallMetaMetaData(format::BufferSTL buffer) m_BP5Deserializer->InstallMetaMetaData(MMI); Position += MMI.MetaMetaIDLen + MMI.MetaMetaInfoLen; } + m_MetaMetaDataFileAlreadyProcessedSize = Position; } -void BP5Reader::InitBuffer(const TimePoint &timeoutInstant, - const Seconds &pollSeconds, - const Seconds &timeoutSeconds) +void BP5Reader::UpdateBuffer(const TimePoint &timeoutInstant, + const Seconds &pollSeconds, + const Seconds &timeoutSeconds) { - /* Put all metadata in buffer and parse in random access mode */ size_t newIdxSize = 0; + m_MetadataIndex.Reset(true, false); if (m_Comm.Rank() == 0) { /* Read metadata index table into memory */ const size_t metadataIndexFileSize = m_MDIndexFileManager.GetFileSize(0); - if (metadataIndexFileSize > 0) + newIdxSize = metadataIndexFileSize - m_MDIndexFileAlreadyReadSize; + if (metadataIndexFileSize > m_MDIndexFileAlreadyReadSize) { - m_MetadataIndex.Resize(metadataIndexFileSize, - "allocating metadata index buffer, " - "in call to BPFileReader Open"); + m_MetadataIndex.m_Buffer.resize(newIdxSize); m_MDIndexFileManager.ReadFile(m_MetadataIndex.m_Buffer.data(), - metadataIndexFileSize); + newIdxSize, + m_MDIndexFileAlreadyReadSize); + } + else + { + m_MetadataIndex.m_Buffer.resize(0); } - m_MDIndexFileAlreadyReadSize = metadataIndexFileSize; - newIdxSize = metadataIndexFileSize; } - newIdxSize = m_Comm.BroadcastValue(newIdxSize, 0); + // broadcast metadata index buffer to all ranks from zero + m_Comm.BroadcastVector(m_MetadataIndex.m_Buffer); + newIdxSize = m_MetadataIndex.m_Buffer.size(); + size_t parsedIdxSize = 0; + const auto stepsBefore = m_StepsCount; if (newIdxSize > 0) { - // broadcast metadata index buffer to all ranks from zero - m_Comm.BroadcastVector(m_MetadataIndex.m_Buffer); - /* Parse metadata index table */ - ParseMetadataIndex(m_MetadataIndex, 0, true, false); + const bool hasHeader = (!m_MDIndexFileAlreadyReadSize); + parsedIdxSize = ParseMetadataIndex(m_MetadataIndex, 0, hasHeader); // now we are sure the index header has been parsed, // first step parsing done // m_FilteredMetadataInfo is created - m_IdxHeaderParsed = true; + + // cut down the index buffer by throwing away the read but unprocessed + // steps + m_MetadataIndex.m_Buffer.resize(parsedIdxSize); + // next time read index file from this position + m_MDIndexFileAlreadyReadSize += parsedIdxSize; + + // At this point first in time we learned the writer's major and we can + // create the serializer object + if (!m_BP5Deserializer) + { + m_BP5Deserializer = new format::BP5Deserializer( + m_WriterIsRowMajor, m_ReaderIsRowMajor, + (m_OpenMode == Mode::ReadRandomAccess)); + m_BP5Deserializer->m_Engine = this; + } } - if (newIdxSize > 0) + if (m_StepsCount > stepsBefore) { + m_Metadata.Reset(true, false); + m_MetaMetadata.Reset(true, false); if (m_Comm.Rank() == 0) { - /* Read metametadata into memory */ - const size_t metametadataFileSize = - m_FileMetaMetadataManager.GetFileSize(0); - m_MetaMetadata.Resize(metametadataFileSize, - "allocating metadata index buffer, " - "in call to BPFileReader Open"); - m_FileMetaMetadataManager.ReadFile(m_MetaMetadata.m_Buffer.data(), - metametadataFileSize); - + // How much metadata do we need to read? size_t fileFilteredSize = 0; for (auto p : m_FilteredMetadataInfo) { @@ -576,13 +608,9 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant, m_Metadata.Resize(fileFilteredSize, "allocating metadata buffer, " "in call to BP5Reader Open"); - size_t mempos = 0; for (auto p : m_FilteredMetadataInfo) { - /*std::cout << "Read metadata pos = " << p.first - << " size = " << p.second - << " to mempos = " << mempos << std::endl;*/ m_MDFileManager.ReadFile( m_Metadata.m_Buffer.data() + mempos, p.second, p.first); mempos += p.second; @@ -592,7 +620,7 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant, else { helper::Throw( - "Engine", "BP5Reader", "InitBuffer", + "Engine", "BP5Reader", "UpdateBuffer", "File " + m_Name + " was found with an index file but md.0 " "has not contained enough data within " @@ -602,10 +630,29 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant, " metadata size = " + std::to_string(actualFileSize) + " expected size = " + std::to_string(expectedMinFileSize) + - ". One reason could be if the reader finds old data " + ". One reason could be if the reader finds old " + "data " "while " "the writer is creating the new files."); } + + /* Read new meta-meta-data into memory and append to existing one in + * memory */ + const size_t metametadataFileSize = + m_FileMetaMetadataManager.GetFileSize(0); + if (metametadataFileSize > m_MetaMetaDataFileAlreadyReadSize) + { + const size_t newMMDSize = + metametadataFileSize - m_MetaMetaDataFileAlreadyReadSize; + m_MetaMetadata.Resize(metametadataFileSize, + "(re)allocating meta-meta-data buffer, " + "in call to BP5Reader Open"); + m_FileMetaMetadataManager.ReadFile( + m_MetaMetadata.m_Buffer.data() + + m_MetaMetaDataFileAlreadyReadSize, + newMMDSize, m_MetaMetaDataFileAlreadyReadSize); + m_MetaMetaDataFileAlreadyReadSize += newMMDSize; + } } // broadcast buffer to all ranks from zero @@ -614,11 +661,6 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant, // broadcast metadata index buffer to all ranks from zero m_Comm.BroadcastVector(m_MetaMetadata.m_Buffer); - m_BP5Deserializer = - new format::BP5Deserializer(m_WriterIsRowMajor, m_ReaderIsRowMajor, - (m_OpenMode == Mode::ReadRandomAccess)); - m_BP5Deserializer->m_Engine = this; - InstallMetaMetaData(m_MetaMetadata); if (m_OpenMode == Mode::ReadRandomAccess) @@ -630,26 +672,12 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant, InstallMetadataForTimestep(Step); } } - // fills IO with Variables and Attributes - // m_MDFileProcessedSize = ParseMetadata( - // m_Metadata, *this, true); - - /* m_MDFileProcessedSize is the position in the buffer where processing - * ends. The processing is controlled by the number of records in the - * Index, which may be less than the actual entries in the metadata in a - * streaming situation (where writer has just written metadata for step - * K+1,...,K+L while the index contains K steps when the reader looks at - * it). - * - * In ProcessMetadataForNewSteps(), we will re-read the metadata which - * is in the buffer but has not been processed yet. - */ } } -void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, - const size_t absoluteStartPos, - const bool hasHeader, const bool oneStepOnly) +size_t BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, + const size_t absoluteStartPos, + const bool hasHeader) { const auto &buffer = bufferSTL.m_Buffer; size_t &position = bufferSTL.m_Position; @@ -709,15 +737,23 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, position = m_IndexHeaderSize; } + // set a limit for metadata size in streaming mode + size_t maxMetadataSizeInMemory = adios2::MaxSizeT; + if (m_OpenMode == Mode::Read) + { + maxMetadataSizeInMemory = 16777216; // 16MB + } + size_t metadataSizeToRead = 0; + // Read each record now - uint64_t absStepInFile = 0; - uint64_t lastMapStep = 0; - uint64_t lastWriterCount = 0; uint64_t MetadataPosTotalSkip = 0; + m_MetadataIndexTable.clear(); m_FilteredMetadataInfo.clear(); uint64_t minfo_pos = 0; uint64_t minfo_size = 0; - do + int n = 0; // a loop counter for current run + while (position < buffer.size() && + metadataSizeToRead < maxMetadataSizeInMemory) { std::vector ptrs; const uint64_t MetadataPos = helper::ReadValue( @@ -729,7 +765,7 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, const uint64_t hasWriterMap = helper::ReadValue( buffer, position, m_Minifooter.IsLittleEndian); - if (!absStepInFile) + if (!n) { minfo_pos = MetadataPos; // initialize minfo_pos properly MetadataPosTotalSkip = MetadataPos; @@ -753,20 +789,21 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, buffer, position, m_Minifooter.IsLittleEndian); s.RankToSubfile.push_back(subfileIdx); } - lastMapStep = m_StepsCount; - lastWriterCount = s.WriterCount; + m_LastMapStep = m_StepsCount; + m_LastWriterCount = s.WriterCount; } - if (m_SelectedSteps.IsSelected(absStepInFile)) + if (m_SelectedSteps.IsSelected(m_AbsStepsInFile)) { - m_WriterMapIndex.push_back(lastMapStep); + m_WriterMapIndex.push_back(m_LastMapStep); // pos in metadata in memory ptrs.push_back(MetadataPos - MetadataPosTotalSkip); ptrs.push_back(MetadataSize); ptrs.push_back(FlushCount); ptrs.push_back(position); - ptrs.push_back(MetadataPos); // absolute pos in file before read + // absolute pos in file before read + ptrs.push_back(MetadataPos); m_MetadataIndexTable[m_StepsCount] = ptrs; #ifdef DUMPDATALOCINFO for (uint64_t i = 0; i < m_WriterCount; i++) @@ -788,6 +825,7 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, } #endif minfo_size += MetadataSize; + metadataSizeToRead += MetadataSize; m_StepsCount++; } else @@ -803,13 +841,105 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, } // skip over the writer -> data file offset records - position += sizeof(uint64_t) * lastWriterCount * ((2 * FlushCount) + 1); - absStepInFile++; - } while (!oneStepOnly && position < buffer.size()); + position += + sizeof(uint64_t) * m_LastWriterCount * ((2 * FlushCount) + 1); + ++m_AbsStepsInFile; + ++n; + } if (minfo_size > 0) { m_FilteredMetadataInfo.push_back(std::make_pair(minfo_pos, minfo_size)); } + return position; +} + +bool BP5Reader::ReadActiveFlag(std::vector &buffer) +{ + if (buffer.size() < m_ActiveFlagPosition) + { + helper::Throw( + "Engine", "BP5Reader", "ReadActiveFlag", + "called with a buffer smaller than required"); + } + // Writer active flag + size_t position = m_ActiveFlagPosition; + const char activeChar = helper::ReadValue( + buffer, position, m_Minifooter.IsLittleEndian); + m_WriterIsActive = (activeChar == '\1' ? true : false); + return m_WriterIsActive; +} + +bool BP5Reader::CheckWriterActive() +{ + size_t flag = 0; + if (m_Comm.Rank() == 0) + { + std::vector header(m_IndexHeaderSize, '\0'); + m_MDIndexFileManager.ReadFile(header.data(), m_IndexHeaderSize, 0, 0); + bool active = ReadActiveFlag(header); + flag = (active ? 1 : 0); + } + flag = m_Comm.BroadcastValue(flag, 0); + m_WriterIsActive = (flag > 0); + return m_WriterIsActive; +} + +StepStatus BP5Reader::CheckForNewSteps(Seconds timeoutSeconds) +{ + /* Do a collective wait for a step within timeout. + Make sure every reader comes to the same conclusion */ + StepStatus retval = StepStatus::OK; + + if (timeoutSeconds < Seconds::zero()) + { + timeoutSeconds = Seconds(999999999); // max 1 billion seconds wait + } + const TimePoint timeoutInstant = Now() + timeoutSeconds; + + auto pollSeconds = Seconds(m_Parameters.BeginStepPollingFrequencySecs); + if (pollSeconds > timeoutSeconds) + { + pollSeconds = timeoutSeconds; + } + + /* Poll */ + const auto stepsBefore = m_StepsCount; + do + { + UpdateBuffer(timeoutInstant, pollSeconds / 10, timeoutSeconds); + if (m_StepsCount > stepsBefore) + { + break; + } + if (!CheckWriterActive()) + { + /* Race condition: When checking data in UpdateBuffer, new + * step(s) may have not arrived yet. When checking active flag, + * the writer may have completed write and terminated. So we may + * have missed a step or two. */ + UpdateBuffer(timeoutInstant, pollSeconds / 10, timeoutSeconds); + break; + } + } while (SleepOrQuit(timeoutInstant, pollSeconds)); + + if (m_StepsCount > stepsBefore) + { + /* we have got new steps and new metadata in memory */ + retval = StepStatus::OK; + } + else + { + m_IO.m_ReadStreaming = false; + if (m_WriterIsActive) + { + retval = StepStatus::NotReady; + } + else + { + retval = StepStatus::EndOfStream; + } + } + return retval; } void BP5Reader::DoGetAbsoluteSteps(const VariableBase &variable, @@ -860,11 +990,15 @@ void BP5Reader::NotifyEngineNoVarsQuery() helper::Throw( "Engine", "BP5Reader", "NotifyEngineNoVarsQuery", "You've called InquireVariable() when the IO is empty and " - "outside a BeginStep/EndStep pair. If this is code that is newly " - "transititioning to the BP5 file engine, you may be relying upon " + "outside a BeginStep/EndStep pair. If this is code that is " + "newly " + "transititioning to the BP5 file engine, you may be relying " + "upon " "deprecated behaviour. If you intend to use ADIOS using the " - "Begin/EndStep interface, move all InquireVariable calls inside " - "the BeginStep/EndStep pair. If intending to use random-access " + "Begin/EndStep interface, move all InquireVariable calls " + "inside " + "the BeginStep/EndStep pair. If intending to use " + "random-access " "file mode, change your Open() mode parameter to " "Mode::ReadRandomAccess."); } diff --git a/source/adios2/engine/bp5/BP5Reader.h b/source/adios2/engine/bp5/BP5Reader.h index 39a1c25e85..1e8eeb58ea 100644 --- a/source/adios2/engine/bp5/BP5Reader.h +++ b/source/adios2/engine/bp5/BP5Reader.h @@ -88,6 +88,11 @@ class BP5Reader : public BP5Engine, public Engine /* How many bytes of metadata index have we already read in? */ size_t m_MDIndexFileAlreadyReadSize = 0; + /* How many bytes of meta-metadata have we already read in? */ + size_t m_MetaMetaDataFileAlreadyReadSize = 0; + /* How many bytes of meta-metadata have we already processed? */ + size_t m_MetaMetaDataFileAlreadyProcessedSize = 0; + /* transport manager for managing the active flag file */ transportman::TransportMan m_ActiveFlagFileManager; bool m_WriterIsActive = true; @@ -95,8 +100,10 @@ class BP5Reader : public BP5Engine, public Engine /** used for per-step reads, TODO: to be moved to BP5Deserializer */ size_t m_CurrentStep = 0; size_t m_StepsCount = 0; + size_t m_AbsStepsInFile = 0; // all steps parsed including unselected + uint64_t m_LastMapStep = 0; // remember last step that had writer map + uint64_t m_LastWriterCount = 0; // remember writer count in that step bool m_FirstStep = true; - bool m_IdxHeaderParsed = false; // true after first index parsing /** used to filter steps */ helper::RangeFilter m_SelectedSteps; @@ -107,6 +114,7 @@ class BP5Reader : public BP5Engine, public Engine Minifooter m_Minifooter; void Init(); + void InitParameters(); void InitTransports(); /* Sleep up to pollSeconds time if we have not reached timeoutInstant. @@ -130,19 +138,34 @@ class BP5Reader : public BP5Engine, public Engine */ void OpenFiles(TimePoint &timeoutInstant, const Seconds &pollSeconds, const Seconds &timeoutSeconds); - void InitBuffer(const TimePoint &timeoutInstant, const Seconds &pollSeconds, - const Seconds &timeoutSeconds); - /** Read in more metadata if exist (throwing away old). - * For streaming only. - * @return size of new content from Index Table + /** Read in metadata if exist (throwing away old). + * It reads and parses metadata-index, and reads metadata into memory. + * In streaming mode, only a limited size of metadata is read in. + * Changes in m_StepsCount before and after calling can be used to + * track if new steps (after filtering with SelectSteps) are read in + * and are ready to be processed. + */ + void UpdateBuffer(const TimePoint &timeoutInstant, + const Seconds &pollSeconds, + const Seconds &timeoutSeconds); + + bool ReadActiveFlag(std::vector &buffer); + + /* Parse metadata. + * + * Return the size of metadataindex where parsing stopped. In streaming mode + * parsing is limited to read only a certain size of metadata at once. + * + * As a side effect, the following variables are filled out: + * m_MetadataIndexTable + * m_WriterMapIndex + * m_FilteredMetadataInfo */ - size_t UpdateBuffer(const TimePoint &timeoutInstant, - const Seconds &pollSeconds); + size_t ParseMetadataIndex(format::BufferSTL &bufferSTL, + const size_t absoluteStartPos, + const bool hasHeader); - void ParseMetadataIndex(format::BufferSTL &bufferSTL, - const size_t absoluteStartPos, const bool hasHeader, - const bool oneStepOnly); /** Process the new metadata coming in (in UpdateBuffer) * @param newIdxSize: the size of the new content from Index Table */ diff --git a/source/adios2/helper/adiosComm.inl b/source/adios2/helper/adiosComm.inl index df8b1f9a28..1106514ee4 100644 --- a/source/adios2/helper/adiosComm.inl +++ b/source/adios2/helper/adiosComm.inl @@ -163,7 +163,10 @@ void Comm::BroadcastVector(std::vector &vector, const int rankSource) const vector.resize(inputSize); } - this->Bcast(vector.data(), inputSize, rankSource); + if (inputSize > 0) + { + this->Bcast(vector.data(), inputSize, rankSource); + } } template diff --git a/source/adios2/helper/adiosRangeFilter.cpp b/source/adios2/helper/adiosRangeFilter.cpp index 141e736db9..3a3dfebbad 100644 --- a/source/adios2/helper/adiosRangeFilter.cpp +++ b/source/adios2/helper/adiosRangeFilter.cpp @@ -13,6 +13,7 @@ /// \cond EXCLUDE_FROM_DOXYGEN //#include +#include #include #include // std::invalid_argument /// \endcond @@ -133,17 +134,11 @@ bool RangeFilter::IsSelected(size_t n) size_t RangeFilter::ToSizeT(const std::string &input) { + long value; + size_t pos; try { - size_t pos; - const size_t out = static_cast(std::stoul(input, &pos)); - if (pos < input.size()) - { - helper::ThrowNested( - "Helper", "adiosRangeFilter", "ToSizeT", - "could not cast string '" + input + "' to number "); - } - return out; + value = std::stol(input, &pos); } catch (...) { @@ -151,7 +146,23 @@ size_t RangeFilter::ToSizeT(const std::string &input) "Helper", "adiosRangeFilter", "ToSizeT", "could not cast string '" + input + "' to number "); } - return 0; + if (value < 0L) + { + helper::ThrowNested( + "Helper", "adiosRangeFilter", "ToSizeT", + "Negative number '" + input + + "' not supported in range selections!"); + } + if (pos < input.size()) + { + helper::ThrowNested( + "Helper", "adiosRangeFilter", "ToSizeT", + "could not cast the entire string '" + input + + "' to a single integer number. RangeFilter accepts a " + "space-separated list of i:j:k expressions where i,j,k are " + "non-negative integers or the character 'n'"); + } + return static_cast(value); } } // end namespace helper diff --git a/testing/adios2/engine/bp/TestBPParameterSelectSteps.cpp b/testing/adios2/engine/bp/TestBPParameterSelectSteps.cpp index 19043f5610..22ba53eecb 100644 --- a/testing/adios2/engine/bp/TestBPParameterSelectSteps.cpp +++ b/testing/adios2/engine/bp/TestBPParameterSelectSteps.cpp @@ -162,13 +162,106 @@ TEST_P(BPParameterSelectStepsP, Read) #endif } +TEST_P(BPParameterSelectStepsP, Stream) +{ + int mpiRank = 0, mpiSize = 1; + std::string selection = GetSelectionString(); + // with this selection these are the original (absolute) steps + // that this reader will now see as steps 0,1,2,... + std::vector absoluteSteps = GetSteps(); + +#if ADIOS2_USE_MPI + MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank); + MPI_Comm_size(MPI_COMM_WORLD, &mpiSize); +#endif + +#if ADIOS2_USE_MPI + adios2::ADIOS adios(MPI_COMM_WORLD); +#else + adios2::ADIOS adios; +#endif + + std::string filename = + "ParameterSelectStepsStream" + std::to_string(mpiSize) + ".bp"; + adios2::IO ioWrite = adios.DeclareIO("TestIOWrite"); + ioWrite.SetEngine(engineName); + adios2::Engine writer = ioWrite.Open(filename, adios2::Mode::Write); + + adios2::IO ioRead = adios.DeclareIO("TestIORead"); + ioRead.SetEngine(engineName); + ioRead.SetParameter("SelectSteps", selection); + ioRead.SetParameter("OpenTimeoutSecs", "30.0"); + adios2::Engine reader = ioRead.Open(filename, adios2::Mode::Read); + EXPECT_TRUE(reader); + + // Number of elements per process + const std::size_t Nx = 10; + adios2::Dims shape{static_cast(mpiSize * Nx)}; + adios2::Dims start{static_cast(mpiRank * Nx)}; + adios2::Dims count{static_cast(Nx)}; + + auto var0 = ioWrite.DefineVariable("var", shape, start, count); + + for (size_t step = 0; step < NSteps; ++step) + { + int s = static_cast(step); + auto d = GenerateData(s, mpiRank, mpiSize); + writer.BeginStep(); + writer.Put(var0, d.data()); + writer.EndStep(); + + if (!mpiRank) + { + std::cout << "Writer done step " << step << std::endl; + } + auto status = reader.BeginStep(adios2::StepMode::Read, 1.0f); + if (!mpiRank) + { + std::cout << "Reader BeginStep() step " << step << " status " + << status << std::endl; + } + if (status == adios2::StepStatus::OK) + { + size_t readStep = reader.CurrentStep(); + if (!mpiRank) + { + std::cout << "Reader got read step " << readStep + << ". Check if it equals to writer step " + << absoluteSteps[readStep] << std::endl; + } + std::vector res; + adios2::Variable var = + ioRead.InquireVariable("var"); + var.SetSelection({{Nx * mpiRank}, {Nx}}); + reader.Get(var, res, adios2::Mode::Sync); + int s = static_cast(absoluteSteps[readStep]); + auto d = GenerateData(s, mpiRank, mpiSize); + EXPECT_EQ(res[0], d[0]); + reader.EndStep(); + } + else + { + EXPECT_EQ(status, adios2::StepStatus::NotReady); + } + } + writer.Close(); + auto status = reader.BeginStep(adios2::StepMode::Read, 1.0f); + EXPECT_EQ(status, adios2::StepStatus::EndOfStream); + reader.Close(); +#if ADIOS2_USE_MPI + MPI_Barrier(MPI_COMM_WORLD); +#endif +} + const std::vector s_0n1 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; const std::vector s_152 = {1, 3, 5}; +const std::vector s_3n3 = {3, 6, 9}; const std::vector s_1n2_0n2 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; INSTANTIATE_TEST_SUITE_P(BPParameterSelectSteps, BPParameterSelectStepsP, ::testing::Values(std::make_tuple("0:n:1", s_0n1), std::make_tuple("1:5:2", s_152), + std::make_tuple("3:n:3", s_3n3), std::make_tuple("1:n:2 0:n:2", s_1n2_0n2))); diff --git a/testing/adios2/engine/staging-common/CMakeLists.txt b/testing/adios2/engine/staging-common/CMakeLists.txt index 05fe448f18..49081b1e46 100644 --- a/testing/adios2/engine/staging-common/CMakeLists.txt +++ b/testing/adios2/engine/staging-common/CMakeLists.txt @@ -205,9 +205,19 @@ endif() # BP5 tests if(ADIOS2_HAVE_BP5) - set (BP5_TESTS ${ALL_SIMPLE_TESTS}) + set (BP5_TESTS ${ALL_SIMPLE_TESTS} ${SPECIAL_TESTS}) # Delayed reader not worth testing on file engines list (FILTER BP5_TESTS EXCLUDE REGEX "DelayedReader") + # Discard not a feature of BP5 + list (FILTER BP5_TESTS EXCLUDE REGEX ".*DiscardWriter.1x1") + # PreciousTimestep not a feature of BP5 + list (FILTER BP5_TESTS EXCLUDE REGEX ".*PreciousTimestep") + # LatestTimestep not a feature of BP5 + list (FILTER BP5_TESTS EXCLUDE REGEX ".*LatestReader") + # KillWriter fails with BP5 + list (FILTER BP5_TESTS EXCLUDE REGEX ".*KillWriter") + # KillReaders We swear this isn't necessary for BP5 streaming + list (FILTER BP5_TESTS EXCLUDE REGEX ".*KillReaders") foreach(test ${BP5_TESTS}) add_common_test(${test} BP5) endforeach() @@ -223,7 +233,7 @@ if(NOT MSVC) # not on windows MutateTestSet( BP4_STREAM_TESTS "BPS" reader "OpenTimeoutSecs=10,BeginStepPollingFrequencySecs=0.1" "${BP4_STREAM_TESTS}") # SharedVars fail with BP4_streaming* list (FILTER BP4_STREAM_TESTS EXCLUDE REGEX ".*SharedVar.BPS$") - # Discard not a feature of BP4 + # Discard not a feature of BP4 list (FILTER BP4_STREAM_TESTS EXCLUDE REGEX ".*DiscardWriter.1x1.*BPS$") # PreciousTimestep not a feature of BP4 list (FILTER BP4_STREAM_TESTS EXCLUDE REGEX ".*Precious.*BPS$") @@ -236,7 +246,7 @@ if(NOT MSVC) # not on windows # KillWriter fails with BP4 list (FILTER BP4_STREAM_TESTS EXCLUDE REGEX ".*KillWriter.*BPS$") # KillReaders We swear this isn't necessary for BP4 streaming - list (FILTER BP4_STREAM_TESTS EXCLUDE REGEX ".*KillReaders.*BPS$") + list (FILTER BP4_STREAM_TESTS EXCLUDE REGEX ".*KillReaders.*BPS$") # SharedVars fail with BP4_streaming* list (FILTER BP4_STREAM_TESTS EXCLUDE REGEX ".*SharedVar.BPS$") # Local fail with BP4_streaming*