Skip to content

Commit

Permalink
variableBased as default in ADIOS2
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Jan 14, 2025
1 parent 6185eeb commit e5918c7
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 49 deletions.
5 changes: 2 additions & 3 deletions examples/5_write_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@
# in streaming setups, e.g. an iteration cannot be opened again once
# it has been closed.
# `Series.iterations` can be directly accessed in random-access workflows.
series.iterations[1].open()
mymesh = series.iterations[1]. \
mymesh = series.write_iterations()[1]. \
meshes["mymesh"]

# example 1D domain decomposition in first index
Expand Down Expand Up @@ -92,7 +91,7 @@
# The iteration can be closed in order to help free up resources.
# The iteration's content will be flushed automatically.
# An iteration once closed cannot (yet) be reopened.
series.iterations[1].close()
series.write_iterations()[1].close()

if 0 == comm.rank:
print("Dataset content has been fully written to disk")
Expand Down
6 changes: 6 additions & 0 deletions include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "openPMD/Error.hpp"
#include "openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp"
#include "openPMD/IO/ADIOS/ADIOS2FilePosition.hpp"
#include "openPMD/IO/ADIOS/macros.hpp"
#include "openPMD/IO/AbstractIOHandler.hpp"
#include "openPMD/IO/AbstractIOHandlerImpl.hpp"
#include "openPMD/IO/AbstractIOHandlerImplCommon.hpp"
Expand Down Expand Up @@ -860,6 +861,11 @@ class ADIOS2IOHandler : public AbstractIOHandler
return "ADIOS2";
}

bool fullSupportForVariableBasedEncoding() const override
{
return openPMD_HAS_ADIOS_2_9;
}

std::future<void> flush(internal::ParsedFlushParams &) override;
}; // ADIOS2IOHandler
} // namespace openPMD
1 change: 1 addition & 0 deletions include/openPMD/IO/AbstractIOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ class AbstractIOHandler

/** The currently used backend */
virtual std::string backendName() const = 0;
virtual bool fullSupportForVariableBasedEncoding() const;

std::string directory;
/*
Expand Down
10 changes: 10 additions & 0 deletions include/openPMD/Series.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ class Series;

namespace internal
{
enum class default_or_explicit : bool
{
default_,
explicit_
};
/**
* @brief Data members for Series. Pinned at one memory location.
*
Expand Down Expand Up @@ -183,6 +188,8 @@ namespace internal
* The iteration encoding used in this series.
*/
IterationEncoding m_iterationEncoding{};
default_or_explicit m_iterationEncodingSetExplicitly =
default_or_explicit::default_;
/**
* Detected IO format (backend).
*/
Expand Down Expand Up @@ -969,6 +976,9 @@ OPENPMD_private
*/
void flushStep(bool doFlush);

Series &setIterationEncoding_internal(
IterationEncoding iterationEncoding, internal::default_or_explicit);

/*
* Returns the current content of the /data/snapshot attribute.
* (We could also add this to the public API some time)
Expand Down
1 change: 1 addition & 0 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1581,6 +1581,7 @@ void ADIOS2IOHandlerImpl::readAttributeAllsteps(
auto IO = adios.DeclareIO("PreparseSnapshots");
// @todo check engine type
IO.SetEngine(realEngineType());
IO.SetParameter("StreamReader", "ON"); // this be for BP4
auto engine = IO.Open(fullPath(*file), adios2::Mode::Read);
auto status = engine.BeginStep();
auto type = detail::attributeInfo(IO, name, /* verbose = */ true);
Expand Down
5 changes: 5 additions & 0 deletions src/IO/AbstractIOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,9 @@ std::future<void> AbstractIOHandler::flush(internal::FlushParams const &params)
json::warnGlobalUnusedOptions(parsedParams.backendConfig);
return future;
}

bool AbstractIOHandler::fullSupportForVariableBasedEncoding() const
{
return false;
}
} // namespace openPMD
137 changes: 94 additions & 43 deletions src/Series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,45 +581,7 @@ IterationEncoding Series::iterationEncoding() const

Series &Series::setIterationEncoding(IterationEncoding ie)
{
auto &series = get();
if (series.m_deferred_initialization)
{
runDeferredInitialization();
}
if (written())
throw std::runtime_error(
"A files iterationEncoding can not (yet) be changed after it has "
"been written.");

series.m_iterationEncoding = ie;
switch (ie)
{
case IterationEncoding::fileBased:
setIterationFormat(series.m_name);
setAttribute("iterationEncoding", std::string("fileBased"));
// This checks that the name contains the expansion pattern
// (e.g. %T) and parses it
if (series.m_filenamePadding < 0)
{
if (!reparseExpansionPattern(series.m_name))
{
throw error::WrongAPIUsage(
"For fileBased formats the iteration expansion pattern "
"%T must "
"be included in the file name");
}
}
break;
case IterationEncoding::groupBased:
setIterationFormat(BASEPATH);
setAttribute("iterationEncoding", std::string("groupBased"));
break;
case IterationEncoding::variableBased:
setIterationFormat(auxiliary::replace_first(basePath(), "/%T/", ""));
setAttribute("iterationEncoding", std::string("variableBased"));
break;
}
IOHandler()->setIterationEncoding(ie);
setIterationEncoding_internal(ie, internal::default_or_explicit::default_);
return *this;
}

Expand Down Expand Up @@ -1170,7 +1132,9 @@ Given file pattern: ')END"
setWritten(false, Attributable::EnqueueAsynchronously::No);

initDefaults(input->iterationEncoding);
setIterationEncoding(input->iterationEncoding);
setIterationEncoding_internal(
input->iterationEncoding,
series.m_iterationEncodingSetExplicitly);

setWritten(true, Attributable::EnqueueAsynchronously::No);
}
Expand All @@ -1186,12 +1150,14 @@ Given file pattern: ')END"
}
case Access::CREATE: {
initDefaults(input->iterationEncoding);
setIterationEncoding(input->iterationEncoding);
setIterationEncoding_internal(
input->iterationEncoding, series.m_iterationEncodingSetExplicitly);
break;
}
case Access::APPEND: {
initDefaults(input->iterationEncoding);
setIterationEncoding(input->iterationEncoding);
setIterationEncoding_internal(
input->iterationEncoding, series.m_iterationEncodingSetExplicitly);
if (input->iterationEncoding != IterationEncoding::fileBased)
{
break;
Expand Down Expand Up @@ -1857,7 +1823,8 @@ void Series::readOneIterationFileBased(std::string const &filePath)
"Unknown iterationEncoding: " + encoding);
auto old_written = written();
setWritten(false, Attributable::EnqueueAsynchronously::No);
setIterationEncoding(encoding_out);
setIterationEncoding_internal(
encoding_out, internal::default_or_explicit::explicit_);
setWritten(old_written, Attributable::EnqueueAsynchronously::Yes);
}
else
Expand Down Expand Up @@ -2645,6 +2612,59 @@ void Series::flushStep(bool doFlush)
series.m_wroteAtLeastOneIOStep = true;
}

Series &Series::setIterationEncoding_internal(
IterationEncoding ie, internal::default_or_explicit doe)
{
auto &series = get();
switch (doe)
{
case internal::default_or_explicit::default_:
case internal::default_or_explicit::explicit_:
// mark this option as set explicitly by the user
series.m_iterationEncodingSetExplicitly = doe;
break;
}
if (series.m_deferred_initialization)
{
runDeferredInitialization();
}
if (written())
throw std::runtime_error(
"A files iterationEncoding can not (yet) be changed after it has "
"been written.");

series.m_iterationEncoding = ie;
switch (ie)
{
case IterationEncoding::fileBased:
setIterationFormat(series.m_name);
setAttribute("iterationEncoding", std::string("fileBased"));
// This checks that the name contains the expansion pattern
// (e.g. %T) and parses it
if (series.m_filenamePadding < 0)
{
if (!reparseExpansionPattern(series.m_name))
{
throw error::WrongAPIUsage(
"For fileBased formats the iteration expansion pattern "
"%T must "
"be included in the file name");
}
}
break;
case IterationEncoding::groupBased:
setIterationFormat(BASEPATH);
setAttribute("iterationEncoding", std::string("groupBased"));
break;
case IterationEncoding::variableBased:
setIterationFormat(auxiliary::replace_first(basePath(), "/%T/", ""));
setAttribute("iterationEncoding", std::string("variableBased"));
break;
}
IOHandler()->setIterationEncoding(ie);
return *this;
}

auto Series::openIterationIfDirty(IterationIndex_t index, Iteration &iteration)
-> IterationOpened
{
Expand Down Expand Up @@ -2939,6 +2959,8 @@ void Series::parseJsonOptions(TracingJSON &options, ParsedInput &input)
options, "iteration_encoding", iterationEncoding);
if (!iterationEncoding.empty())
{
series.m_iterationEncodingSetExplicitly =
internal::default_or_explicit::explicit_;
auto it = ieDescriptors.find(iterationEncoding);
if (it != ieDescriptors.end())
{
Expand Down Expand Up @@ -3190,6 +3212,35 @@ Series::snapshots(std::optional<SnapshotWorkflow> const snapshot_workflow)
}
}

/*
* ADIOS2 should use variable-based encoding as a default when applicable,
* since group-based encoding has severe limitations in ADIOS2.
* The below logic checks if variable-based encoding should be used.
*/

if (
// 1. No encoding has been explicitly selected by the user.
// Flag set by Series::setIterationEncoding().
series.m_iterationEncodingSetExplicitly ==
internal::default_or_explicit::default_ &&
// 2. Iteration encoding was recognized as groupBased by init()
// procedures (and not file-based).
series.m_iterationEncoding == IterationEncoding::groupBased &&
// 3. The IO workflow will be synchronous, necessary for writing
// variable-based data (but not for reading!).
usedSnapshotWorkflow == SnapshotWorkflow::Synchronous &&
// 4. The chosen access type is write-only, otherwise the encoding is
// determined by the previous file content.
access::writeOnly(access) &&
// 5. The backend is ADIOS2 in a recent enough version to support
// modifiable attributes (v2.9).
IOHandler()->fullSupportForVariableBasedEncoding())
{
setIterationEncoding_internal(
IterationEncoding::variableBased,
internal::default_or_explicit::default_);
}

switch (usedSnapshotWorkflow)
{
case SnapshotWorkflow::RandomAccess: {
Expand Down
5 changes: 4 additions & 1 deletion test/Files_SerialIO/close_and_reopen_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,10 @@ auto run_test_groupbased(
{
std::string filename =
"../samples/close_iteration_reopen/groupbased." + ext;
Series series(filename, Access::CREATE, write_cfg);
Series series(
filename,
Access::CREATE,
json::merge(write_cfg, R"({"iteration_encoding": "group_based"})"));
{
auto it = writeIterations(series)[0];
auto E_x = it.meshes["E"]["x"];
Expand Down
5 changes: 4 additions & 1 deletion test/ParallelIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,10 @@ void adios2_streaming(bool variableBasedLayout)
Series writeSeries(
"../samples/adios2_stream.sst",
Access::CREATE,
"adios2.engine.type = \"sst\"");
variableBasedLayout
? "adios2.engine.type = \"sst\""
: "adios2.engine.type = \"sst\"\niteration_encoding = "
"\"group_based\"");
if (variableBasedLayout)
{
writeSeries.setIterationEncoding(IterationEncoding::variableBased);
Expand Down
2 changes: 1 addition & 1 deletion test/SerialIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7027,7 +7027,7 @@ TEST_CASE("unfinished_iteration_test", "[serial]")
unfinished_iteration_test(
"bp",
IterationEncoding::groupBased,
R"({"backend": "adios2"})",
R"({"backend": "adios2", "iteration_encoding": "group_based"})",
/* test_linear_access = */ false);
#if openPMD_HAS_ADIOS_2_9
unfinished_iteration_test(
Expand Down

0 comments on commit e5918c7

Please sign in to comment.