Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix performputs buffersize #2326

Merged
merged 4 commits into from
Jun 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions bindings/CXX11/adios2/cxx11/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,17 @@ ADIOS2_FOREACH_PRIMITIVE_TYPE_1ARG(declare_template_instantiation)
ADIOS2_FOREACH_TYPE_1ARG(declare_template_instantiation)
#undef declare_template_instantiation

size_t Engine::DebugGetDataBufferSize() const
{
helper::CheckForNullptr(m_Engine,
"in call to Engine::DebugGetDataBufferSize");
if (m_Engine->m_EngineType == "NULL")
{
return 0;
}
return m_Engine->DebugGetDataBufferSize();
}

std::string ToString(const Engine &engine)
{
return std::string("Engine(Name: \"" + engine.Name() + "\", Type: \"" +
Expand Down
3 changes: 3 additions & 0 deletions bindings/CXX11/adios2/cxx11/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,9 @@ class Engine
*/
void LockReaderSelections();

/* Debug function for adios2 testing framework */
size_t DebugGetDataBufferSize() const;

private:
Engine(core::Engine *engine);
core::Engine *m_Engine = nullptr;
Expand Down
8 changes: 7 additions & 1 deletion source/adios2/core/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ void Engine::LockReaderSelections() noexcept
m_ReaderSelectionsLocked = true;
}

size_t Engine::DebugGetDataBufferSize() const
{
ThrowUp("DebugGetDataBufferSize");
return 0;
}

// PROTECTED
void Engine::Init() {}
void Engine::InitParameters() {}
Expand Down Expand Up @@ -171,7 +177,7 @@ ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type)

size_t Engine::DoSteps() const
{
ThrowUp("DoPut");
ThrowUp("DoSteps");
return MaxSizeT;
}

Expand Down
3 changes: 3 additions & 0 deletions source/adios2/core/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,9 @@ class Engine
*/
void LockReaderSelections() noexcept;

/* for adios2 internal testing */
virtual size_t DebugGetDataBufferSize() const;

protected:
/** from ADIOS class passed to Engine created with Open
* if no communicator is passed */
Expand Down
38 changes: 35 additions & 3 deletions source/adios2/engine/bp3/BP3Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ void BP3Writer::PerformPuts()
#undef declare_template_instantiation
}
m_BP3Serializer.m_DeferredVariables.clear();
m_BP3Serializer.m_DeferredVariablesDataSize = 0;
}

void BP3Writer::EndStep()
Expand Down Expand Up @@ -180,6 +181,7 @@ void BP3Writer::InitTransports()

m_BP3Serializer.m_Profiler.Start("mkdir");
m_FileDataManager.MkDirsBarrier(bpSubStreamNames,
m_IO.m_TransportsParameters,
m_BP3Serializer.m_Parameters.NodeLocal);
m_BP3Serializer.m_Profiler.Stop("mkdir");

Expand Down Expand Up @@ -260,6 +262,17 @@ void BP3Writer::WriteProfilingJSONFile()
{
TAU_SCOPED_TIMER("BP3Writer::WriteProfilingJSONFile");
auto transportTypes = m_FileDataManager.GetTransportsTypes();

// find first File type output, where we can write the profile
int fileTransportIdx = -1;
for (size_t i = 0; i < transportTypes.size(); ++i)
{
if (transportTypes[i].compare(0, 4, "File") == 0)
{
fileTransportIdx = static_cast<int>(i);
}
}

auto transportProfilers = m_FileDataManager.GetTransportsProfilers();

auto transportTypesMD = m_FileMetadataManager.GetTransportsTypes();
Expand All @@ -282,9 +295,24 @@ void BP3Writer::WriteProfilingJSONFile()
if (m_BP3Serializer.m_RankMPI == 0)
{
transport::FileFStream profilingJSONStream(m_Comm);
auto bpBaseNames = m_BP3Serializer.GetBPBaseNames({m_Name});
profilingJSONStream.Open(bpBaseNames[0] + "/profiling.json",
Mode::Write);
std::string profileFileName;
if (fileTransportIdx > -1)
{
// write profile to <filename.bp>.dir/profiling.json
auto bpBaseNames = m_BP3Serializer.GetBPBaseNames({m_Name});
profileFileName = bpBaseNames[fileTransportIdx] + "/profiling.json";
}
else
{
// write profile to <filename.bp>_profiling.json
auto transportsNames = m_FileMetadataManager.GetFilesBaseNames(
m_Name, m_IO.m_TransportsParameters);

auto bpMetadataFileNames =
m_BP3Serializer.GetBPMetadataFileNames(transportsNames);
profileFileName = bpMetadataFileNames[0] + "_profiling.json";
}
profilingJSONStream.Open(profileFileName, Mode::Write);
profilingJSONStream.Write(profilingJSON.data(), profilingJSON.size());
profilingJSONStream.Close();
}
Expand Down Expand Up @@ -413,6 +441,10 @@ void BP3Writer::AggregateWriteData(const bool isFinal, const int transportIndex)
ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type)
#undef declare_type

size_t BP3Writer::DebugGetDataBufferSize() const
{
return m_BP3Serializer.DebugGetDataBufferSize();
}
} // end namespace engine
} // end namespace core
} // end namespace adios2
5 changes: 4 additions & 1 deletion source/adios2/engine/bp3/BP3Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class BP3Writer : public core::Engine
void EndStep() final;
void Flush(const int transportIndex = -1) final;

size_t DebugGetDataBufferSize() const final;

private:
/** Single object controlling BP buffering */
format::BP3Serializer m_BP3Serializer;
Expand Down Expand Up @@ -85,7 +87,8 @@ class BP3Writer : public core::Engine

template <class T>
void PutSyncCommon(Variable<T> &variable,
const typename Variable<T>::Info &blockInfo);
const typename Variable<T>::Info &blockInfo,
const bool resize = true);

template <class T>
void PutDeferredCommon(Variable<T> &variable, const T *data);
Expand Down
22 changes: 14 additions & 8 deletions source/adios2/engine/bp3/BP3Writer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,21 @@ void BP3Writer::PutCommon(Variable<T> &variable,

template <class T>
void BP3Writer::PutSyncCommon(Variable<T> &variable,
const typename Variable<T>::Info &blockInfo)
const typename Variable<T>::Info &blockInfo,
const bool resize)
{
const size_t dataSize =
helper::PayloadSize(blockInfo.Data, blockInfo.Count) +
m_BP3Serializer.GetBPIndexSizeInData(variable.m_Name, blockInfo.Count);
format::BP3Base::ResizeResult resizeResult =
format::BP3Base::ResizeResult::Success;
if (resize)
{
const size_t dataSize =
helper::PayloadSize(blockInfo.Data, blockInfo.Count) +
m_BP3Serializer.GetBPIndexSizeInData(variable.m_Name,
blockInfo.Count);

const format::BP3Base::ResizeResult resizeResult =
m_BP3Serializer.ResizeBuffer(dataSize, "in call to variable " +
variable.m_Name + " Put");
resizeResult = m_BP3Serializer.ResizeBuffer(
dataSize, "in call to variable " + variable.m_Name + " Put");
}

// if first timestep Write create a new pg index or in time aggregation
if (!m_BP3Serializer.m_MetadataSet.DataPGIsOpen)
Expand Down Expand Up @@ -133,7 +139,7 @@ void BP3Writer::PerformPutCommon(Variable<T> &variable)
auto itSpanBlock = variable.m_BlocksSpan.find(b);
if (itSpanBlock == variable.m_BlocksSpan.end())
{
PutSyncCommon(variable, variable.m_BlocksInfo[b]);
PutSyncCommon(variable, variable.m_BlocksInfo[b], false);
}
else
{
Expand Down
61 changes: 48 additions & 13 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ void BP4Writer::PerformPuts()
#undef declare_template_instantiation
}
m_BP4Serializer.m_DeferredVariables.clear();
m_BP4Serializer.m_DeferredVariablesDataSize = 0;
}

void BP4Writer::EndStep()
Expand Down Expand Up @@ -203,13 +204,14 @@ void BP4Writer::InitTransports()

/* Create the directories either on target or burst buffer if used */
m_BP4Serializer.m_Profiler.Start("mkdir");
m_FileDataManager.MkDirsBarrier(m_SubStreamNames,
m_BP4Serializer.m_Parameters.NodeLocal ||
m_WriteToBB);
m_FileDataManager.MkDirsBarrier(
m_SubStreamNames, m_IO.m_TransportsParameters,
m_BP4Serializer.m_Parameters.NodeLocal || m_WriteToBB);
if (m_DrainBB)
{
/* Create the directories on target anyway by main thread */
m_FileDataManager.MkDirsBarrier(m_DrainSubStreamNames,
m_IO.m_TransportsParameters,
m_BP4Serializer.m_Parameters.NodeLocal);
}
m_BP4Serializer.m_Profiler.Stop("mkdir");
Expand Down Expand Up @@ -469,6 +471,17 @@ void BP4Writer::WriteProfilingJSONFile()
{
TAU_SCOPED_TIMER("BP4Writer::WriteProfilingJSONFile");
auto transportTypes = m_FileDataManager.GetTransportsTypes();

// find first File type output, where we can write the profile
int fileTransportIdx = -1;
for (size_t i = 0; i < transportTypes.size(); ++i)
{
if (transportTypes[i].compare(0, 4, "File") == 0)
{
fileTransportIdx = static_cast<int>(i);
}
}

auto transportProfilers = m_FileDataManager.GetTransportsProfilers();

auto transportTypesMD = m_FileMetadataManager.GetTransportsTypes();
Expand All @@ -491,19 +504,36 @@ void BP4Writer::WriteProfilingJSONFile()
if (m_BP4Serializer.m_RankMPI == 0)
{
// std::cout << "write profiling file!" << std::endl;
std::string profileFileName;
if (m_DrainBB)
{
auto bpTargetNames = m_BP4Serializer.GetBPBaseNames({m_Name});
std::string targetProfiler(bpTargetNames[0] + "/profiling.json");
if (fileTransportIdx > -1)
{
profileFileName =
bpTargetNames[fileTransportIdx] + "/profiling.json";
}
else
{
profileFileName = bpTargetNames[0] + "_profiling.json";
}
m_FileDrainer.AddOperationWrite(
targetProfiler, profilingJSON.size(), profilingJSON.data());
profileFileName, profilingJSON.size(), profilingJSON.data());
}
else
{
transport::FileFStream profilingJSONStream(m_Comm);
auto bpBaseNames = m_BP4Serializer.GetBPBaseNames({m_BBName});
profilingJSONStream.Open(bpBaseNames[0] + "/profiling.json",
Mode::Write);
if (fileTransportIdx > -1)
{
profileFileName =
bpBaseNames[fileTransportIdx] + "/profiling.json";
}
else
{
profileFileName = bpBaseNames[0] + "_profiling.json";
}
profilingJSONStream.Open(profileFileName, Mode::Write);
profilingJSONStream.Write(profilingJSON.data(),
profilingJSON.size());
profilingJSONStream.Close();
Expand Down Expand Up @@ -535,12 +565,12 @@ void BP4Writer::UpdateActiveFlag(const bool active)
{
const char activeChar = (active ? '\1' : '\0');
m_FileMetadataIndexManager.WriteFileAt(
&activeChar, 1, m_BP4Serializer.m_ActiveFlagPosition, 0);
&activeChar, 1, m_BP4Serializer.m_ActiveFlagPosition);
m_FileMetadataIndexManager.FlushFiles();
m_FileMetadataIndexManager.SeekToFileEnd();
if (m_DrainBB)
{
for (int i = 0; i < m_MetadataIndexFileNames.size(); ++i)
for (size_t i = 0; i < m_MetadataIndexFileNames.size(); ++i)
{
m_FileDrainer.AddOperationWriteAt(
m_DrainMetadataIndexFileNames[i],
Expand Down Expand Up @@ -574,7 +604,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)

if (m_DrainBB)
{
for (int i = 0; i < m_MetadataFileNames.size(); ++i)
for (size_t i = 0; i < m_MetadataFileNames.size(); ++i)
{
m_FileDrainer.AddOperationCopy(
m_MetadataFileNames[i], m_DrainMetadataFileNames[i],
Expand Down Expand Up @@ -637,7 +667,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)

if (m_DrainBB)
{
for (int i = 0; i < m_MetadataIndexFileNames.size(); ++i)
for (size_t i = 0; i < m_MetadataIndexFileNames.size(); ++i)
{
m_FileDrainer.AddOperationWrite(
m_DrainMetadataIndexFileNames[i],
Expand Down Expand Up @@ -678,7 +708,7 @@ void BP4Writer::WriteData(const bool isFinal, const int transportIndex)
m_FileDataManager.FlushFiles(transportIndex);
if (m_DrainBB)
{
for (int i = 0; i < m_SubStreamNames.size(); ++i)
for (size_t i = 0; i < m_SubStreamNames.size(); ++i)
{
m_FileDrainer.AddOperationCopy(m_SubStreamNames[i],
m_DrainSubStreamNames[i], dataSize);
Expand Down Expand Up @@ -728,7 +758,7 @@ void BP4Writer::AggregateWriteData(const bool isFinal, const int transportIndex)

if (m_DrainBB)
{
for (int i = 0; i < m_SubStreamNames.size(); ++i)
for (size_t i = 0; i < m_SubStreamNames.size(); ++i)
{
m_FileDrainer.AddOperationCopy(m_SubStreamNames[i],
m_DrainSubStreamNames[i],
Expand Down Expand Up @@ -756,6 +786,11 @@ void BP4Writer::AggregateWriteData(const bool isFinal, const int transportIndex)
ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type)
#undef declare_type

size_t BP4Writer::DebugGetDataBufferSize() const
{
return m_BP4Serializer.DebugGetDataBufferSize();
}

} // end namespace engine
} // end namespace core
} // end namespace adios2
5 changes: 4 additions & 1 deletion source/adios2/engine/bp4/BP4Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class BP4Writer : public core::Engine
void EndStep() final;
void Flush(const int transportIndex = -1) final;

size_t DebugGetDataBufferSize() const final;

private:
/** Single object controlling BP buffering */
format::BP4Serializer m_BP4Serializer;
Expand Down Expand Up @@ -115,7 +117,8 @@ class BP4Writer : public core::Engine

template <class T>
void PutSyncCommon(Variable<T> &variable,
const typename Variable<T>::Info &blockInfo);
const typename Variable<T>::Info &blockInfo,
const bool resize = true);

template <class T>
void PutDeferredCommon(Variable<T> &variable, const T *data);
Expand Down
23 changes: 14 additions & 9 deletions source/adios2/engine/bp4/BP4Writer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,21 @@ void BP4Writer::PutCommon(Variable<T> &variable,

template <class T>
void BP4Writer::PutSyncCommon(Variable<T> &variable,
const typename Variable<T>::Info &blockInfo)
const typename Variable<T>::Info &blockInfo,
const bool resize)
{
const size_t dataSize =
helper::PayloadSize(blockInfo.Data, blockInfo.Count) +
m_BP4Serializer.GetBPIndexSizeInData(variable.m_Name, blockInfo.Count);

const format::BP4Base::ResizeResult resizeResult =
m_BP4Serializer.ResizeBuffer(dataSize, "in call to variable " +
variable.m_Name + " Put");
format::BP4Base::ResizeResult resizeResult =
format::BP4Base::ResizeResult::Success;
if (resize)
{
const size_t dataSize =
helper::PayloadSize(blockInfo.Data, blockInfo.Count) +
m_BP4Serializer.GetBPIndexSizeInData(variable.m_Name,
blockInfo.Count);

resizeResult = m_BP4Serializer.ResizeBuffer(
dataSize, "in call to variable " + variable.m_Name + " Put");
}
// if first timestep Write create a new pg index
if (!m_BP4Serializer.m_MetadataSet.DataPGIsOpen)
{
Expand Down Expand Up @@ -133,7 +138,7 @@ void BP4Writer::PerformPutCommon(Variable<T> &variable)
auto itSpanBlock = variable.m_BlocksSpan.find(b);
if (itSpanBlock == variable.m_BlocksSpan.end())
{
PutSyncCommon(variable, variable.m_BlocksInfo[b]);
PutSyncCommon(variable, variable.m_BlocksInfo[b], false);
}
else
{
Expand Down
Loading