Skip to content

Commit

Permalink
Add iterate_nonstreaming_series test to parallel tests
Browse files Browse the repository at this point in the history
This covers the use case that the snapshot attribute has more than just
one entry.
  • Loading branch information
franzpoeschel committed Jan 14, 2025
1 parent 8d04523 commit b9ef630
Show file tree
Hide file tree
Showing 5 changed files with 277 additions and 42 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ if(openPMD_BUILD_TESTING)
elseif(${test_name} STREQUAL "ParallelIO")
list(APPEND ${out_list}
test/Files_ParallelIO/read_variablebased_randomaccess.cpp
test/Files_ParallelIO/iterate_nonstreaming_series.cpp
)
endif()
endmacro()
Expand Down
72 changes: 72 additions & 0 deletions test/Files_ParallelIO/ParallelIOTests.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,78 @@
#include <openPMD/openPMD.hpp>

#if openPMD_HAVE_MPI

using namespace openPMD;

struct BackendSelection
{
std::string backendName;
std::string extension;

[[nodiscard]] inline std::string jsonBaseConfig() const
{
return R"({"backend": ")" + backendName + "\"}";
}
};

inline std::vector<BackendSelection> testedBackends()
{
auto variants = getVariants();
std::map<std::string, std::string> extensions{
{"adios2", "bp"}, {"hdf5", "h5"}};
std::vector<BackendSelection> res;
for (auto const &pair : variants)
{
if (pair.second)
{
auto lookup = extensions.find(pair.first);
if (lookup != extensions.end())
{
std::string extension = lookup->second;
res.push_back({pair.first, std::move(extension)});
}
}
}
return res;
}

inline std::vector<std::string> getBackends()
{
// first component: backend file ending
// second component: whether to test 128 bit values
std::vector<std::string> res;
#if openPMD_HAVE_ADIOS2
res.emplace_back("bp");
#endif
#if openPMD_HAVE_HDF5
res.emplace_back("h5");
#endif
return res;
}

inline auto const backends = getBackends();

inline std::vector<std::string> testedFileExtensions()
{
auto allExtensions = getFileExtensions();
auto newEnd = std::remove_if(
allExtensions.begin(), allExtensions.end(), [](std::string const &ext) {
// sst and ssc need a receiver for testing
// bp4 is already tested via bp
return ext == "sst" || ext == "ssc" || ext == "bp4" ||
ext == "toml" || ext == "json";
});
return {allExtensions.begin(), newEnd};
}

namespace read_variablebased_randomaccess
{
auto read_variablebased_randomaccess() -> void;
}

namespace iterate_nonstreaming_series
{
auto iterate_nonstreaming_series() -> void;
}

#endif
186 changes: 186 additions & 0 deletions test/Files_ParallelIO/iterate_nonstreaming_series.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
#include "ParallelIOTests.hpp"

#include "openPMD/IO/ADIOS/macros.hpp"

#include <catch2/catch.hpp>
#include <mpi.h>

namespace iterate_nonstreaming_series
{

static auto run_test(
std::string const &file,
bool variableBasedLayout,
std::string const &jsonConfig) -> void
{
int mpi_size, mpi_rank;
MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);

constexpr size_t base_extent = 10;
size_t const extent = base_extent * size_t(mpi_size);

{
Series writeSeries(file, Access::CREATE, MPI_COMM_WORLD, jsonConfig);
if (variableBasedLayout)
{
writeSeries.setIterationEncoding(IterationEncoding::variableBased);
}
// use conventional API to write iterations
auto iterations = writeSeries.iterations;
for (size_t i = 0; i < 10; ++i)
{
auto iteration = iterations[i];
auto E_x = iteration.meshes["E"]["x"];
E_x.resetDataset(
openPMD::Dataset(openPMD::Datatype::INT, {2, extent}));
int value = variableBasedLayout ? 0 : i;
std::vector<int> data(extent, value);
E_x.storeChunk(
data, {0, base_extent * size_t(mpi_rank)}, {1, base_extent});
bool taskSupportedByBackend = true;
DynamicMemoryView<int> memoryView;
{
auto currentBuffer = memoryView.currentBuffer();
REQUIRE(currentBuffer.data() == nullptr);
REQUIRE(currentBuffer.size() == 0);
}
memoryView = E_x.storeChunk<int>(
{1, base_extent * size_t(mpi_rank)},
{1, base_extent},
/*
* Hijack the functor that is called for buffer creation.
* This allows us to check if the backend has explicit support
* for buffer creation or if the fallback implementation is
* used.
*/
[&taskSupportedByBackend](size_t size) {
taskSupportedByBackend = false;
return std::shared_ptr<int>{
new int[size], [](auto *ptr) { delete[] ptr; }};
});
if (writeSeries.backend() == "ADIOS2")
{
// that backend must support span creation
REQUIRE(taskSupportedByBackend);
}
auto span = memoryView.currentBuffer();
for (size_t j = 0; j < span.size(); ++j)
{
span[j] = j;
}

/*
* This is to test whether defaults are correctly written for
* scalar record components since there previously was a bug.
*/
auto scalarMesh =
iteration
.meshes["i_energyDensity"][MeshRecordComponent::SCALAR];
scalarMesh.resetDataset(
Dataset(Datatype::INT, {5 * size_t(mpi_size)}));
auto scalarSpan =
scalarMesh.storeChunk<int>({5 * size_t(mpi_rank)}, {5})
.currentBuffer();
for (size_t j = 0; j < scalarSpan.size(); ++j)
{
scalarSpan[j] = j;
}
// we encourage manually closing iterations, but it should not
// matter so let's do the switcharoo for this test
if (i % 2 == 0)
{
writeSeries.flush();
}
else
{
iteration.close();
}
}
}

for (auto access : {Access::READ_LINEAR, Access::READ_ONLY})
{
Series readSeries(
file,
access,
MPI_COMM_WORLD,
json::merge(jsonConfig, R"({"defer_iteration_parsing": true})"));

size_t last_iteration_index = 0;
// conventionally written Series must be readable with streaming-aware
// API!
for (auto iteration : readSeries.readIterations())
{
// ReadIterations takes care of Iteration::open()ing iterations
auto E_x = iteration.meshes["E"]["x"];
REQUIRE(E_x.getDimensionality() == 2);
REQUIRE(E_x.getExtent()[0] == 2);
REQUIRE(E_x.getExtent()[1] == extent);
auto chunk = E_x.loadChunk<int>({0, 0}, {1, extent});
auto chunk2 = E_x.loadChunk<int>({1, 0}, {1, extent});
// we encourage manually closing iterations, but it should not
// matter so let's do the switcharoo for this test
if (last_iteration_index % 2 == 0)
{
readSeries.flush();
}
else
{
iteration.close();
}

int value = variableBasedLayout ? 0 : iteration.iterationIndex;
for (size_t i = 0; i < extent; ++i)
{
REQUIRE(chunk.get()[i] == value);
REQUIRE(chunk2.get()[i] == int(i % base_extent));
}
last_iteration_index = iteration.iterationIndex;
}
REQUIRE(last_iteration_index == 9);
}
}

auto iterate_nonstreaming_series() -> void
{
for (auto const &backend : testedBackends())
{
run_test(
"../samples/iterate_nonstreaming_series/parallel_filebased_%T." +
backend.extension,
false,
backend.jsonBaseConfig());
run_test(
"../samples/iterate_nonstreaming_series/parallel_groupbased." +
backend.extension,
false,
backend.jsonBaseConfig());
#if openPMD_HAVE_ADIOS2 && openPMD_HAVE_ADIOS2_BP5
if (backend.extension == "bp")
{
run_test(
"../samples/iterate_nonstreaming_series/"
"parallel_filebased_bp5_%T." +
backend.extension,
false,
json::merge(
backend.jsonBaseConfig(), "adios2.engine.type = \"bp5\""));
run_test(
"../samples/iterate_nonstreaming_series/"
"parallel_groupbased_bp5." +
backend.extension,
false,
json::merge(
backend.jsonBaseConfig(), "adios2.engine.type = \"bp5\""));
}
#endif
}
#if openPMD_HAVE_ADIOS2 && openPMD_HAS_ADIOS_2_9
run_test(
"../samples/iterate_nonstreaming_series/parallel_variablebased.bp",
true,
R"({"backend": "adios2"})");
#endif
}
} // namespace iterate_nonstreaming_series
48 changes: 11 additions & 37 deletions test/ParallelIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@
#include "openPMD/openPMD.hpp"
#include <catch2/catch.hpp>

#if openPMD_HAVE_MPI
#if !openPMD_HAVE_MPI
TEST_CASE("none", "[parallel]")
{}

#else

#include <mpi.h>

#if openPMD_HAVE_ADIOS2
Expand All @@ -34,42 +39,6 @@

using namespace openPMD;

std::vector<std::string> getBackends()
{
// first component: backend file ending
// second component: whether to test 128 bit values
std::vector<std::string> res;
#if openPMD_HAVE_ADIOS2
res.emplace_back("bp");
#endif
#if openPMD_HAVE_HDF5
res.emplace_back("h5");
#endif
return res;
}

auto const backends = getBackends();

std::vector<std::string> testedFileExtensions()
{
auto allExtensions = getFileExtensions();
auto newEnd = std::remove_if(
allExtensions.begin(), allExtensions.end(), [](std::string const &ext) {
// sst and ssc need a receiver for testing
// bp4 is already tested via bp
return ext == "sst" || ext == "ssc" || ext == "bp4" ||
ext == "toml" || ext == "json";
});
return {allExtensions.begin(), newEnd};
}

#else

TEST_CASE("none", "[parallel]")
{}
#endif

#if openPMD_HAVE_MPI
TEST_CASE("parallel_multi_series_test", "[parallel]")
{
std::list<Series> allSeries;
Expand Down Expand Up @@ -2211,4 +2180,9 @@ TEST_CASE("read_variablebased_randomaccess")
read_variablebased_randomaccess::read_variablebased_randomaccess();
}

TEST_CASE("iterate_nonstreaming_series", "[serial][adios2]")
{
iterate_nonstreaming_series::iterate_nonstreaming_series();
}

#endif // openPMD_HAVE_ADIOS2 && openPMD_HAVE_MPI
12 changes: 7 additions & 5 deletions test/SerialIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6409,26 +6409,28 @@ TEST_CASE("iterate_nonstreaming_series", "[serial][adios2]")
for (auto const &backend : testedBackends())
{
iterate_nonstreaming_series(
"../samples/iterate_nonstreaming_series_filebased_%T." +
"../samples/iterate_nonstreaming_series/serial_filebased_%T." +
backend.extension,
false,
backend.jsonBaseConfig());
iterate_nonstreaming_series(
"../samples/iterate_nonstreaming_series_groupbased." +
"../samples/iterate_nonstreaming_series/serial_groupbased." +
backend.extension,
false,
backend.jsonBaseConfig());
#if openPMD_HAVE_ADIOS2 && openPMD_HAVE_ADIOS2_BP5
if (backend.extension == "bp")
{
iterate_nonstreaming_series(
"../samples/iterate_nonstreaming_series_filebased_bp5_%T." +
"../samples/iterate_nonstreaming_series/"
"serial_filebased_bp5_%T." +
backend.extension,
false,
json::merge(
backend.jsonBaseConfig(), "adios2.engine.type = \"bp5\""));
iterate_nonstreaming_series(
"../samples/iterate_nonstreaming_series_groupbased_bp5." +
"../samples/iterate_nonstreaming_series/"
"serial_groupbased_bp5." +
backend.extension,
false,
json::merge(
Expand All @@ -6438,7 +6440,7 @@ TEST_CASE("iterate_nonstreaming_series", "[serial][adios2]")
}
#if openPMD_HAVE_ADIOS2 && openPMD_HAS_ADIOS_2_9
iterate_nonstreaming_series(
"../samples/iterate_nonstreaming_series_variablebased.bp",
"../samples/iterate_nonstreaming_series/serial_variablebased.bp",
true,
R"({"backend": "adios2"})");
#endif
Expand Down

0 comments on commit b9ef630

Please sign in to comment.