From 9964939645e23072fcc2f7a023cf79af0fd3fb21 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Sat, 16 Jan 2021 17:43:22 -0500 Subject: [PATCH] refine SSC reader begin step functions --- source/adios2/engine/ssc/SscReader.cpp | 86 +++++++++++++++----------- source/adios2/engine/ssc/SscReader.h | 2 + 2 files changed, 51 insertions(+), 37 deletions(-) diff --git a/source/adios2/engine/ssc/SscReader.cpp b/source/adios2/engine/ssc/SscReader.cpp index 24083d53fe..815ab2c637 100644 --- a/source/adios2/engine/ssc/SscReader.cpp +++ b/source/adios2/engine/ssc/SscReader.cpp @@ -43,6 +43,49 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode, SscReader::~SscReader() { TAU_SCOPED_TIMER_FUNC(); } +void SscReader::MpiWait() +{ + if (m_MpiMode == "twosided") + { + MPI_Waitall(static_cast(m_MpiRequests.size()), + m_MpiRequests.data(), MPI_STATUS_IGNORE); + m_MpiRequests.clear(); + } + else if (m_MpiMode == "onesidedfencepush") + { + MPI_Win_fence(0, m_MpiWin); + } + else if (m_MpiMode == "onesidedpostpush") + { + MPI_Win_wait(m_MpiWin); + } + else if (m_MpiMode == "onesidedfencepull") + { + MPI_Win_fence(0, m_MpiWin); + } + else if (m_MpiMode == "onesidedpostpull") + { + MPI_Win_complete(m_MpiWin); + } +} + +void SscReader::BeginStepFlexible(StepStatus &status) +{ + m_AllReceivingWriterRanks.clear(); + m_Buffer.resize(1, 0); + m_GlobalWritePattern.clear(); + m_GlobalWritePattern.resize(m_StreamSize); + m_LocalReadPattern.clear(); + m_GlobalWritePatternJson.clear(); + bool finalStep = SyncWritePattern(); + if (finalStep) + { + status = StepStatus::EndOfStream; + return; + } + MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, m_StreamComm, &m_MpiWin); +} + StepStatus SscReader::BeginStep(const StepMode stepMode, const float timeoutSeconds) { @@ -62,44 +105,16 @@ StepStatus SscReader::BeginStep(const StepMode stepMode, if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false || m_ReaderSelectionsLocked == false) { - m_AllReceivingWriterRanks.clear(); - m_Buffer.resize(1, 0); - m_GlobalWritePattern.clear(); - m_GlobalWritePattern.resize(m_StreamSize); - m_LocalReadPattern.clear(); - m_GlobalWritePatternJson.clear(); - bool finalStep = SyncWritePattern(); - if (finalStep) + StepStatus stepStatus; + BeginStepFlexible(stepStatus); + if (stepStatus == StepStatus::EndOfStream) { return StepStatus::EndOfStream; } - - MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, m_StreamComm, &m_MpiWin); } else { - if (m_MpiMode == "twosided") - { - MPI_Waitall(static_cast(m_MpiRequests.size()), - m_MpiRequests.data(), MPI_STATUS_IGNORE); - m_MpiRequests.clear(); - } - else if (m_MpiMode == "onesidedfencepush") - { - MPI_Win_fence(0, m_MpiWin); - } - else if (m_MpiMode == "onesidedpostpush") - { - MPI_Win_wait(m_MpiWin); - } - else if (m_MpiMode == "onesidedfencepull") - { - MPI_Win_fence(0, m_MpiWin); - } - else if (m_MpiMode == "onesidedpostpull") - { - MPI_Win_complete(m_MpiWin); - } + MpiWait(); } for (const auto &r : m_GlobalWritePattern) @@ -267,8 +282,7 @@ void SscReader::EndStep() PerformGets(); - if (m_WriterDefinitionsLocked && - m_ReaderSelectionsLocked) // fixed IO pattern + if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked) { if (m_CurrentStep == 0) { @@ -316,7 +330,7 @@ void SscReader::EndStep() } } } - else // flexible IO pattern + else { MPI_Win_free(&m_MpiWin); if (m_CurrentStep == 0) @@ -328,8 +342,6 @@ void SscReader::EndStep() m_StepBegun = false; } -// PRIVATE - void SscReader::SyncMpiPattern() { TAU_SCOPED_TIMER_FUNC(); diff --git a/source/adios2/engine/ssc/SscReader.h b/source/adios2/engine/ssc/SscReader.h index 27e503fe4d..fcc25c267b 100644 --- a/source/adios2/engine/ssc/SscReader.h +++ b/source/adios2/engine/ssc/SscReader.h @@ -63,6 +63,8 @@ class SscReader : public Engine void SyncMpiPattern(); bool SyncWritePattern(); void SyncReadPattern(); + void MpiWait(); + void BeginStepFlexible(StepStatus &status); #define declare_type(T) \ void DoGetSync(Variable &, T *) final; \