Skip to content

Commit

Permalink
#476: lb: support same-node "migration"
Browse files Browse the repository at this point in the history
  • Loading branch information
lifflander committed Oct 1, 2019
1 parent 01fd888 commit ca254f5
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 87 deletions.
1 change: 1 addition & 0 deletions src/vt/vrt/collection/holders/holder.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ struct Holder {
InnerHolder& lookup(IndexT const& idx);
void insert(IndexT const& idx, InnerHolder&& inner);
VirtualPtrType remove(IndexT const& idx);
void replace(IndexT const& idx, VirtualPtrType ptr);
void destroyAll();
bool isDestroyed() const;
void cleanupExists();
Expand Down
11 changes: 11 additions & 0 deletions src/vt/vrt/collection/holders/holder.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,17 @@ typename Holder<ColT, IndexT>::InnerHolder& Holder<ColT, IndexT>::lookup(
return iter->second;
}

template <typename ColT, typename IndexT>
void Holder<ColT, IndexT>::replace(IndexT const& idx, VirtualPtrType ptr) {
auto const& lookup = idx;
auto& container = vc_container_;
auto iter = container.find(lookup);
vtAssert(
iter != container.end(), "Entry must exist in holder when removing entry"
);
iter->second.vc_ptr_ = std::move(ptr);
}

template <typename ColT, typename IndexT>
typename Holder<ColT, IndexT>::VirtualPtrType Holder<ColT, IndexT>::remove(
IndexT const& idx
Expand Down
179 changes: 92 additions & 87 deletions src/vt/vrt/collection/manager.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@
#include "vt/collective/reduce/reduce_hash.h"
#include "vt/runnable/collection.h"

#if HAS_SERIALIZATION_LIBRARY
#include "serialization_library_headers.h"
#endif

#include <tuple>
#include <utility>
#include <functional>
Expand Down Expand Up @@ -2634,110 +2638,111 @@ template <typename ColT, typename IndexT>
MigrateStatus CollectionManager::migrateOut(
VirtualProxyType const& col_proxy, IndexT const& idx, NodeType const& dest
) {
auto const& this_node = theContext()->getNode();
auto const& this_node = theContext()->getNode();

debug_print(
vrt_coll, node,
"migrateOut: col_proxy={:x}, this_node={}, dest={}, "
"idx={}\n",
col_proxy, this_node, dest, print_index(idx)
);
debug_print(
vrt_coll, node,
"migrateOut: col_proxy={:x}, this_node={}, dest={}, "
"idx={}\n",
col_proxy, this_node, dest, print_index(idx)
);

if (this_node != dest) {
auto const& proxy = CollectionProxy<ColT, IndexT>(col_proxy).operator()(
idx
);
auto elm_holder = findElmHolder<ColT, IndexT>(col_proxy);
vtAssert(
elm_holder != nullptr, "Element must be registered here"
);
auto const& proxy = CollectionProxy<ColT, IndexT>(col_proxy).operator()(idx);
auto elm_holder = findElmHolder<ColT, IndexT>(col_proxy);
vtAssert(
elm_holder != nullptr, "Element must be registered here"
);

#if backend_check_enabled(runtime_checks)
{
bool const exists = elm_holder->exists(idx);
vtAssert(
exists, "Local element must exist here for migration to occur"
);
}
#endif
#if backend_check_enabled(runtime_checks)
{
bool const exists = elm_holder->exists(idx);
vtAssert(
exists, "Local element must exist here for migration to occur"
);
}
#endif

bool const exists = elm_holder->exists(idx);
if (!exists) {
return MigrateStatus::ElementNotLocal;
}
bool const exists = elm_holder->exists(idx);
if (!exists) {
return MigrateStatus::ElementNotLocal;
}

debug_print(
vrt_coll, node,
"migrateOut: (before remove) holder numElements={}\n",
elm_holder->numElements()
);
debug_print(
vrt_coll, node,
"migrateOut: (before remove) holder numElements={}\n",
elm_holder->numElements()
);

auto& coll_elm_info = elm_holder->lookup(idx);
auto map_fn = coll_elm_info.map_fn;
auto range = coll_elm_info.max_idx;
auto col_unique_ptr = elm_holder->remove(idx);
auto& typed_col_ref = *static_cast<ColT*>(col_unique_ptr.get());
if (this_node == dest) {
auto& inner = elm_holder->lookup(idx);
auto ptr = inner.getCollection();

debug_print(
vrt_coll, node,
"migrateOut: (after remove) holder numElements={}\n",
elm_holder->numElements()
);
std::unique_ptr<ColT> new_ptr = nullptr;
serialize(*ptr, [&](SizeType) -> SerialByteType* {
new_ptr = std::make_unique<ColT>();
return reinterpret_cast<SerialByteType*>(new_ptr.get());
});

/*
* Invoke the virtual prelude migrate out function
*/
col_unique_ptr->preMigrateOut();
elm_holder->replace(idx, std::move(new_ptr));
return MigrateStatus::NoMigrationNecessary;
} else {
auto& coll_elm_info = elm_holder->lookup(idx);
auto map_fn = coll_elm_info.map_fn;
auto range = coll_elm_info.max_idx;
auto col_unique_ptr = elm_holder->remove(idx);
auto& typed_col_ref = *static_cast<ColT*>(col_unique_ptr.get());

debug_print(
vrt_coll, node,
"migrateOut: col_proxy={:x}, idx={}, dest={}: serializing collection elm\n",
col_proxy, print_index(idx), dest
);
debug_print(
vrt_coll, node,
"migrateOut: (after remove) holder numElements={}\n",
elm_holder->numElements()
);

using MigrateMsgType = MigrateMsg<ColT, IndexT>;
/*
* Invoke the virtual prelude migrate out function
*/
col_unique_ptr->preMigrateOut();

auto msg = makeSharedMessage<MigrateMsgType>(
proxy, this_node, dest, map_fn, range, &typed_col_ref
);
debug_print(
vrt_coll, node,
"migrateOut: col_proxy={:x}, idx={}, dest={}: serializing collection elm\n",
col_proxy, print_index(idx), dest
);

theMsg()->sendMsgAuto<
MigrateMsgType, MigrateHandlers::migrateInHandler<ColT, IndexT>
>(dest, msg);
using MigrateMsgType = MigrateMsg<ColT, IndexT>;

theLocMan()->getCollectionLM<ColT, IndexT>(col_proxy)->entityMigrated(
proxy, dest
);
auto msg = makeSharedMessage<MigrateMsgType>(
proxy, this_node, dest, map_fn, range, &typed_col_ref
);

/*
* Invoke the virtual epilog migrate out function
*/
col_unique_ptr->epiMigrateOut();
theMsg()->sendMsgAuto<
MigrateMsgType, MigrateHandlers::migrateInHandler<ColT, IndexT>
>(dest, msg);

debug_print(
vrt_coll, node,
"migrateOut: col_proxy={:x}, idx={}, dest={}: invoking destroy()\n",
col_proxy, print_index(idx), dest
);
theLocMan()->getCollectionLM<ColT, IndexT>(col_proxy)->entityMigrated(
proxy, dest
);

/*
* Invoke the virtual destroy function and then null std::unique_ptr<ColT>,
* which should cause the destructor to fire
*/
col_unique_ptr->destroy();
col_unique_ptr = nullptr;
/*
* Invoke the virtual epilog migrate out function
*/
col_unique_ptr->epiMigrateOut();

return MigrateStatus::MigratedToRemote;
} else {
#if backend_check_enabled(runtime_checks)
vtAssert(
false, "Migration should only be called when to_node is != this_node"
);
#else
// Do nothing
#endif
return MigrateStatus::NoMigrationNecessary;
}
debug_print(
vrt_coll, node,
"migrateOut: col_proxy={:x}, idx={}, dest={}: invoking destroy()\n",
col_proxy, print_index(idx), dest
);

/*
* Invoke the virtual destroy function and then null std::unique_ptr<ColT>,
* which should cause the destructor to fire
*/
col_unique_ptr->destroy();
col_unique_ptr = nullptr;

return MigrateStatus::MigratedToRemote;
}
}

template <typename ColT, typename IndexT>
Expand Down

0 comments on commit ca254f5

Please sign in to comment.