diff --git a/source/adios2/engine/ssc/SscReader.cpp b/source/adios2/engine/ssc/SscReader.cpp index 6b7f8016cd..8e79045bf5 100644 --- a/source/adios2/engine/ssc/SscReader.cpp +++ b/source/adios2/engine/ssc/SscReader.cpp @@ -56,14 +56,12 @@ void SscReader::GetOneSidedPostPush() { TAU_SCOPED_TIMER_FUNC(); MPI_Win_post(m_MpiAllWritersGroup, 0, m_MpiWin); - MPI_Win_wait(m_MpiWin); } void SscReader::GetOneSidedFencePush() { TAU_SCOPED_TIMER_FUNC(); MPI_Win_fence(0, m_MpiWin); - MPI_Win_fence(0, m_MpiWin); } void SscReader::GetOneSidedPostPull() @@ -75,7 +73,6 @@ void SscReader::GetOneSidedPostPull() MPI_Get(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR, i.first, 0, i.second.second, MPI_CHAR, m_MpiWin); } - MPI_Win_complete(m_MpiWin); } void SscReader::GetOneSidedFencePull() @@ -87,21 +84,17 @@ void SscReader::GetOneSidedFencePull() MPI_Get(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR, i.first, 0, i.second.second, MPI_CHAR, m_MpiWin); } - MPI_Win_fence(0, m_MpiWin); } void SscReader::GetTwoSided() { TAU_SCOPED_TIMER_FUNC(); - std::vector requests; for (const auto &i : m_AllReceivingWriterRanks) { - requests.emplace_back(); + m_MpiRequests.emplace_back(); MPI_Irecv(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR, - i.first, 0, m_StreamComm, &requests.back()); + i.first, 0, m_StreamComm, &m_MpiRequests.back()); } - MPI_Status statuses[requests.size()]; - MPI_Waitall(requests.size(), requests.data(), statuses); } StepStatus SscReader::BeginStep(const StepMode stepMode, @@ -121,23 +114,25 @@ StepStatus SscReader::BeginStep(const StepMode stepMode, ++m_CurrentStep; if (m_MpiMode == "twosided") { - GetTwoSided(); + MPI_Status statuses[m_MpiRequests.size()]; + MPI_Waitall(m_MpiRequests.size(), m_MpiRequests.data(), statuses); + m_MpiRequests.clear(); } else if (m_MpiMode == "onesidedfencepush") { - GetOneSidedFencePush(); + MPI_Win_fence(0, m_MpiWin); } else if (m_MpiMode == "onesidedpostpush") { - GetOneSidedPostPush(); + MPI_Win_wait(m_MpiWin); } else if (m_MpiMode == "onesidedfencepull") { - GetOneSidedFencePull(); + MPI_Win_fence(0, m_MpiWin); } else if (m_MpiMode == "onesidedpostpull") { - GetOneSidedPostPull(); + MPI_Win_complete(m_MpiWin); } } @@ -227,10 +222,26 @@ void SscReader::EndStep() MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL, m_StreamComm, &m_MpiWin); } - if (m_Verbosity >= 5) + + if (m_MpiMode == "twosided") { - std::cout << "SscReader::EndStep, World Rank " << m_StreamRank - << ", Reader Rank " << m_ReaderRank << std::endl; + GetTwoSided(); + } + else if (m_MpiMode == "onesidedfencepush") + { + GetOneSidedFencePush(); + } + else if (m_MpiMode == "onesidedpostpush") + { + GetOneSidedPostPush(); + } + else if (m_MpiMode == "onesidedfencepull") + { + GetOneSidedFencePull(); + } + else if (m_MpiMode == "onesidedpostpull") + { + GetOneSidedPostPull(); } } diff --git a/source/adios2/engine/ssc/SscReader.h b/source/adios2/engine/ssc/SscReader.h index 31cc8bb2c2..4f702a6c53 100644 --- a/source/adios2/engine/ssc/SscReader.h +++ b/source/adios2/engine/ssc/SscReader.h @@ -53,6 +53,7 @@ class SscReader : public Engine MPI_Group m_MpiAllWritersGroup; MPI_Comm m_StreamComm; std::string m_MpiMode = "twosided"; + std::vector m_MpiRequests; int m_StreamRank; int m_StreamSize; diff --git a/source/adios2/engine/ssc/SscWriter.cpp b/source/adios2/engine/ssc/SscWriter.cpp index 46aba808f6..d16f2cd1bb 100644 --- a/source/adios2/engine/ssc/SscWriter.cpp +++ b/source/adios2/engine/ssc/SscWriter.cpp @@ -53,6 +53,8 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds) { TAU_SCOPED_TIMER_FUNC(); + MpiWait(); + if (m_InitialStep) { m_InitialStep = false; @@ -89,7 +91,7 @@ void SscWriter::PutOneSidedPostPush() MPI_Put(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first, i.second.first, m_Buffer.size(), MPI_CHAR, m_MpiWin); } - MPI_Win_complete(m_MpiWin); + m_NeedWait = true; } void SscWriter::PutOneSidedFencePush() @@ -101,35 +103,33 @@ void SscWriter::PutOneSidedFencePush() MPI_Put(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first, i.second.first, m_Buffer.size(), MPI_CHAR, m_MpiWin); } - MPI_Win_fence(0, m_MpiWin); + m_NeedWait = true; } void SscWriter::PutOneSidedPostPull() { TAU_SCOPED_TIMER_FUNC(); MPI_Win_post(m_MpiAllReadersGroup, 0, m_MpiWin); - MPI_Win_wait(m_MpiWin); + m_NeedWait = true; } void SscWriter::PutOneSidedFencePull() { TAU_SCOPED_TIMER_FUNC(); MPI_Win_fence(0, m_MpiWin); - MPI_Win_fence(0, m_MpiWin); + m_NeedWait = true; } void SscWriter::PutTwoSided() { TAU_SCOPED_TIMER_FUNC(); - std::vector requests; for (const auto &i : m_AllSendingReaderRanks) { - requests.emplace_back(); + m_MpiRequests.emplace_back(); MPI_Isend(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first, 0, - m_StreamComm, &requests.back()); + m_StreamComm, &m_MpiRequests.back()); } - MPI_Status statuses[requests.size()]; - MPI_Waitall(requests.size(), requests.data(), statuses); + m_NeedWait = true; } void SscWriter::EndStep() @@ -147,7 +147,8 @@ void SscWriter::EndStep() SyncWritePattern(); MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL, m_StreamComm, &m_MpiWin); - PutOneSidedPostPull(); + MPI_Win_post(m_MpiAllReadersGroup, 0, m_MpiWin); + MPI_Win_wait(m_MpiWin); MPI_Win_free(&m_MpiWin); SyncReadPattern(); MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL, @@ -182,6 +183,36 @@ void SscWriter::Flush(const int transportIndex) { TAU_SCOPED_TIMER_FUNC(); } // PRIVATE +void SscWriter::MpiWait() +{ + if (m_NeedWait) + { + if (m_MpiMode == "twosided") + { + MPI_Status statuses[m_MpiRequests.size()]; + MPI_Waitall(m_MpiRequests.size(), m_MpiRequests.data(), statuses); + m_MpiRequests.clear(); + } + else if (m_MpiMode == "onesidedfencepush") + { + MPI_Win_fence(0, m_MpiWin); + } + else if (m_MpiMode == "onesidedpostpush") + { + MPI_Win_complete(m_MpiWin); + } + else if (m_MpiMode == "onesidedfencepull") + { + MPI_Win_fence(0, m_MpiWin); + } + else if (m_MpiMode == "onesidedpostpull") + { + MPI_Win_wait(m_MpiWin); + } + m_NeedWait = false; + } +} + void SscWriter::SyncMpiPattern() { TAU_SCOPED_TIMER_FUNC(); @@ -380,11 +411,8 @@ ADIOS2_FOREACH_STDTYPE_1ARG(declare_type) void SscWriter::DoClose(const int transportIndex) { TAU_SCOPED_TIMER_FUNC(); - if (m_Verbosity >= 5) - { - std::cout << "SscWriter::DoClose, World Rank " << m_StreamRank - << ", Writer Rank " << m_WriterRank << std::endl; - } + + MpiWait(); m_Buffer[0] = 1; diff --git a/source/adios2/engine/ssc/SscWriter.h b/source/adios2/engine/ssc/SscWriter.h index dfc66f8ff2..74bf4e6487 100644 --- a/source/adios2/engine/ssc/SscWriter.h +++ b/source/adios2/engine/ssc/SscWriter.h @@ -54,6 +54,8 @@ class SscWriter : public Engine MPI_Group m_MpiAllReadersGroup; MPI_Comm m_StreamComm; std::string m_MpiMode = "twosided"; + bool m_NeedWait = false; + std::vector m_MpiRequests; int m_StreamRank; int m_StreamSize; @@ -70,6 +72,7 @@ class SscWriter : public Engine void PutOneSidedFencePull(); void PutOneSidedPostPull(); void PutTwoSided(); + void MpiWait(); #define declare_type(T) \ void DoPutSync(Variable &, const T *) final; \