Skip to content

Commit

Permalink
Merge pull request #2326 from pnorbert/fix-performputs-buffersize
Browse files Browse the repository at this point in the history
Fix performputs buffersize
  • Loading branch information
pnorbert authored Jun 16, 2020
2 parents 1fbe2cc + 39a545f commit 5b6f691
Show file tree
Hide file tree
Showing 18 changed files with 509 additions and 55 deletions.
11 changes: 11 additions & 0 deletions bindings/CXX11/adios2/cxx11/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,17 @@ ADIOS2_FOREACH_PRIMITIVE_TYPE_1ARG(declare_template_instantiation)
ADIOS2_FOREACH_TYPE_1ARG(declare_template_instantiation)
#undef declare_template_instantiation

size_t Engine::DebugGetDataBufferSize() const
{
helper::CheckForNullptr(m_Engine,
"in call to Engine::DebugGetDataBufferSize");
if (m_Engine->m_EngineType == "NULL")
{
return 0;
}
return m_Engine->DebugGetDataBufferSize();
}

std::string ToString(const Engine &engine)
{
return std::string("Engine(Name: \"" + engine.Name() + "\", Type: \"" +
Expand Down
3 changes: 3 additions & 0 deletions bindings/CXX11/adios2/cxx11/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,9 @@ class Engine
*/
void LockReaderSelections();

/* Debug function for adios2 testing framework */
size_t DebugGetDataBufferSize() const;

private:
Engine(core::Engine *engine);
core::Engine *m_Engine = nullptr;
Expand Down
8 changes: 7 additions & 1 deletion source/adios2/core/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ void Engine::LockReaderSelections() noexcept
m_ReaderSelectionsLocked = true;
}

size_t Engine::DebugGetDataBufferSize() const
{
ThrowUp("DebugGetDataBufferSize");
return 0;
}

// PROTECTED
void Engine::Init() {}
void Engine::InitParameters() {}
Expand Down Expand Up @@ -171,7 +177,7 @@ ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type)

size_t Engine::DoSteps() const
{
ThrowUp("DoPut");
ThrowUp("DoSteps");
return MaxSizeT;
}

Expand Down
3 changes: 3 additions & 0 deletions source/adios2/core/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,9 @@ class Engine
*/
void LockReaderSelections() noexcept;

/* for adios2 internal testing */
virtual size_t DebugGetDataBufferSize() const;

protected:
/** from ADIOS class passed to Engine created with Open
* if no communicator is passed */
Expand Down
38 changes: 35 additions & 3 deletions source/adios2/engine/bp3/BP3Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ void BP3Writer::PerformPuts()
#undef declare_template_instantiation
}
m_BP3Serializer.m_DeferredVariables.clear();
m_BP3Serializer.m_DeferredVariablesDataSize = 0;
}

void BP3Writer::EndStep()
Expand Down Expand Up @@ -180,6 +181,7 @@ void BP3Writer::InitTransports()

m_BP3Serializer.m_Profiler.Start("mkdir");
m_FileDataManager.MkDirsBarrier(bpSubStreamNames,
m_IO.m_TransportsParameters,
m_BP3Serializer.m_Parameters.NodeLocal);
m_BP3Serializer.m_Profiler.Stop("mkdir");

Expand Down Expand Up @@ -260,6 +262,17 @@ void BP3Writer::WriteProfilingJSONFile()
{
TAU_SCOPED_TIMER("BP3Writer::WriteProfilingJSONFile");
auto transportTypes = m_FileDataManager.GetTransportsTypes();

// find first File type output, where we can write the profile
int fileTransportIdx = -1;
for (size_t i = 0; i < transportTypes.size(); ++i)
{
if (transportTypes[i].compare(0, 4, "File") == 0)
{
fileTransportIdx = static_cast<int>(i);
}
}

auto transportProfilers = m_FileDataManager.GetTransportsProfilers();

auto transportTypesMD = m_FileMetadataManager.GetTransportsTypes();
Expand All @@ -282,9 +295,24 @@ void BP3Writer::WriteProfilingJSONFile()
if (m_BP3Serializer.m_RankMPI == 0)
{
transport::FileFStream profilingJSONStream(m_Comm);
auto bpBaseNames = m_BP3Serializer.GetBPBaseNames({m_Name});
profilingJSONStream.Open(bpBaseNames[0] + "/profiling.json",
Mode::Write);
std::string profileFileName;
if (fileTransportIdx > -1)
{
// write profile to <filename.bp>.dir/profiling.json
auto bpBaseNames = m_BP3Serializer.GetBPBaseNames({m_Name});
profileFileName = bpBaseNames[fileTransportIdx] + "/profiling.json";
}
else
{
// write profile to <filename.bp>_profiling.json
auto transportsNames = m_FileMetadataManager.GetFilesBaseNames(
m_Name, m_IO.m_TransportsParameters);

auto bpMetadataFileNames =
m_BP3Serializer.GetBPMetadataFileNames(transportsNames);
profileFileName = bpMetadataFileNames[0] + "_profiling.json";
}
profilingJSONStream.Open(profileFileName, Mode::Write);
profilingJSONStream.Write(profilingJSON.data(), profilingJSON.size());
profilingJSONStream.Close();
}
Expand Down Expand Up @@ -413,6 +441,10 @@ void BP3Writer::AggregateWriteData(const bool isFinal, const int transportIndex)
ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type)
#undef declare_type

size_t BP3Writer::DebugGetDataBufferSize() const
{
return m_BP3Serializer.DebugGetDataBufferSize();
}
} // end namespace engine
} // end namespace core
} // end namespace adios2
5 changes: 4 additions & 1 deletion source/adios2/engine/bp3/BP3Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class BP3Writer : public core::Engine
void EndStep() final;
void Flush(const int transportIndex = -1) final;

size_t DebugGetDataBufferSize() const final;

private:
/** Single object controlling BP buffering */
format::BP3Serializer m_BP3Serializer;
Expand Down Expand Up @@ -85,7 +87,8 @@ class BP3Writer : public core::Engine

template <class T>
void PutSyncCommon(Variable<T> &variable,
const typename Variable<T>::Info &blockInfo);
const typename Variable<T>::Info &blockInfo,
const bool resize = true);

template <class T>
void PutDeferredCommon(Variable<T> &variable, const T *data);
Expand Down
22 changes: 14 additions & 8 deletions source/adios2/engine/bp3/BP3Writer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,21 @@ void BP3Writer::PutCommon(Variable<T> &variable,

template <class T>
void BP3Writer::PutSyncCommon(Variable<T> &variable,
const typename Variable<T>::Info &blockInfo)
const typename Variable<T>::Info &blockInfo,
const bool resize)
{
const size_t dataSize =
helper::PayloadSize(blockInfo.Data, blockInfo.Count) +
m_BP3Serializer.GetBPIndexSizeInData(variable.m_Name, blockInfo.Count);
format::BP3Base::ResizeResult resizeResult =
format::BP3Base::ResizeResult::Success;
if (resize)
{
const size_t dataSize =
helper::PayloadSize(blockInfo.Data, blockInfo.Count) +
m_BP3Serializer.GetBPIndexSizeInData(variable.m_Name,
blockInfo.Count);

const format::BP3Base::ResizeResult resizeResult =
m_BP3Serializer.ResizeBuffer(dataSize, "in call to variable " +
variable.m_Name + " Put");
resizeResult = m_BP3Serializer.ResizeBuffer(
dataSize, "in call to variable " + variable.m_Name + " Put");
}

// if first timestep Write create a new pg index or in time aggregation
if (!m_BP3Serializer.m_MetadataSet.DataPGIsOpen)
Expand Down Expand Up @@ -133,7 +139,7 @@ void BP3Writer::PerformPutCommon(Variable<T> &variable)
auto itSpanBlock = variable.m_BlocksSpan.find(b);
if (itSpanBlock == variable.m_BlocksSpan.end())
{
PutSyncCommon(variable, variable.m_BlocksInfo[b]);
PutSyncCommon(variable, variable.m_BlocksInfo[b], false);
}
else
{
Expand Down
61 changes: 48 additions & 13 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ void BP4Writer::PerformPuts()
#undef declare_template_instantiation
}
m_BP4Serializer.m_DeferredVariables.clear();
m_BP4Serializer.m_DeferredVariablesDataSize = 0;
}

void BP4Writer::EndStep()
Expand Down Expand Up @@ -203,13 +204,14 @@ void BP4Writer::InitTransports()

/* Create the directories either on target or burst buffer if used */
m_BP4Serializer.m_Profiler.Start("mkdir");
m_FileDataManager.MkDirsBarrier(m_SubStreamNames,
m_BP4Serializer.m_Parameters.NodeLocal ||
m_WriteToBB);
m_FileDataManager.MkDirsBarrier(
m_SubStreamNames, m_IO.m_TransportsParameters,
m_BP4Serializer.m_Parameters.NodeLocal || m_WriteToBB);
if (m_DrainBB)
{
/* Create the directories on target anyway by main thread */
m_FileDataManager.MkDirsBarrier(m_DrainSubStreamNames,
m_IO.m_TransportsParameters,
m_BP4Serializer.m_Parameters.NodeLocal);
}
m_BP4Serializer.m_Profiler.Stop("mkdir");
Expand Down Expand Up @@ -469,6 +471,17 @@ void BP4Writer::WriteProfilingJSONFile()
{
TAU_SCOPED_TIMER("BP4Writer::WriteProfilingJSONFile");
auto transportTypes = m_FileDataManager.GetTransportsTypes();

// find first File type output, where we can write the profile
int fileTransportIdx = -1;
for (size_t i = 0; i < transportTypes.size(); ++i)
{
if (transportTypes[i].compare(0, 4, "File") == 0)
{
fileTransportIdx = static_cast<int>(i);
}
}

auto transportProfilers = m_FileDataManager.GetTransportsProfilers();

auto transportTypesMD = m_FileMetadataManager.GetTransportsTypes();
Expand All @@ -491,19 +504,36 @@ void BP4Writer::WriteProfilingJSONFile()
if (m_BP4Serializer.m_RankMPI == 0)
{
// std::cout << "write profiling file!" << std::endl;
std::string profileFileName;
if (m_DrainBB)
{
auto bpTargetNames = m_BP4Serializer.GetBPBaseNames({m_Name});
std::string targetProfiler(bpTargetNames[0] + "/profiling.json");
if (fileTransportIdx > -1)
{
profileFileName =
bpTargetNames[fileTransportIdx] + "/profiling.json";
}
else
{
profileFileName = bpTargetNames[0] + "_profiling.json";
}
m_FileDrainer.AddOperationWrite(
targetProfiler, profilingJSON.size(), profilingJSON.data());
profileFileName, profilingJSON.size(), profilingJSON.data());
}
else
{
transport::FileFStream profilingJSONStream(m_Comm);
auto bpBaseNames = m_BP4Serializer.GetBPBaseNames({m_BBName});
profilingJSONStream.Open(bpBaseNames[0] + "/profiling.json",
Mode::Write);
if (fileTransportIdx > -1)
{
profileFileName =
bpBaseNames[fileTransportIdx] + "/profiling.json";
}
else
{
profileFileName = bpBaseNames[0] + "_profiling.json";
}
profilingJSONStream.Open(profileFileName, Mode::Write);
profilingJSONStream.Write(profilingJSON.data(),
profilingJSON.size());
profilingJSONStream.Close();
Expand Down Expand Up @@ -535,12 +565,12 @@ void BP4Writer::UpdateActiveFlag(const bool active)
{
const char activeChar = (active ? '\1' : '\0');
m_FileMetadataIndexManager.WriteFileAt(
&activeChar, 1, m_BP4Serializer.m_ActiveFlagPosition, 0);
&activeChar, 1, m_BP4Serializer.m_ActiveFlagPosition);
m_FileMetadataIndexManager.FlushFiles();
m_FileMetadataIndexManager.SeekToFileEnd();
if (m_DrainBB)
{
for (int i = 0; i < m_MetadataIndexFileNames.size(); ++i)
for (size_t i = 0; i < m_MetadataIndexFileNames.size(); ++i)
{
m_FileDrainer.AddOperationWriteAt(
m_DrainMetadataIndexFileNames[i],
Expand Down Expand Up @@ -574,7 +604,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)

if (m_DrainBB)
{
for (int i = 0; i < m_MetadataFileNames.size(); ++i)
for (size_t i = 0; i < m_MetadataFileNames.size(); ++i)
{
m_FileDrainer.AddOperationCopy(
m_MetadataFileNames[i], m_DrainMetadataFileNames[i],
Expand Down Expand Up @@ -637,7 +667,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)

if (m_DrainBB)
{
for (int i = 0; i < m_MetadataIndexFileNames.size(); ++i)
for (size_t i = 0; i < m_MetadataIndexFileNames.size(); ++i)
{
m_FileDrainer.AddOperationWrite(
m_DrainMetadataIndexFileNames[i],
Expand Down Expand Up @@ -678,7 +708,7 @@ void BP4Writer::WriteData(const bool isFinal, const int transportIndex)
m_FileDataManager.FlushFiles(transportIndex);
if (m_DrainBB)
{
for (int i = 0; i < m_SubStreamNames.size(); ++i)
for (size_t i = 0; i < m_SubStreamNames.size(); ++i)
{
m_FileDrainer.AddOperationCopy(m_SubStreamNames[i],
m_DrainSubStreamNames[i], dataSize);
Expand Down Expand Up @@ -728,7 +758,7 @@ void BP4Writer::AggregateWriteData(const bool isFinal, const int transportIndex)

if (m_DrainBB)
{
for (int i = 0; i < m_SubStreamNames.size(); ++i)
for (size_t i = 0; i < m_SubStreamNames.size(); ++i)
{
m_FileDrainer.AddOperationCopy(m_SubStreamNames[i],
m_DrainSubStreamNames[i],
Expand Down Expand Up @@ -756,6 +786,11 @@ void BP4Writer::AggregateWriteData(const bool isFinal, const int transportIndex)
ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type)
#undef declare_type

size_t BP4Writer::DebugGetDataBufferSize() const
{
return m_BP4Serializer.DebugGetDataBufferSize();
}

} // end namespace engine
} // end namespace core
} // end namespace adios2
5 changes: 4 additions & 1 deletion source/adios2/engine/bp4/BP4Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class BP4Writer : public core::Engine
void EndStep() final;
void Flush(const int transportIndex = -1) final;

size_t DebugGetDataBufferSize() const final;

private:
/** Single object controlling BP buffering */
format::BP4Serializer m_BP4Serializer;
Expand Down Expand Up @@ -115,7 +117,8 @@ class BP4Writer : public core::Engine

template <class T>
void PutSyncCommon(Variable<T> &variable,
const typename Variable<T>::Info &blockInfo);
const typename Variable<T>::Info &blockInfo,
const bool resize = true);

template <class T>
void PutDeferredCommon(Variable<T> &variable, const T *data);
Expand Down
23 changes: 14 additions & 9 deletions source/adios2/engine/bp4/BP4Writer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,21 @@ void BP4Writer::PutCommon(Variable<T> &variable,

template <class T>
void BP4Writer::PutSyncCommon(Variable<T> &variable,
const typename Variable<T>::Info &blockInfo)
const typename Variable<T>::Info &blockInfo,
const bool resize)
{
const size_t dataSize =
helper::PayloadSize(blockInfo.Data, blockInfo.Count) +
m_BP4Serializer.GetBPIndexSizeInData(variable.m_Name, blockInfo.Count);

const format::BP4Base::ResizeResult resizeResult =
m_BP4Serializer.ResizeBuffer(dataSize, "in call to variable " +
variable.m_Name + " Put");
format::BP4Base::ResizeResult resizeResult =
format::BP4Base::ResizeResult::Success;
if (resize)
{
const size_t dataSize =
helper::PayloadSize(blockInfo.Data, blockInfo.Count) +
m_BP4Serializer.GetBPIndexSizeInData(variable.m_Name,
blockInfo.Count);

resizeResult = m_BP4Serializer.ResizeBuffer(
dataSize, "in call to variable " + variable.m_Name + " Put");
}
// if first timestep Write create a new pg index
if (!m_BP4Serializer.m_MetadataSet.DataPGIsOpen)
{
Expand Down Expand Up @@ -133,7 +138,7 @@ void BP4Writer::PerformPutCommon(Variable<T> &variable)
auto itSpanBlock = variable.m_BlocksSpan.find(b);
if (itSpanBlock == variable.m_BlocksSpan.end())
{
PutSyncCommon(variable, variable.m_BlocksInfo[b]);
PutSyncCommon(variable, variable.m_BlocksInfo[b], false);
}
else
{
Expand Down
Loading

0 comments on commit 5b6f691

Please sign in to comment.