Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a performance issue for first step data in ssc #2571

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions source/adios2/engine/ssc/SscHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ struct BlockInfo
size_t bufferStart;
size_t bufferCount;
std::vector<char> value;
void *data;
bool performed;
};
using BlockVec = std::vector<BlockInfo>;
using BlockVecVec = std::vector<BlockVec>;
Expand Down
112 changes: 108 additions & 4 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
m_ReaderSelectionsLocked == false)
{
m_AllReceivingWriterRanks.clear();
m_ReceivedRanks.clear();
m_Buffer.resize(1, 0);
m_GlobalWritePattern.clear();
m_GlobalWritePattern.resize(m_StreamSize);
Expand Down Expand Up @@ -164,7 +163,94 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
return StepStatus::OK;
}

void SscReader::PerformGets() {}
void SscReader::PerformGets()
{

if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false ||
m_ReaderSelectionsLocked == false)
{
ssc::JsonToBlockVecVec(m_GlobalWritePatternJson, m_GlobalWritePattern);
size_t oldSize = m_AllReceivingWriterRanks.size();
m_AllReceivingWriterRanks =
ssc::CalculateOverlap(m_GlobalWritePattern, m_LocalReadPattern);
CalculatePosition(m_GlobalWritePattern, m_AllReceivingWriterRanks);
size_t newSize = m_AllReceivingWriterRanks.size();
if (oldSize != newSize)
{
size_t totalDataSize = 0;
for (auto i : m_AllReceivingWriterRanks)
{
totalDataSize += i.second.second;
}
m_Buffer.resize(totalDataSize);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Win_lock(MPI_LOCK_SHARED, i.first, 0, m_MpiWin);
MPI_Get(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
MPI_Win_unlock(i.first, m_MpiWin);
}
}

for (auto &br : m_LocalReadPattern)
{
if (br.performed)
{
continue;
}
for (const auto &i : m_AllReceivingWriterRanks)
{
const auto &v = m_GlobalWritePattern[i.first];
for (const auto &b : v)
{
if (b.name == br.name)
{
if (b.type == DataType::String)
{
*reinterpret_cast<std::string *>(br.data) =
std::string(b.value.begin(), b.value.end());
}
#define declare_type(T) \
else if (b.type == helper::GetDataType<T>()) \
{ \
if (b.shapeId == ShapeID::GlobalArray || \
b.shapeId == ShapeID::LocalArray) \
{ \
bool empty = false; \
for (const auto c : b.count) \
{ \
if (c == 0) \
{ \
empty = true; \
} \
} \
if (empty) \
{ \
continue; \
} \
helper::NdCopy<T>(m_Buffer.data() + b.bufferStart, b.start, \
b.count, true, true, \
reinterpret_cast<char *>(br.data), br.start, \
br.count, true, true); \
} \
else if (b.shapeId == ShapeID::GlobalValue || \
b.shapeId == ShapeID::LocalValue) \
{ \
std::memcpy(br.data, m_Buffer.data() + b.bufferStart, \
b.bufferCount); \
} \
}
ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
#undef declare_type
else { throw(std::runtime_error("unknown data type")); }
}
}
}
br.performed = true;
}
}
}

size_t SscReader::CurrentStep() const { return m_CurrentStep; }

Expand All @@ -179,6 +265,8 @@ void SscReader::EndStep()
<< m_CurrentStep << std::endl;
}

PerformGets();

if (m_WriterDefinitionsLocked &&
m_ReaderSelectionsLocked) // fixed IO pattern
{
Expand Down Expand Up @@ -407,12 +495,28 @@ void SscReader::SyncReadPattern()
<< m_CurrentStep << std::endl;
}

nlohmann::json localReadPatternJson;

for (const auto &i : m_LocalReadPattern)
{
localReadPatternJson["Variables"].emplace_back();
auto &jref = localReadPatternJson["Variables"].back();
jref["Name"] = i.name;
jref["Type"] = i.type;
jref["ShapeID"] = i.shapeId;
jref["Start"] = i.start;
jref["Count"] = i.count;
jref["Shape"] = i.shape;
jref["BufferStart"] = i.bufferStart;
jref["BufferCount"] = i.bufferCount;
}

if (m_ReaderRank == 0)
{
m_LocalReadPatternJson["Pattern"] = m_ReaderSelectionsLocked;
localReadPatternJson["Pattern"] = m_ReaderSelectionsLocked;
}

std::string localStr = m_LocalReadPatternJson.dump();
std::string localStr = localReadPatternJson.dump();

// aggregate global read pattern
size_t localSize = localStr.size();
Expand Down
4 changes: 1 addition & 3 deletions source/adios2/engine/ssc/SscReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ class SscReader : public Engine
ssc::BlockVecVec m_GlobalWritePattern;
ssc::BlockVec m_LocalReadPattern;
nlohmann::json m_GlobalWritePatternJson;
nlohmann::json m_LocalReadPatternJson;

ssc::RankPosMap m_AllReceivingWriterRanks;
std::vector<char> m_Buffer;
Expand All @@ -55,7 +54,6 @@ class SscReader : public Engine
MPI_Comm m_StreamComm;
std::string m_MpiMode = "twosided";
std::vector<MPI_Request> m_MpiRequests;
std::unordered_set<int> m_ReceivedRanks;

int m_StreamRank;
int m_StreamSize;
Expand Down Expand Up @@ -84,7 +82,7 @@ class SscReader : public Engine
void GetDeferredCommon(Variable<T> &variable, T *data);

template <class T>
void GetDeferredDeltaCommon(Variable<T> &variable);
void GetDeferredDeltaCommon(Variable<T> &variable, T *data);

void CalculatePosition(ssc::BlockVecVec &mapVec,
ssc::RankPosMap &allOverlapRanks);
Expand Down
129 changes: 52 additions & 77 deletions source/adios2/engine/ssc/SscReader.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace engine
{

template <class T>
void SscReader::GetDeferredDeltaCommon(Variable<T> &variable)
void SscReader::GetDeferredDeltaCommon(Variable<T> &variable, T *data)
{
TAU_SCOPED_TIMER_FUNC();

Expand All @@ -42,10 +42,15 @@ void SscReader::GetDeferredDeltaCommon(Variable<T> &variable)
m_LocalReadPattern.emplace_back();
auto &b = m_LocalReadPattern.back();
b.name = variable.m_Name;
b.count = vCount;
b.type = helper::GetDataType<T>();
b.shapeId = variable.m_ShapeID;
b.start = vStart;
b.count = vCount;
b.shape = vShape;
b.type = helper::GetDataType<T>();
b.bufferStart = 0;
b.bufferCount = 0;
b.data = data;
b.performed = false;

for (const auto &d : b.count)
{
Expand All @@ -55,42 +60,6 @@ void SscReader::GetDeferredDeltaCommon(Variable<T> &variable)
"SetSelection count dimensions cannot be 0"));
}
}

m_LocalReadPatternJson["Variables"].emplace_back();
auto &jref = m_LocalReadPatternJson["Variables"].back();
jref["Name"] = variable.m_Name;
jref["Type"] = helper::GetDataType<T>();
jref["ShapeID"] = variable.m_ShapeID;
jref["Start"] = vStart;
jref["Count"] = vCount;
jref["Shape"] = vShape;
jref["BufferStart"] = 0;
jref["BufferCount"] = 0;

ssc::JsonToBlockVecVec(m_GlobalWritePatternJson, m_GlobalWritePattern);
size_t oldSize = m_AllReceivingWriterRanks.size();
m_AllReceivingWriterRanks =
ssc::CalculateOverlap(m_GlobalWritePattern, m_LocalReadPattern);
CalculatePosition(m_GlobalWritePattern, m_AllReceivingWriterRanks);
size_t newSize = m_AllReceivingWriterRanks.size();
if (oldSize != newSize)
{
size_t totalDataSize = 0;
for (auto i : m_AllReceivingWriterRanks)
{
totalDataSize += i.second.second;
}
m_Buffer.resize(totalDataSize);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Win_lock(MPI_LOCK_SHARED, i.first, 0, m_MpiWin);
MPI_Get(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
MPI_Win_unlock(i.first, m_MpiWin);
m_ReceivedRanks.insert(i.first);
}
}
}

template <>
Expand All @@ -103,16 +72,19 @@ void SscReader::GetDeferredCommon(Variable<std::string> &variable,
if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false ||
m_ReaderSelectionsLocked == false)
{
GetDeferredDeltaCommon(variable);
GetDeferredDeltaCommon(variable, data);
}
for (const auto &i : m_AllReceivingWriterRanks)
else
{
const auto &v = m_GlobalWritePattern[i.first];
for (const auto &b : v)
for (const auto &i : m_AllReceivingWriterRanks)
{
if (b.name == variable.m_Name)
const auto &v = m_GlobalWritePattern[i.first];
for (const auto &b : v)
{
*data = std::string(b.value.begin(), b.value.end());
if (b.name == variable.m_Name)
{
*data = std::string(b.value.begin(), b.value.end());
}
}
}
}
Expand All @@ -138,46 +110,49 @@ void SscReader::GetDeferredCommon(Variable<T> &variable, T *data)
if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false ||
m_ReaderSelectionsLocked == false)
{
GetDeferredDeltaCommon(variable);
GetDeferredDeltaCommon(variable, data);
}

for (const auto &i : m_AllReceivingWriterRanks)
else
{
const auto &v = m_GlobalWritePattern[i.first];
for (const auto &b : v)

for (const auto &i : m_AllReceivingWriterRanks)
{
if (b.name == variable.m_Name)
const auto &v = m_GlobalWritePattern[i.first];
for (const auto &b : v)
{
bool empty = false;
for (const auto c : b.count)
if (b.name == variable.m_Name)
{
if (c == 0)
bool empty = false;
for (const auto c : b.count)
{
empty = true;
if (c == 0)
{
empty = true;
}
}
if (empty)
{
continue;
}
}
if (empty)
{
continue;
}

if (b.shapeId == ShapeID::GlobalArray ||
b.shapeId == ShapeID::LocalArray)
{
helper::NdCopy<T>(m_Buffer.data() + b.bufferStart, b.start,
b.count, true, true,
reinterpret_cast<char *>(data), vStart,
vCount, true, true);
}
else if (b.shapeId == ShapeID::GlobalValue ||
b.shapeId == ShapeID::LocalValue)
{
std::memcpy(data, m_Buffer.data() + b.bufferStart,
b.bufferCount);
}
else
{
throw(std::runtime_error("ShapeID not supported"));
if (b.shapeId == ShapeID::GlobalArray ||
b.shapeId == ShapeID::LocalArray)
{
helper::NdCopy<T>(m_Buffer.data() + b.bufferStart,
b.start, b.count, true, true,
reinterpret_cast<char *>(data),
vStart, vCount, true, true);
}
else if (b.shapeId == ShapeID::GlobalValue ||
b.shapeId == ShapeID::LocalValue)
{
std::memcpy(data, m_Buffer.data() + b.bufferStart,
b.bufferCount);
}
else
{
throw(std::runtime_error("ShapeID not supported"));
}
}
}
}
Expand Down
Loading