Skip to content

Commit

Permalink
added threading to ssc reader
Browse files Browse the repository at this point in the history
  • Loading branch information
JasonRuonanWang committed Jan 19, 2021
1 parent 1c97dac commit 033b28f
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 6 deletions.
45 changes: 40 additions & 5 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,

helper::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
helper::GetParameter(m_IO.m_Parameters, "Threading", m_Threading);
helper::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs",
m_OpenTimeoutSecs);

Expand Down Expand Up @@ -105,9 +106,15 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false ||
m_ReaderSelectionsLocked == false)
{
StepStatus stepStatus;
BeginStepFlexible(stepStatus);
if (stepStatus == StepStatus::EndOfStream)
if (m_Threading && m_EndStepThread.joinable())
{
m_EndStepThread.join();
}
else
{
BeginStepFlexible(m_StepStatus);
}
if (m_StepStatus == StepStatus::EndOfStream)
{
return StepStatus::EndOfStream;
}
Expand Down Expand Up @@ -326,6 +333,18 @@ void SscReader::EndStepFirstFlexible()

void SscReader::EndStepConsequentFlexible() { MPI_Win_free(&m_MpiWin); }

void SscReader::EndBeginStepFirstFlexible()
{
EndStepFirstFlexible();
BeginStepFlexible(m_StepStatus);
}

void SscReader::EndBeginStepConsequentFlexible()
{
EndStepConsequentFlexible();
BeginStepFlexible(m_StepStatus);
}

void SscReader::EndStep()
{
TAU_SCOPED_TIMER_FUNC();
Expand All @@ -347,11 +366,27 @@ void SscReader::EndStep()
{
if (m_CurrentStep == 0)
{
EndStepFirstFlexible();
if (m_Threading)
{
m_EndStepThread =
std::thread(&SscReader::EndBeginStepFirstFlexible, this);
}
else
{
EndStepFirstFlexible();
}
}
else
{
EndStepConsequentFlexible();
if (m_Threading)
{
m_EndStepThread = std::thread(
&SscReader::EndBeginStepConsequentFlexible, this);
}
else
{
EndStepConsequentFlexible();
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion source/adios2/engine/ssc/SscReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ class SscReader : public Engine
MPI_Win m_MpiWin;
MPI_Group m_MpiAllWritersGroup;
MPI_Comm m_StreamComm;
std::string m_MpiMode = "twosided";
std::vector<MPI_Request> m_MpiRequests;
StepStatus m_StepStatus;
std::thread m_EndStepThread;

int m_StreamRank;
int m_StreamSize;
Expand All @@ -68,6 +69,8 @@ class SscReader : public Engine
void EndStepFixed();
void EndStepFirstFlexible();
void EndStepConsequentFlexible();
void EndBeginStepFirstFlexible();
void EndBeginStepConsequentFlexible();

#define declare_type(T) \
void DoGetSync(Variable<T> &, T *) final; \
Expand All @@ -94,6 +97,8 @@ class SscReader : public Engine

int m_Verbosity = 0;
int m_OpenTimeoutSecs = 10;
bool m_Threading = false;
std::string m_MpiMode = "twosided";
};

} // end namespace engine
Expand Down

0 comments on commit 033b28f

Please sign in to comment.