Skip to content

Commit

Permalink
refine SSC reader begin step functions
Browse files Browse the repository at this point in the history
  • Loading branch information
JasonRuonanWang committed Jan 16, 2021
1 parent 358c93e commit 9964939
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 37 deletions.
86 changes: 49 additions & 37 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,49 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,

SscReader::~SscReader() { TAU_SCOPED_TIMER_FUNC(); }

void SscReader::MpiWait()
{
if (m_MpiMode == "twosided")
{
MPI_Waitall(static_cast<int>(m_MpiRequests.size()),
m_MpiRequests.data(), MPI_STATUS_IGNORE);
m_MpiRequests.clear();
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_wait(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_complete(m_MpiWin);
}
}

void SscReader::BeginStepFlexible(StepStatus &status)
{
m_AllReceivingWriterRanks.clear();
m_Buffer.resize(1, 0);
m_GlobalWritePattern.clear();
m_GlobalWritePattern.resize(m_StreamSize);
m_LocalReadPattern.clear();
m_GlobalWritePatternJson.clear();
bool finalStep = SyncWritePattern();
if (finalStep)
{
status = StepStatus::EndOfStream;
return;
}
MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, m_StreamComm, &m_MpiWin);
}

StepStatus SscReader::BeginStep(const StepMode stepMode,
const float timeoutSeconds)
{
Expand All @@ -62,44 +105,16 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false ||
m_ReaderSelectionsLocked == false)
{
m_AllReceivingWriterRanks.clear();
m_Buffer.resize(1, 0);
m_GlobalWritePattern.clear();
m_GlobalWritePattern.resize(m_StreamSize);
m_LocalReadPattern.clear();
m_GlobalWritePatternJson.clear();
bool finalStep = SyncWritePattern();
if (finalStep)
StepStatus stepStatus;
BeginStepFlexible(stepStatus);
if (stepStatus == StepStatus::EndOfStream)
{
return StepStatus::EndOfStream;
}

MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, m_StreamComm, &m_MpiWin);
}
else
{
if (m_MpiMode == "twosided")
{
MPI_Waitall(static_cast<int>(m_MpiRequests.size()),
m_MpiRequests.data(), MPI_STATUS_IGNORE);
m_MpiRequests.clear();
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_wait(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_complete(m_MpiWin);
}
MpiWait();
}

for (const auto &r : m_GlobalWritePattern)
Expand Down Expand Up @@ -267,8 +282,7 @@ void SscReader::EndStep()

PerformGets();

if (m_WriterDefinitionsLocked &&
m_ReaderSelectionsLocked) // fixed IO pattern
if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
{
if (m_CurrentStep == 0)
{
Expand Down Expand Up @@ -316,7 +330,7 @@ void SscReader::EndStep()
}
}
}
else // flexible IO pattern
else
{
MPI_Win_free(&m_MpiWin);
if (m_CurrentStep == 0)
Expand All @@ -328,8 +342,6 @@ void SscReader::EndStep()
m_StepBegun = false;
}

// PRIVATE

void SscReader::SyncMpiPattern()
{
TAU_SCOPED_TIMER_FUNC();
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/engine/ssc/SscReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class SscReader : public Engine
void SyncMpiPattern();
bool SyncWritePattern();
void SyncReadPattern();
void MpiWait();
void BeginStepFlexible(StepStatus &status);

#define declare_type(T) \
void DoGetSync(Variable<T> &, T *) final; \
Expand Down

0 comments on commit 9964939

Please sign in to comment.