Skip to content

Commit

Permalink
Merge pull request #2571 from JasonRuonanWang/ssc_performgets
Browse files Browse the repository at this point in the history
Fix a performance issue for first step data in ssc
  • Loading branch information
JasonRuonanWang authored Jan 6, 2021
2 parents e1786a8 + 26147d4 commit d20aeaf
Show file tree
Hide file tree
Showing 18 changed files with 203 additions and 117 deletions.
2 changes: 2 additions & 0 deletions source/adios2/engine/ssc/SscHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ struct BlockInfo
size_t bufferStart;
size_t bufferCount;
std::vector<char> value;
void *data;
bool performed;
};
using BlockVec = std::vector<BlockInfo>;
using BlockVecVec = std::vector<BlockVec>;
Expand Down
112 changes: 108 additions & 4 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
m_ReaderSelectionsLocked == false)
{
m_AllReceivingWriterRanks.clear();
m_ReceivedRanks.clear();
m_Buffer.resize(1, 0);
m_GlobalWritePattern.clear();
m_GlobalWritePattern.resize(m_StreamSize);
Expand Down Expand Up @@ -164,7 +163,94 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
return StepStatus::OK;
}

void SscReader::PerformGets() {}
void SscReader::PerformGets()
{

if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false ||
m_ReaderSelectionsLocked == false)
{
ssc::JsonToBlockVecVec(m_GlobalWritePatternJson, m_GlobalWritePattern);
size_t oldSize = m_AllReceivingWriterRanks.size();
m_AllReceivingWriterRanks =
ssc::CalculateOverlap(m_GlobalWritePattern, m_LocalReadPattern);
CalculatePosition(m_GlobalWritePattern, m_AllReceivingWriterRanks);
size_t newSize = m_AllReceivingWriterRanks.size();
if (oldSize != newSize)
{
size_t totalDataSize = 0;
for (auto i : m_AllReceivingWriterRanks)
{
totalDataSize += i.second.second;
}
m_Buffer.resize(totalDataSize);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Win_lock(MPI_LOCK_SHARED, i.first, 0, m_MpiWin);
MPI_Get(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
MPI_Win_unlock(i.first, m_MpiWin);
}
}

for (auto &br : m_LocalReadPattern)
{
if (br.performed)
{
continue;
}
for (const auto &i : m_AllReceivingWriterRanks)
{
const auto &v = m_GlobalWritePattern[i.first];
for (const auto &b : v)
{
if (b.name == br.name)
{
if (b.type == DataType::String)
{
*reinterpret_cast<std::string *>(br.data) =
std::string(b.value.begin(), b.value.end());
}
#define declare_type(T) \
else if (b.type == helper::GetDataType<T>()) \
{ \
if (b.shapeId == ShapeID::GlobalArray || \
b.shapeId == ShapeID::LocalArray) \
{ \
bool empty = false; \
for (const auto c : b.count) \
{ \
if (c == 0) \
{ \
empty = true; \
} \
} \
if (empty) \
{ \
continue; \
} \
helper::NdCopy<T>(m_Buffer.data() + b.bufferStart, b.start, \
b.count, true, true, \
reinterpret_cast<char *>(br.data), br.start, \
br.count, true, true); \
} \
else if (b.shapeId == ShapeID::GlobalValue || \
b.shapeId == ShapeID::LocalValue) \
{ \
std::memcpy(br.data, m_Buffer.data() + b.bufferStart, \
b.bufferCount); \
} \
}
ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
#undef declare_type
else { throw(std::runtime_error("unknown data type")); }
}
}
}
br.performed = true;
}
}
}

size_t SscReader::CurrentStep() const { return m_CurrentStep; }

Expand All @@ -179,6 +265,8 @@ void SscReader::EndStep()
<< m_CurrentStep << std::endl;
}

PerformGets();

if (m_WriterDefinitionsLocked &&
m_ReaderSelectionsLocked) // fixed IO pattern
{
Expand Down Expand Up @@ -407,12 +495,28 @@ void SscReader::SyncReadPattern()
<< m_CurrentStep << std::endl;
}

nlohmann::json localReadPatternJson;

for (const auto &i : m_LocalReadPattern)
{
localReadPatternJson["Variables"].emplace_back();
auto &jref = localReadPatternJson["Variables"].back();
jref["Name"] = i.name;
jref["Type"] = i.type;
jref["ShapeID"] = i.shapeId;
jref["Start"] = i.start;
jref["Count"] = i.count;
jref["Shape"] = i.shape;
jref["BufferStart"] = i.bufferStart;
jref["BufferCount"] = i.bufferCount;
}

if (m_ReaderRank == 0)
{
m_LocalReadPatternJson["Pattern"] = m_ReaderSelectionsLocked;
localReadPatternJson["Pattern"] = m_ReaderSelectionsLocked;
}

std::string localStr = m_LocalReadPatternJson.dump();
std::string localStr = localReadPatternJson.dump();

// aggregate global read pattern
size_t localSize = localStr.size();
Expand Down
4 changes: 1 addition & 3 deletions source/adios2/engine/ssc/SscReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ class SscReader : public Engine
ssc::BlockVecVec m_GlobalWritePattern;
ssc::BlockVec m_LocalReadPattern;
nlohmann::json m_GlobalWritePatternJson;
nlohmann::json m_LocalReadPatternJson;

ssc::RankPosMap m_AllReceivingWriterRanks;
std::vector<char> m_Buffer;
Expand All @@ -55,7 +54,6 @@ class SscReader : public Engine
MPI_Comm m_StreamComm;
std::string m_MpiMode = "twosided";
std::vector<MPI_Request> m_MpiRequests;
std::unordered_set<int> m_ReceivedRanks;

int m_StreamRank;
int m_StreamSize;
Expand Down Expand Up @@ -84,7 +82,7 @@ class SscReader : public Engine
void GetDeferredCommon(Variable<T> &variable, T *data);

template <class T>
void GetDeferredDeltaCommon(Variable<T> &variable);
void GetDeferredDeltaCommon(Variable<T> &variable, T *data);

void CalculatePosition(ssc::BlockVecVec &mapVec,
ssc::RankPosMap &allOverlapRanks);
Expand Down
129 changes: 52 additions & 77 deletions source/adios2/engine/ssc/SscReader.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace engine
{

template <class T>
void SscReader::GetDeferredDeltaCommon(Variable<T> &variable)
void SscReader::GetDeferredDeltaCommon(Variable<T> &variable, T *data)
{
TAU_SCOPED_TIMER_FUNC();

Expand All @@ -42,10 +42,15 @@ void SscReader::GetDeferredDeltaCommon(Variable<T> &variable)
m_LocalReadPattern.emplace_back();
auto &b = m_LocalReadPattern.back();
b.name = variable.m_Name;
b.count = vCount;
b.type = helper::GetDataType<T>();
b.shapeId = variable.m_ShapeID;
b.start = vStart;
b.count = vCount;
b.shape = vShape;
b.type = helper::GetDataType<T>();
b.bufferStart = 0;
b.bufferCount = 0;
b.data = data;
b.performed = false;

for (const auto &d : b.count)
{
Expand All @@ -55,42 +60,6 @@ void SscReader::GetDeferredDeltaCommon(Variable<T> &variable)
"SetSelection count dimensions cannot be 0"));
}
}

m_LocalReadPatternJson["Variables"].emplace_back();
auto &jref = m_LocalReadPatternJson["Variables"].back();
jref["Name"] = variable.m_Name;
jref["Type"] = helper::GetDataType<T>();
jref["ShapeID"] = variable.m_ShapeID;
jref["Start"] = vStart;
jref["Count"] = vCount;
jref["Shape"] = vShape;
jref["BufferStart"] = 0;
jref["BufferCount"] = 0;

ssc::JsonToBlockVecVec(m_GlobalWritePatternJson, m_GlobalWritePattern);
size_t oldSize = m_AllReceivingWriterRanks.size();
m_AllReceivingWriterRanks =
ssc::CalculateOverlap(m_GlobalWritePattern, m_LocalReadPattern);
CalculatePosition(m_GlobalWritePattern, m_AllReceivingWriterRanks);
size_t newSize = m_AllReceivingWriterRanks.size();
if (oldSize != newSize)
{
size_t totalDataSize = 0;
for (auto i : m_AllReceivingWriterRanks)
{
totalDataSize += i.second.second;
}
m_Buffer.resize(totalDataSize);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Win_lock(MPI_LOCK_SHARED, i.first, 0, m_MpiWin);
MPI_Get(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
MPI_Win_unlock(i.first, m_MpiWin);
m_ReceivedRanks.insert(i.first);
}
}
}

template <>
Expand All @@ -103,16 +72,19 @@ void SscReader::GetDeferredCommon(Variable<std::string> &variable,
if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false ||
m_ReaderSelectionsLocked == false)
{
GetDeferredDeltaCommon(variable);
GetDeferredDeltaCommon(variable, data);
}
for (const auto &i : m_AllReceivingWriterRanks)
else
{
const auto &v = m_GlobalWritePattern[i.first];
for (const auto &b : v)
for (const auto &i : m_AllReceivingWriterRanks)
{
if (b.name == variable.m_Name)
const auto &v = m_GlobalWritePattern[i.first];
for (const auto &b : v)
{
*data = std::string(b.value.begin(), b.value.end());
if (b.name == variable.m_Name)
{
*data = std::string(b.value.begin(), b.value.end());
}
}
}
}
Expand All @@ -138,46 +110,49 @@ void SscReader::GetDeferredCommon(Variable<T> &variable, T *data)
if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false ||
m_ReaderSelectionsLocked == false)
{
GetDeferredDeltaCommon(variable);
GetDeferredDeltaCommon(variable, data);
}

for (const auto &i : m_AllReceivingWriterRanks)
else
{
const auto &v = m_GlobalWritePattern[i.first];
for (const auto &b : v)

for (const auto &i : m_AllReceivingWriterRanks)
{
if (b.name == variable.m_Name)
const auto &v = m_GlobalWritePattern[i.first];
for (const auto &b : v)
{
bool empty = false;
for (const auto c : b.count)
if (b.name == variable.m_Name)
{
if (c == 0)
bool empty = false;
for (const auto c : b.count)
{
empty = true;
if (c == 0)
{
empty = true;
}
}
if (empty)
{
continue;
}
}
if (empty)
{
continue;
}

if (b.shapeId == ShapeID::GlobalArray ||
b.shapeId == ShapeID::LocalArray)
{
helper::NdCopy<T>(m_Buffer.data() + b.bufferStart, b.start,
b.count, true, true,
reinterpret_cast<char *>(data), vStart,
vCount, true, true);
}
else if (b.shapeId == ShapeID::GlobalValue ||
b.shapeId == ShapeID::LocalValue)
{
std::memcpy(data, m_Buffer.data() + b.bufferStart,
b.bufferCount);
}
else
{
throw(std::runtime_error("ShapeID not supported"));
if (b.shapeId == ShapeID::GlobalArray ||
b.shapeId == ShapeID::LocalArray)
{
helper::NdCopy<T>(m_Buffer.data() + b.bufferStart,
b.start, b.count, true, true,
reinterpret_cast<char *>(data),
vStart, vCount, true, true);
}
else if (b.shapeId == ShapeID::GlobalValue ||
b.shapeId == ShapeID::LocalValue)
{
std::memcpy(data, m_Buffer.data() + b.bufferStart,
b.bufferCount);
}
else
{
throw(std::runtime_error("ShapeID not supported"));
}
}
}
}
Expand Down
Loading

0 comments on commit d20aeaf

Please sign in to comment.