Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

808 Checkpoint/Restart collection #809

Merged
merged 12 commits into from
Jun 5, 2020
Merged
82 changes: 82 additions & 0 deletions src/vt/vrt/collection/collection_directory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
//@HEADER
// *****************************************************************************
//
// collection_directory.h
// DARMA Toolkit v. 1.0.0
// DARMA/vt => Virtual Transport
//
// Copyright 2019 National Technology & Engineering Solutions of Sandia, LLC
// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
// Government retains certain rights in this software.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// * Neither the name of the copyright holder nor the names of its
// contributors may be used to endorse or promote products derived from this
// software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
//
// Questions? Contact [email protected]
//
// *****************************************************************************
//@HEADER
*/

#if !defined INCLUDED_VT_VRT_COLLECTION_COLLECTION_DIRECTORY_H
#define INCLUDED_VT_VRT_COLLECTION_COLLECTION_DIRECTORY_H

#include <vector>
#include <string>

namespace vt { namespace vrt { namespace collection {

template <typename IndexT>
struct CollectionDirectory {

struct Element {
Element() = default;
Element(IndexT in_idx, std::string in_file_name, std::size_t in_bytes)
: idx_(in_idx), file_name_(in_file_name), bytes_(in_bytes)
{ }

template <typename SerializerT>
void serialize(SerializerT& s) {
s | idx_ | file_name_ | bytes_;
}

IndexT idx_;
std::string file_name_ = "";
std::size_t bytes_ = 0;
};

template <typename SerializerT>
void serialize(SerializerT& s) {
s | elements_;
}

std::vector<Element> elements_;
};

}}} /* end namespace vt::vrt::collection */

#endif /*INCLUDED_VT_VRT_COLLECTION_COLLECTION_DIRECTORY_H*/
71 changes: 71 additions & 0 deletions src/vt/vrt/collection/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,18 @@ struct CollectionManager
typename ColT::IndexType range, TagType const& tag = no_tag
);

template <
typename ColT, mapping::ActiveMapTypedFnType<typename ColT::IndexType> fn
>
InsertToken<ColT> constructInsert(
typename ColT::IndexType range, TagType const& tag = no_tag
);

template <typename ColT>
InsertToken<ColT> constructInsertMap(
typename ColT::IndexType range, HandlerType const& map_han, TagType const& tag
);

template <typename ColT>
CollectionProxyWrapType<ColT> finishedInsert(InsertToken<ColT>&& token);

Expand Down Expand Up @@ -852,6 +864,65 @@ struct CollectionManager
template <typename ColT, typename IndexT = typename ColT::IndexType>
IndexT getRange(VirtualProxyType proxy);

/**
* \brief Make the filename for checkpoint/restore
*
* \param[in] range range for collection
* \param[in] idx index of element
* \param[in] file_base base file name
* \param[in] make_sub_dirs whether to make sub-directories for elements:
* useful when the number of collection elements are large
* \param[in] files_per_directory number of files to output for each sub-dir
*
* \return full path of a file for the element
*/
template <typename IndexT>
std::string makeFilename(
IndexT range, IndexT idx, std::string file_base,
bool make_sub_dirs, int files_per_directory
);

/**
* \brief Make the filename for meta-data related to checkpoint/restore
*
* \param[in] file_base base file name
*
* \return meta-data file name for the node
*/
template <typename IndexT>
std::string makeMetaFilename(std::string file_base, bool make_sub_dirs);

/**
* \brief Checkpoint the collection (collective). Must wait for termination
* (consistent snapshot) of work on the collection before invoking.
*
* \param[in] proxy the proxy of the collection
* \param[in] file_base the base file name of the files write
* \param[in] make_sub_dirs whether to make sub-directories for elements:
* useful when the number of collection elements are large
* \param[in] files_per_directory number of files to output for each sub-dir
*
* \return the range of the collection
*/
template <typename ColT, typename IndexT = typename ColT::IndexType>
void checkpointToFile(
CollectionProxyWrapType<ColT> proxy, std::string const& file_base,
bool make_sub_dirs = true, int files_per_directory = 4
);

/**
* \brief Restore the collection (collective) from file.
*
* \param[in] range the range of the collection to restart
* \param[in] file_base the base file name for the files to read
*
* \return proxy to the new collection
*/
template <typename ColT>
CollectionProxyWrapType<ColT> restoreFromFile(
typename ColT::IndexType range, std::string const& file_base
);

private:
template <typename MsgT>
static EpochType getCurrentEpoch(MsgT* msg);
Expand Down
161 changes: 159 additions & 2 deletions src/vt/vrt/collection/manager.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
#include "vt/vrt/collection/dispatch/dispatch.h"
#include "vt/vrt/collection/dispatch/registry.h"
#include "vt/vrt/collection/holders/insert_context_holder.h"
#include "vt/vrt/collection/collection_directory.h"
#include "vt/vrt/proxy/collection_proxy.h"
#include "vt/registry/auto/map/auto_registry_map.h"
#include "vt/registry/auto/collection/auto_registry_collection.h"
Expand All @@ -83,6 +84,8 @@
#include <functional>
#include <cassert>
#include <memory>
#include <sys/stat.h>
#include <unistd.h>

#include "fmt/format.h"
#include "fmt/ostream.h"
Expand Down Expand Up @@ -1946,13 +1949,30 @@ void CollectionManager::staticInsert(

}

template <typename ColT>
template <
typename ColT, mapping::ActiveMapTypedFnType<typename ColT::IndexType> fn
>
InsertToken<ColT> CollectionManager::constructInsert(
typename ColT::IndexType range, TagType const& tag
) {
using IndexT = typename ColT::IndexType;
using IndexT = typename ColT::IndexType;
auto const& map_han = auto_registry::makeAutoHandlerMap<IndexT, fn>();
return constructInsertMap<ColT>(range, map_han, tag);
}

template <typename ColT>
InsertToken<ColT> CollectionManager::constructInsert(
typename ColT::IndexType range, TagType const& tag
) {
auto const map_han = getDefaultMap<ColT>();
return constructInsertMap<ColT>(range, map_han, tag);
}

template <typename ColT>
InsertToken<ColT> CollectionManager::constructInsertMap(
typename ColT::IndexType range, HandlerType const& map_han, TagType const& tag
) {
using IndexT = typename ColT::IndexType;

// Create a new distributed proxy, ordered wrt the input tag
auto const& proxy = makeDistProxy<>(tag);
Expand Down Expand Up @@ -3255,6 +3275,143 @@ IndexT CollectionManager::getRange(VirtualProxyType proxy) {
return col_holder->max_idx;
}

template <typename IndexT>
std::string CollectionManager::makeMetaFilename(
std::string file_base, bool make_sub_dirs
) {
auto this_node = theContext()->getNode();
if (make_sub_dirs) {

auto subdir = fmt::format("{}/directory-{}", file_base, this_node);
int flag = mkdir(subdir.c_str(), S_IRWXU);
if (flag < 0 && errno != EEXIST) {
throw std::runtime_error("Failed to create directory: " + subdir);
}

return fmt::format(
"{}/directory-{}/{}.directory", file_base, this_node, this_node
);
} else {
return fmt::format("{}.{}.directory", file_base, this_node);
}
}

template <typename IndexT>
std::string CollectionManager::makeFilename(
IndexT range, IndexT idx, std::string file_base, bool make_sub_dirs,
int files_per_directory
) {
vtAssert(files_per_directory >= 1, "Must be >= 1");

std::string idx_str = "";
for (int i = 0; i < idx.ndims(); i++) {
idx_str += fmt::format("{}{}", idx[i], i < idx.ndims() - 1 ? "." : "");
}
if (make_sub_dirs) {
auto lin = mapping::linearizeDenseIndexColMajor(&idx, &range);
auto dir_name = lin / files_per_directory;

int flag = 0;
flag = mkdir(file_base.c_str(), S_IRWXU);
if (flag < 0 && errno != EEXIST) {
throw std::runtime_error("Failed to create directory: " + file_base);
}

auto subdir = fmt::format("{}/{}", file_base, dir_name);
flag = mkdir(subdir.c_str(), S_IRWXU);
if (flag < 0 && errno != EEXIST) {
throw std::runtime_error("Failed to create directory: " + subdir);
}

return fmt::format("{}/{}/{}", file_base, dir_name, idx_str);
} else {
return fmt::format("{}-{}", file_base, idx_str);
}
}

template <typename ColT, typename IndexT>
void CollectionManager::checkpointToFile(
CollectionProxyWrapType<ColT> proxy, std::string const& file_base,
bool make_sub_dirs, int files_per_directory
) {
auto proxy_bits = proxy.getProxy();

debug_print(
vrt_coll, node,
"checkpointToFile: proxy={:x}, file_base={}\n",
proxy_bits, file_base
);

// Get the element holder
auto holder_ = findElmHolder<ColT>(proxy_bits);
vtAssert(holder_ != nullptr, "Must have valid holder for collection");

auto range = getRange<ColT>(proxy_bits);

CollectionDirectory<IndexT> directory;

holder_->foreach([&](IndexT const& idx, CollectionBase<ColT,IndexT>* elm) {
auto const name = makeFilename(
range, idx, file_base, make_sub_dirs, files_per_directory
);
auto const bytes = checkpoint::getSize(*static_cast<ColT*>(elm));
directory.elements_.emplace_back(
typename CollectionDirectory<IndexT>::Element{idx, name, bytes}
);

checkpoint::serializeToFile(*static_cast<ColT*>(elm), name);
});

auto const directory_name = makeMetaFilename<IndexT>(file_base, make_sub_dirs);
checkpoint::serializeToFile(directory, directory_name);
}

template <typename ColT>
CollectionManager::CollectionProxyWrapType<ColT>
CollectionManager::restoreFromFile(
typename ColT::IndexType range, std::string const& file_base
) {
using IndexType = typename ColT::IndexType;
using DirectoryType = CollectionDirectory<IndexType>;

auto token = constructInsert<ColT>(range);

auto metadata_file_name = makeMetaFilename<IndexType>(file_base, false);

if (access(metadata_file_name.c_str(), F_OK) == -1) {
// file doesn't exist, try looking in sub-directory
metadata_file_name = makeMetaFilename<IndexType>(file_base, true);
}

if (access(metadata_file_name.c_str(), F_OK) == -1) {
throw std::runtime_error("Collection directory file cannot be found");
}

auto directory = checkpoint::deserializeFromFile<DirectoryType>(
metadata_file_name
);

for (auto&& elm : directory->elements_) {
auto idx = elm.idx_;
auto file_name = elm.file_name_;

if (access(file_name.c_str(), F_OK) == -1) {
auto err = fmt::format(
"Collection element file cannot be found: idx={}, file={}",
idx, file_name
);
throw std::runtime_error(err);
}

// @todo: error check the file read with bytes in directory

auto col_ptr = checkpoint::deserializeFromFile<ColT>(file_name);
token[idx].insert(std::move(*col_ptr));
}

return finishedInsert(std::move(token));
}

}}} /* end namespace vt::vrt::collection */

#endif /*INCLUDED_VRT_COLLECTION_MANAGER_IMPL_H*/
Loading