diff --git a/src/vt/vrt/collection/collection_directory.h b/src/vt/vrt/collection/collection_directory.h new file mode 100644 index 0000000000..9d2e9f1001 --- /dev/null +++ b/src/vt/vrt/collection/collection_directory.h @@ -0,0 +1,82 @@ +/* +//@HEADER +// ***************************************************************************** +// +// collection_directory.h +// DARMA Toolkit v. 1.0.0 +// DARMA/vt => Virtual Transport +// +// Copyright 2019 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+//
+// Questions? Contact darma@sandia.gov
+//
+// *****************************************************************************
+//@HEADER
+*/
+
+#if !defined INCLUDED_VT_VRT_COLLECTION_COLLECTION_DIRECTORY_H
+#define INCLUDED_VT_VRT_COLLECTION_COLLECTION_DIRECTORY_H
+
+#include
+#include
+
+namespace vt { namespace vrt { namespace collection {
+// Serializable list of per-element records (index, file name, byte count).
+template
+struct CollectionDirectory {
+  // One record: the three fields below, serialized in declaration order.
+  struct Element {
+    Element() = default;
+    Element(IndexT in_idx, std::string in_file_name, std::size_t in_bytes)
+      : idx_(in_idx), file_name_(in_file_name), bytes_(in_bytes)
+    { }
+
+    template
+    void serialize(SerializerT& s) {
+      s | idx_ | file_name_ | bytes_;
+    }
+
+    IndexT idx_;                  // index of the element
+    std::string file_name_ = "";  // file name recorded for the element
+    std::size_t bytes_ = 0;       // byte count recorded for the element
+  };
+
+  template
+  void serialize(SerializerT& s) {
+    s | elements_;  // the whole directory is just its list of records
+  }
+
+  std::vector elements_;  // records, in the order they were appended
+};
+
+}}} /* end namespace vt::vrt::collection */
+
+#endif /*INCLUDED_VT_VRT_COLLECTION_COLLECTION_DIRECTORY_H*/
diff --git a/src/vt/vrt/collection/manager.h b/src/vt/vrt/collection/manager.h
index 4b6016849f..ee2d47cc7b 100644
--- a/src/vt/vrt/collection/manager.h
+++ b/src/vt/vrt/collection/manager.h
@@ -239,6 +239,18 @@ struct CollectionManager
     typename ColT::IndexType range, TagType const& tag = no_tag
   );
 
+  template <
+    typename ColT, mapping::ActiveMapTypedFnType fn
+  >
+  InsertToken constructInsert(
+    typename ColT::IndexType range, TagType const& tag = no_tag
+  );
+
+  template
+  InsertToken
constructInsertMap(
+    typename ColT::IndexType range, HandlerType const& map_han, TagType const& tag
+  );
+
   template
   CollectionProxyWrapType finishedInsert(InsertToken&& token);
 
@@ -852,6 +864,65 @@ struct CollectionManager
   template
   IndexT getRange(VirtualProxyType proxy);
 
+  /**
+   * \brief Make the filename for checkpoint/restore
+   *
+   * \param[in] range range for collection
+   * \param[in] idx index of element
+   * \param[in] file_base base file name
+   * \param[in] make_sub_dirs whether to make sub-directories for elements:
+   * useful when the number of collection elements is large
+   * \param[in] files_per_directory number of files to output for each sub-dir
+   *
+   * \return full path of a file for the element
+   */
+  template
+  std::string makeFilename(
+    IndexT range, IndexT idx, std::string file_base,
+    bool make_sub_dirs, int files_per_directory
+  );
+
+  /**
+   * \brief Make the filename for meta-data related to checkpoint/restore
+   *
+   * \param[in] file_base base file name
+   * \param[in] make_sub_dirs put the file in a per-node sub-directory
+   * \return meta-data file name for the node
+   */
+  template
+  std::string makeMetaFilename(std::string file_base, bool make_sub_dirs);
+
+  /**
+   * \brief Checkpoint the collection (collective). Must wait for termination
+   * (consistent snapshot) of work on the collection before invoking.
+   *
+   * \param[in] proxy the proxy of the collection
+   * \param[in] file_base the base file name of the files to write
+   * \param[in] make_sub_dirs whether to make sub-directories for elements:
+   * useful when the number of collection elements is large
+   * \param[in] files_per_directory number of files to output for each sub-dir
+   *
+   * \throws std::runtime_error if a sub-directory cannot be created
+   */
+  template
+  void checkpointToFile(
+    CollectionProxyWrapType proxy, std::string const& file_base,
+    bool make_sub_dirs = true, int files_per_directory = 4
+  );
+
+  /**
+   * \brief Restore the collection (collective) from file.
+   *
+   * \param[in] range the range of the collection to restart
+   * \param[in] file_base the base file name for the files to read
+   * \throws std::runtime_error if a directory or element file is missing
+   * \return proxy to the new collection
+   */
+  template
+  CollectionProxyWrapType restoreFromFile(
+    typename ColT::IndexType range, std::string const& file_base
+  );
+
 private:
   template
   static EpochType getCurrentEpoch(MsgT* msg);
diff --git a/src/vt/vrt/collection/manager.impl.h b/src/vt/vrt/collection/manager.impl.h
index 97be5a31cb..2066dd4ce6 100644
--- a/src/vt/vrt/collection/manager.impl.h
+++ b/src/vt/vrt/collection/manager.impl.h
@@ -68,6 +68,7 @@
 #include "vt/vrt/collection/dispatch/dispatch.h"
 #include "vt/vrt/collection/dispatch/registry.h"
 #include "vt/vrt/collection/holders/insert_context_holder.h"
+#include "vt/vrt/collection/collection_directory.h"
 #include "vt/vrt/proxy/collection_proxy.h"
 #include "vt/registry/auto/map/auto_registry_map.h"
 #include "vt/registry/auto/collection/auto_registry_collection.h"
@@ -83,6 +84,8 @@
 #include
 #include
 #include
+#include
+#include
 
 #include "fmt/format.h"
 #include "fmt/ostream.h"
@@ -1946,13 +1949,30 @@ void CollectionManager::staticInsert(
 }
 
 
-template
+template <
+  typename ColT, mapping::ActiveMapTypedFnType fn
+>
 InsertToken CollectionManager::constructInsert(
   typename ColT::IndexType range, TagType const& tag
 ) {
-  using IndexT = typename ColT::IndexType;
+  using IndexT = typename ColT::IndexType;
+  auto const& map_han = auto_registry::makeAutoHandlerMap();  // presumably the handler for fn; template args not visible here
+  return constructInsertMap(range, map_han, tag);  // same construction path as the overload below
+}
 
+template
+InsertToken CollectionManager::constructInsert(
+  typename ColT::IndexType range, TagType const& tag
+) {
   auto const map_han = getDefaultMap();
+  return constructInsertMap(range, map_han, tag);  // map handler comes from getDefaultMap()
+}
+
+template
+InsertToken CollectionManager::constructInsertMap(
+  typename ColT::IndexType range, HandlerType const& map_han, TagType const& tag
+) {
+  using IndexT = typename ColT::IndexType;
 
   // Create a new distributed proxy, ordered wrt the input tag
   auto const& 
proxy = makeDistProxy<>(tag);
@@ -3255,6 +3275,173 @@ IndexT CollectionManager::getRange(VirtualProxyType proxy) {
   return col_holder->max_idx;
 }
 
+// Build the path of this node's meta-data ("directory") file for file_base.
+// With make_sub_dirs the file lives in a per-node sub-directory, which is
+// created here if missing; throws std::runtime_error when mkdir() fails for a
+// reason other than the directory already existing.
+template
+std::string CollectionManager::makeMetaFilename(
+  std::string file_base, bool make_sub_dirs
+) {
+  auto this_node = theContext()->getNode();
+  if (make_sub_dirs) {
+    // mkdir() does not create missing parents, and a node that wrote no
+    // element files never went through makeFilename(): create the base
+    // directory here so the per-node sub-directory can be made.
+    int flag = mkdir(file_base.c_str(), S_IRWXU);
+    if (flag < 0 && errno != EEXIST) {
+      throw std::runtime_error("Failed to create directory: " + file_base);
+    }
+
+    auto subdir = fmt::format("{}/directory-{}", file_base, this_node);
+    flag = mkdir(subdir.c_str(), S_IRWXU);
+    if (flag < 0 && errno != EEXIST) {
+      throw std::runtime_error("Failed to create directory: " + subdir);
+    }
+
+    return fmt::format(
+      "{}/directory-{}/{}.directory", file_base, this_node, this_node
+    );
+  } else {
+    return fmt::format("{}.{}.directory", file_base, this_node);
+  }
+}
+
+// Build the path of the file for element idx. With make_sub_dirs, elements are
+// grouped files_per_directory at a time (by their linearized index) into
+// numbered sub-directories of file_base, which are created here if missing;
+// throws std::runtime_error when mkdir() fails for a reason other than the
+// directory already existing.
+template
+std::string CollectionManager::makeFilename(
+  IndexT range, IndexT idx, std::string file_base, bool make_sub_dirs,
+  int files_per_directory
+) {
+  vtAssert(files_per_directory >= 1, "Must be >= 1");
+
+  // Join the index components with '.' to name the element's file
+  std::string idx_str = "";
+  for (int i = 0; i < idx.ndims(); i++) {
+    idx_str += fmt::format("{}{}", idx[i], i < idx.ndims() - 1 ? "." : "");
+  }
+  if (make_sub_dirs) {
+    auto lin = mapping::linearizeDenseIndexColMajor(&idx, &range);
+    auto dir_name = lin / files_per_directory;
+
+    int flag = 0;
+    flag = mkdir(file_base.c_str(), S_IRWXU);
+    if (flag < 0 && errno != EEXIST) {
+      throw std::runtime_error("Failed to create directory: " + file_base);
+    }
+
+    auto subdir = fmt::format("{}/{}", file_base, dir_name);
+    flag = mkdir(subdir.c_str(), S_IRWXU);
+    if (flag < 0 && errno != EEXIST) {
+      throw std::runtime_error("Failed to create directory: " + subdir);
+    }
+
+    return fmt::format("{}/{}/{}", file_base, dir_name, idx_str);
+  } else {
+    return fmt::format("{}-{}", file_base, idx_str);
+  }
+}
+
+// Serialize each element in the collection's element holder to its own file,
+// then write a CollectionDirectory recording every element's index, file name
+// and serialized size to this node's meta-data file.
+template
+void CollectionManager::checkpointToFile(
+  CollectionProxyWrapType proxy, std::string const& file_base,
+  bool make_sub_dirs, int files_per_directory
+) {
+  auto proxy_bits = proxy.getProxy();
+
+  debug_print(
+    vrt_coll, node,
+    "checkpointToFile: proxy={:x}, file_base={}\n",
+    proxy_bits, file_base
+  );
+
+  // Get the element holder
+  auto holder_ = findElmHolder(proxy_bits);
+  vtAssert(holder_ != nullptr, "Must have valid holder for collection");
+
+  auto range = getRange(proxy_bits);
+
+  CollectionDirectory directory;
+
+  holder_->foreach([&](IndexT const& idx, CollectionBase* elm) {
+    auto const name = makeFilename(
+      range, idx, file_base, make_sub_dirs, files_per_directory
+    );
+    auto const bytes = checkpoint::getSize(*static_cast(elm));
+    directory.elements_.emplace_back(
+      typename CollectionDirectory::Element{idx, name, bytes}
+    );
+
+    checkpoint::serializeToFile(*static_cast(elm), name);
+  });
+
+  auto const directory_name = makeMetaFilename(file_base, make_sub_dirs);
+  checkpoint::serializeToFile(directory, directory_name);
+}
+
+// Rebuild a collection from files written by checkpointToFile: find this
+// node's directory file (flat name first, then the per-node sub-directory),
+// then deserialize and insert every element it lists. Throws
+// std::runtime_error if the directory file or an element file is not found.
+template
+CollectionManager::CollectionProxyWrapType
+CollectionManager::restoreFromFile(
+  typename ColT::IndexType range, std::string const& file_base
+) {
+  using IndexType = typename ColT::IndexType;
+  using DirectoryType = CollectionDirectory;
+
+  auto token = constructInsert(range);
+
+  auto metadata_file_name = makeMetaFilename(file_base, false);
+
+  if (access(metadata_file_name.c_str(), F_OK) == -1) {
+    // file doesn't exist, try looking in sub-directory. The path is spelled
+    // out rather than obtained from makeMetaFilename(file_base, true) because
+    // that call creates directories, and a restore should only read. Keep
+    // this layout in sync with makeMetaFilename.
+    auto const this_node = theContext()->getNode();
+    metadata_file_name = fmt::format(
+      "{}/directory-{}/{}.directory", file_base, this_node, this_node
+    );
+  }
+
+  if (access(metadata_file_name.c_str(), F_OK) == -1) {
+    throw std::runtime_error("Collection directory file cannot be found");
+  }
+
+  auto directory = checkpoint::deserializeFromFile(
+    metadata_file_name
+  );
+
+  for (auto&& elm : directory->elements_) {
+    auto idx = elm.idx_;
+    auto const& file_name = elm.file_name_;
+
+    if (access(file_name.c_str(), F_OK) == -1) {
+      auto err = fmt::format(
+        "Collection element file cannot be found: idx={}, file={}",
+        idx, file_name
+      );
+      throw std::runtime_error(err);
+    }
+
+    // @todo: error check the file read with bytes in directory
+
+    auto col_ptr = checkpoint::deserializeFromFile(file_name);
+    token[idx].insert(std::move(*col_ptr));
+  }
+
+  return finishedInsert(std::move(token));
+}
+
 }}} /* end namespace vt::vrt::collection */
 
 #endif /*INCLUDED_VRT_COLLECTION_MANAGER_IMPL_H*/
diff --git a/src/vt/vrt/collection/staged_token/token.h b/src/vt/vrt/collection/staged_token/token.h
index 2366f22296..384b1d17b4 100644
--- a/src/vt/vrt/collection/staged_token/token.h
+++ b/src/vt/vrt/collection/staged_token/token.h
@@ -52,9 +52,9 @@ namespace vt { namespace vrt { namespace collection {
 
 template
 struct InsertTokenRval {
-  InsertTokenRval(VirtualProxyType const& in_proxy, IndexT&& in_idx)
+  InsertTokenRval(VirtualProxyType const& in_proxy, IndexT in_idx)
     : proxy_(in_proxy),
-      idx_(std::move(in_idx))
+      idx_(in_idx)
   { }
   InsertTokenRval(InsertTokenRval const&) = delete;
   InsertTokenRval(InsertTokenRval&&) = default;
@@ -81,14 +81,50 @@ struct InsertToken {
   virtual ~InsertToken() = default;
 
 public:
-  InsertTokenRval operator[](IndexT&& idx) {
-    return InsertTokenRval{proxy_,std::forward(idx)};
+  template <
+    typename Tp,
+    typename = typename std::enable_if<
+
std::is_convertible<
+        typename IndexT::BuildIndexType, typename std::decay::type
+      >::value, Tp  // NOTE(review): From is BuildIndexType, To the decayed type — confirm order
+    >::type
+  >
+  InsertTokenRval operator[](Tp&& tp) const {  // index built from one component
+    return InsertTokenRval{proxy_,IndexT{std::forward(tp)}};
   }
 
-  template
-  InsertTokenRval operator[](IdxArgs&&... args) {
-    using Base = typename IndexT::DenseIndexType;
-    return InsertTokenRval{proxy_,IndexT(static_cast(args)...)};
+  template <
+    typename Tp, typename... Tn,
+    typename = typename std::enable_if<
+      std::is_convertible<
+        typename IndexT::BuildIndexType, typename std::decay::type
+      >::value, Tp
+    >::type
+  >
+  InsertTokenRval operator()(Tp&& tp, Tn&&... tn) const {  // index built from several components
+    return InsertTokenRval{
+      proxy_,IndexT{std::forward(tp),std::forward(tn)...}
+    };
+  }
+
+  template <
+    typename IndexU,
+    typename = typename std::enable_if<
+      std::is_same::type>::value, IndexU
+    >::type
+  >
+  InsertTokenRval operator[](IndexU const& idx) const {  // ready-made index, passed through as is
+    return InsertTokenRval{proxy_,idx};
+  }
+
+  template <
+    typename IndexU,
+    typename = typename std::enable_if<
+      std::is_same::type>::value, IndexU
+    >::type
+  >
+  InsertTokenRval operator()(IndexU const& idx) const {  // call-syntax twin of the overload above
+    return InsertTokenRval{proxy_,idx};
   }
 
   friend CollectionManager;
diff --git a/tests/unit/collection/test_checkpoint.cc b/tests/unit/collection/test_checkpoint.cc
new file mode 100644
index 0000000000..196a1d5d69
--- /dev/null
+++ b/tests/unit/collection/test_checkpoint.cc
@@ -0,0 +1,206 @@
+/*
+//@HEADER
+// *****************************************************************************
+//
+// test_checkpoint.cc
+// DARMA Toolkit v. 1.0.0
+// DARMA/vt => Virtual Transport
+//
+// Copyright 2019 National Technology & Engineering Solutions of Sandia, LLC
+// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
+// Government retains certain rights in this software.
+// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// Questions? 
Contact darma@sandia.gov
+//
+// *****************************************************************************
+//@HEADER
+*/
+
+#include
+#include
+
+#include "test_parallel_harness.h"
+
+#include
+
+namespace vt { namespace tests { namespace unit {
+
+static constexpr std::size_t data1_len = 1024;
+static constexpr std::size_t data2_len = 64;
+
+struct TestCol : vt::Collection {
+  // Test element: an iteration counter, two data vectors and a heap token.
+  TestCol() = default;
+
+  struct NullMsg : vt::CollectionMessage {};
+
+  void init(NullMsg*) {  // iter = 1, index-dependent data, token = 129
+    auto idx = getIndex();
+
+    iter = 1;
+    data1.resize(data1_len);
+    data2.resize(data2_len);
+
+    for (std::size_t i = 0; i < data1_len; i++) {
+      data1[i] = i + idx.x() * 24 + idx.y() * 48 + idx.z();
+    }
+    for (std::size_t i = 0; i < data2_len; i++) {
+      data2[i] = i + idx.x() * 124 + idx.y() * 148 + idx.z();
+    }
+
+    token = std::make_shared(129);
+  }
+
+  void doIter(NullMsg*) {  // one iteration: bump iter and every data value by one
+    iter++;
+    for (auto& elm : data1) { elm += 1.; }
+    for (auto& elm : data2) { elm += 1.; }
+  }
+
+  void nullToken(NullMsg*) {  // drop the token held by this element
+    token = nullptr;
+  }
+
+  void verify(NullMsg*) {  // expects the state left by init plus five doIter calls
+    auto idx = getIndex();
+
+    EXPECT_EQ(iter, 6);
+    EXPECT_EQ(data1.size(), data1_len);
+    EXPECT_EQ(data2.size(), data2_len);
+
+    for (std::size_t i = 0; i < data1_len; i++) {
+      EXPECT_EQ(data1[i], 5 + i + idx.x() * 24 + idx.y() * 48 + idx.z());
+    }
+    for (std::size_t i = 0; i < data2_len; i++) {
+      EXPECT_EQ(data2[i], 5 + i + idx.x() * 124 + idx.y() * 148 + idx.z());
+    }
+
+    EXPECT_NE(token, nullptr);
+    EXPECT_EQ(*token, 129);
+  }
+
+  template
+  void serialize(SerializerT& s) {
+    vt::Collection::serialize(s);
+    s | iter;
+    s | data1 | data2;
+    // Never dereference a null token (e.g. after nullToken): write it as 0
+    int token_val = token ? *token : 0;
+    s | token_val;
+    if (s.isUnpacking()) { token = std::make_shared<int>(token_val); }
+  }
+
+  int iter = 0;                  // 1 after init, +1 per doIter
+  std::vector data1, data2;      // sized data1_len / data2_len by init
+  std::shared_ptr token;         // null until init runs; always re-created on unpack
+};
+
+static void runInEpoch(std::function fn) {
+  vt::EpochType ep = vt::theTerm()->makeEpochCollective();
+  vt::theMsg()->pushEpoch(ep);
+  fn();
+  vt::theMsg()->popEpoch(ep);
+  vt::theTerm()->finishedEpoch(ep);
+  bool done = false;
+
vt::theTerm()->addAction(ep, [&]{ done = true; });
+  vt::theSched()->runSchedulerWhile([&] { return not done; });  // run the scheduler until the action sets done
+}
+
+using TestCheckpoint = TestParallelHarness;
+
+static constexpr int32_t const num_elms = 8;
+
+TEST_F(TestCheckpoint, test_checkpoint_1) {
+  auto this_node = theContext()->getNode();
+  auto num_nodes = static_cast(theContext()->getNumNodes());
+
+  auto range = vt::Index3D(num_nodes, num_elms, 4);  // num_nodes x num_elms x 4 elements
+  auto checkpoint_name = "test_checkpoint_dir";
+
+  {  // build the collection, run it, checkpoint it, then destroy it
+    auto proxy = vt::theCollection()->constructCollective(
+      range, [](vt::Index3D){
+        return std::make_unique();
+      }
+    );
+
+    runInEpoch([&]{
+      if (this_node == 0) {
+        proxy.broadcast();  // presumably init; handler template args not visible here
+      }
+    });
+
+    for (int i = 0; i < 5; i++) {  // five rounds; verify() expects iter == 6
+      runInEpoch([&]{
+        if (this_node == 0) {
+          proxy.template broadcast();  // presumably doIter — TODO confirm
+        }
+      });
+    }
+
+    vt::theCollection()->checkpointToFile(proxy, checkpoint_name);
+
+    // Wait for all checkpoints to complete
+    vt::theCollective()->barrier();
+
+    // Null the token to ensure we don't end up getting the same instance
+    runInEpoch([&]{
+      if (this_node == 0) {
+        proxy.broadcast();
+      }
+    });
+
+    // Destroy the collection
+    runInEpoch([&]{
+      if (this_node == 0) {
+        proxy.destroy();
+      }
+    });
+
+    vt::theCollective()->barrier();
+  }
+
+  {  // restore from the files written above and check the restored elements
+    auto proxy = vt::theCollection()->restoreFromFile(
+      range, checkpoint_name
+    );
+
+    // Restoration should be done now
+    vt::theCollective()->barrier();
+
+    runInEpoch([&]{
+      if (this_node == 0) {
+        proxy.broadcast();  // presumably verify — TODO confirm
+      }
+    });
+  }
+
+}
+
+}}} // end namespace vt::tests::unit