Skip to content

Commit

Permalink
#476 add runtime flag that allows LB to migrate objs to the same node
Browse files Browse the repository at this point in the history
  • Loading branch information
Jakub Strzebonski authored and nlslatt committed May 25, 2022
1 parent 18da9ac commit 5fd8f5e
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 121 deletions.
6 changes: 4 additions & 2 deletions docs/md/lb-manager.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ To enable load balancing, the cmake flag \code{.cmake} -Dvt_lb_enabled=1
instrumentation of work and communication performed by collection elements.

To run a load balancer at runtime:
- Pass `--vt_lb --vt_lb_name=<LB>` as a command line argument
- Write a LB specification file `--vt_lb --vt_lb_file_name=<FILE>`

- Pass `--vt_lb --vt_lb_name=<LB>` as a command line argument
- Write an LB specification file and pass `--vt_lb --vt_lb_file_name=<FILE>`
- Optionally, pass `--vt_lb_self_migration` as a command line argument to allow the load balancer to migrate an object to the node it already resides on

\section lb-specification-file LB Specification File

Expand Down
1 change: 1 addition & 0 deletions src/vt/collective/collective_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ void printOverwrittens(
printIfOverwritten(vt_lb_data_file);
printIfOverwritten(vt_lb_data_dir_in);
printIfOverwritten(vt_lb_data_file_in);
printIfOverwritten(vt_lb_self_migration);
printIfOverwritten(vt_help_lb_args);
printIfOverwritten(vt_no_detect_hang);
printIfOverwritten(vt_print_no_progress);
Expand Down
24 changes: 13 additions & 11 deletions src/vt/configs/arguments/app_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,22 @@ struct AppConfig {
bool vt_trace_event_polling = false;
bool vt_trace_irecv_polling = false;

bool vt_lb = false;
bool vt_lb_show_spec = false;
bool vt_lb_quiet = false;
std::string vt_lb_file_name = "";
std::string vt_lb_name = "NoLB";
std::string vt_lb_args = "";
int32_t vt_lb_interval = 1;
bool vt_lb_keep_last_elm = false;
bool vt_lb_data = false;
bool vt_lb_data_compress = true;
bool vt_lb = false;
bool vt_lb_show_spec = false;
bool vt_lb_quiet = false;
std::string vt_lb_file_name = "";
std::string vt_lb_name = "NoLB";
std::string vt_lb_args = "";
int32_t vt_lb_interval = 1;
bool vt_lb_keep_last_elm = false;
bool vt_lb_data = false;
bool vt_lb_data_compress = true;
std::string vt_lb_data_dir = "vt_lb_data";
std::string vt_lb_data_file = "data.%p.json";
std::string vt_lb_data_dir_in = "vt_lb_data_in";
std::string vt_lb_data_file_in = "data.%p.json";
bool vt_help_lb_args = false;
bool vt_help_lb_args = false;
bool vt_lb_self_migration = false;

bool vt_no_detect_hang = false;
bool vt_print_no_progress = true;
Expand Down Expand Up @@ -314,6 +315,7 @@ struct AppConfig {
| vt_lb_data_dir_in
| vt_lb_data_file_in
| vt_help_lb_args
| vt_lb_self_migration

| vt_no_detect_hang
| vt_print_no_progress
Expand Down
32 changes: 17 additions & 15 deletions src/vt/configs/arguments/args.cc
Original file line number Diff line number Diff line change
Expand Up @@ -475,27 +475,28 @@ void addLbArgs(CLI::App& app, AppConfig& appConfig) {
auto lb_data_file = "Load balancing data output file name";
auto lb_data_dir_in = "Load balancing data input directory";
auto lb_data_file_in = "Load balancing data input file name";
auto lb_self_migration = "Allow load balancer to migrate objects to the same node";
auto lbn = "NoLB";
auto lbi = 1;
auto lbf = "";
auto lbd = "vt_lb_data";
auto lbs = "data";
auto lba = "";
auto s = app.add_flag("--vt_lb", appConfig.vt_lb, lb);
auto t1 = app.add_flag("--vt_lb_quiet", appConfig.vt_lb_quiet, lb_quiet);
auto u = app.add_option("--vt_lb_file_name", appConfig.vt_lb_file_name, lb_file_name, lbf)
->check(CLI::ExistingFile);
auto u1 = app.add_flag("--vt_lb_show_spec", appConfig.vt_lb_show_spec, lb_show_spec);
auto v = app.add_option("--vt_lb_name", appConfig.vt_lb_name, lb_name, lbn);
auto v1 = app.add_option("--vt_lb_args", appConfig.vt_lb_args, lb_args, lba);
auto w = app.add_option("--vt_lb_interval", appConfig.vt_lb_interval, lb_interval, lbi);
auto wl = app.add_flag("--vt_lb_keep_last_elm", appConfig.vt_lb_keep_last_elm, lb_keep_last_elm);
auto ww = app.add_flag("--vt_lb_data", appConfig.vt_lb_data, lb_data);
auto xz = app.add_flag("--vt_lb_data_compress", appConfig.vt_lb_data_compress, lb_data_comp);
auto wx = app.add_option("--vt_lb_data_dir", appConfig.vt_lb_data_dir, lb_data_dir, lbd);
auto wy = app.add_option("--vt_lb_data_file", appConfig.vt_lb_data_file, lb_data_file,lbs);
auto xx = app.add_option("--vt_lb_data_dir_in", appConfig.vt_lb_data_dir_in, lb_data_dir_in, lbd);
auto xy = app.add_option("--vt_lb_data_file_in", appConfig.vt_lb_data_file_in, lb_data_file_in,lbs);
auto s = app.add_flag("--vt_lb", appConfig.vt_lb, lb);
auto t1 = app.add_flag("--vt_lb_quiet", appConfig.vt_lb_quiet, lb_quiet);
auto u = app.add_option("--vt_lb_file_name", appConfig.vt_lb_file_name, lb_file_name, lbf)->check(CLI::ExistingFile);
auto u1 = app.add_flag("--vt_lb_show_spec", appConfig.vt_lb_show_spec, lb_show_spec);
auto v = app.add_option("--vt_lb_name", appConfig.vt_lb_name, lb_name, lbn);
auto v1 = app.add_option("--vt_lb_args", appConfig.vt_lb_args, lb_args, lba);
auto w = app.add_option("--vt_lb_interval", appConfig.vt_lb_interval, lb_interval, lbi);
auto wl = app.add_flag("--vt_lb_keep_last_elm", appConfig.vt_lb_keep_last_elm, lb_keep_last_elm);
auto ww = app.add_flag("--vt_lb_data", appConfig.vt_lb_data, lb_data);
auto xz = app.add_flag("--vt_lb_data_compress", appConfig.vt_lb_data_compress, lb_data_comp);
auto wx = app.add_option("--vt_lb_data_dir", appConfig.vt_lb_data_dir, lb_data_dir, lbd);
auto wy = app.add_option("--vt_lb_data_file", appConfig.vt_lb_data_file, lb_data_file,lbs);
auto xx = app.add_option("--vt_lb_data_dir_in", appConfig.vt_lb_data_dir_in, lb_data_dir_in, lbd);
auto xy = app.add_option("--vt_lb_data_file_in", appConfig.vt_lb_data_file_in, lb_data_file_in, lbs);
// vt_lb_self_migration is a bool, so register it as a flag: a bare
// `--vt_lb_self_migration` on the command line (no value) turns it on, matching
// how the other boolean LB switches (--vt_lb, --vt_lb_quiet, ...) are registered.
auto lbasm = app.add_flag("--vt_lb_self_migration", appConfig.vt_lb_self_migration, lb_self_migration);

auto debugLB = "Load Balancing";
s->group(debugLB);
Expand All @@ -512,6 +513,7 @@ void addLbArgs(CLI::App& app, AppConfig& appConfig) {
xx->group(debugLB);
xy->group(debugLB);
xz->group(debugLB);
lbasm->group(debugLB);

// help options deliberately omitted from the debugLB group above so that
// they appear grouped with --vt_help when --vt_help is used
Expand Down
14 changes: 11 additions & 3 deletions src/vt/messaging/active.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,10 @@ EventType ActiveMessenger::sendMsgBytesWithPut(
TagType const& send_tag
) {
auto msg = base.get();
auto const& is_term = envelopeIsTerm(msg->env);
auto const& is_put = envelopeIsPut(msg->env);
auto const& is_put_packed = envelopeIsPackedPutType(msg->env);
auto const is_term = envelopeIsTerm(msg->env);
auto const is_put = envelopeIsPut(msg->env);
auto const is_put_packed = envelopeIsPackedPutType(msg->env);
auto const is_bcast = envelopeIsBcast(msg->env);

if (!is_term || vt_check_enabled(print_term_msgs)) {
vt_debug_print(
Expand All @@ -229,6 +230,13 @@ EventType ActiveMessenger::sendMsgBytesWithPut(
);
}

// Warn when a non-broadcast message is addressed to this node itself, unless
// the vt_lb_self_migration option is set in the runtime configuration.
vtWarnIf(
dest == theContext()->getNode() &&
not is_bcast &&
not theConfig()->vt_lb_self_migration,
fmt::format("Destination {} should != this node", dest)
);

MsgSizeType new_msg_size = base.size();

if (is_put && !is_put_packed) {
Expand Down
10 changes: 8 additions & 2 deletions src/vt/runtime/runtime_banner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,15 @@ void Runtime::printStartupBanner() {
if (getAppConfig()->vt_lb) {
auto f9 = opt_on("--vt_lb", "Load balancing enabled");
fmt::print("{}\t{}{}", vt_pre, f9, reset);

auto f10 = opt_on("--vt_lb_self_migration", "Self migration enabled");
fmt::print("{}\t{}{}", vt_pre, f10, reset);

if (getAppConfig()->vt_lb_file_name != "") {
auto f12 = fmt::format("Reading LB specification from file \"{}\"",
getAppConfig()->vt_lb_file_name);
auto f12 = fmt::format(
"Reading LB specification from file \"{}\"",
getAppConfig()->vt_lb_file_name
);
auto f11 = opt_on("--vt_lb_file_name", f12);
fmt::print("{}\t{}{}", vt_pre, f11, reset);

Expand Down
12 changes: 2 additions & 10 deletions src/vt/vrt/collection/balance/baselb/baselb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,19 +227,11 @@ void BaseLB::notifyNewHostNodeOfObjectsArriving(
}

void BaseLB::migrateObjectTo(ObjIDType const obj_id, NodeType const to) {
if (obj_id.curr_node != to) {
migrateObject(obj_id, to);
if (obj_id.curr_node != to || theConfig()->vt_lb_self_migration) {
transfers_.push_back(TransferDestType{obj_id, to});
}
}

void BaseLB::migrateObjectToSelf(const ObjIDType obj_id) {
migrateObject(obj_id, obj_id.curr_node);
}

void BaseLB::migrateObject(const ObjIDType obj_id, const NodeType to) {
transfers_.push_back(TransferDestType{obj_id, to});
}

void BaseLB::finalize(CountMsg* msg) {
auto global_count = msg->getVal();
if (migration_count_cb_) {
Expand Down
8 changes: 0 additions & 8 deletions src/vt/vrt/collection/balance/baselb/baselb.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,6 @@ struct BaseLB {
void migrationDone();
void migrateObjectTo(ObjIDType const obj_id, NodeType const node);

/*
* This function migrates object from and to the same node. It is used
* by SerdeTestLB for purpose of testing serialization and deserialization.
*/
void migrateObjectToSelf(ObjIDType const obj_id);

void transferSend(NodeType from, TransferVecType const& transfer);
void transferMigrations(TransferMsg<TransferVecType>* msg);
void finalize(CountMsg* msg);
Expand Down Expand Up @@ -168,8 +162,6 @@ struct BaseLB {
*/
std::shared_ptr<const balance::Reassignment> normalizeReassignments();

void migrateObject(ObjIDType const obj_id, NodeType const node);

private:
TransferVecType transfers_ = {};
TransferType off_node_migrate_ = {};
Expand Down
7 changes: 6 additions & 1 deletion src/vt/vrt/collection/balance/serdetestlb/serdetestlb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ SerdeTestLB::getInputKeysWithHelp() {
return std::unordered_map<std::string, std::string>{};
}

void SerdeTestLB::init(objgroup::proxy::Proxy<SerdeTestLB>) { }
/*
 * Initialize SerdeTestLB. The proxy argument is unused; the only work done
 * here is asserting that the vt_lb_self_migration option is set in the runtime
 * configuration. The message names the option exactly as it is spelled on the
 * command line so a user hitting the assert knows which flag to pass.
 */
void SerdeTestLB::init(objgroup::proxy::Proxy<SerdeTestLB>) {
  vtAssert(
    theConfig()->vt_lb_self_migration,
    "SerdeTestLB::init(): vt_lb_self_migration flag must be set to use SerdeTestLB\n"
  );
}

void SerdeTestLB::inputParams(balance::SpecEntry*) { }

Expand Down
146 changes: 77 additions & 69 deletions src/vt/vrt/collection/manager.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1801,96 +1801,104 @@ MigrateStatus CollectionManager::migrateOut(
terse, vrt_coll, "migrating from {} to {}\n", this_node, dest
);

auto const& proxy = CollectionProxy<ColT, IndexT>(col_proxy).operator()(
idx
);
auto elm_holder = findElmHolder<IndexT>(col_proxy);
vtAssert(
elm_holder != nullptr, "Element must be registered here"
);
if (this_node != dest || theConfig()->vt_lb_self_migration) {
auto const& proxy = CollectionProxy<ColT, IndexT>(col_proxy).operator()(
idx
);
auto elm_holder = findElmHolder<IndexT>(col_proxy);
vtAssert(
elm_holder != nullptr, "Element must be registered here"
);

#if vt_check_enabled(runtime_checks)
{
{
bool const exists = elm_holder->exists(idx);
vtAssert(
exists, "Local element must exist here for migration to occur"
);
}
#endif

bool const exists = elm_holder->exists(idx);
vtAssert(
exists, "Local element must exist here for migration to occur"
if (!exists) {
return MigrateStatus::ElementNotLocal;
}

vt_debug_print(
verbose, vrt_coll,
"migrateOut: (before remove) holder numElements={}\n",
elm_holder->numElements()
);
}
#endif

bool const exists = elm_holder->exists(idx);
if (!exists) {
return MigrateStatus::ElementNotLocal;
}
if (elm_holder->numElements() == 1 and theConfig()->vt_lb_keep_last_elm) {
vt_debug_print(
normal, vrt_coll,
"migrateOut: do not migrate last element\n"
);
return MigrateStatus::ElementNotLocal;
}

vt_debug_print(
verbose, vrt_coll,
"migrateOut: (before remove) holder numElements={}\n",
elm_holder->numElements()
);
auto col_unique_ptr = elm_holder->remove(idx);
auto& typed_col_ref = *static_cast<ColT*>(col_unique_ptr.get());

if (elm_holder->numElements() == 1 and theConfig()->vt_lb_keep_last_elm) {
vt_debug_print(
normal, vrt_coll,
"migrateOut: do not migrate last element\n"
verbose, vrt_coll,
"migrateOut: (after remove) holder numElements={}\n",
elm_holder->numElements()
);
return MigrateStatus::ElementNotLocal;
}

auto col_unique_ptr = elm_holder->remove(idx);
auto& typed_col_ref = *static_cast<ColT*>(col_unique_ptr.get());
/*
* Invoke the virtual prelude migrate out function
*/
col_unique_ptr->preMigrateOut();

vt_debug_print(
verbose, vrt_coll,
"migrateOut: (after remove) holder numElements={}\n",
elm_holder->numElements()
);
vt_debug_print(
verbose, vrt_coll,
"migrateOut: col_proxy={:x}, idx={}, dest={}: serializing collection elm\n",
col_proxy, print_index(idx), dest
);

/*
* Invoke the virtual prelude migrate out function
*/
col_unique_ptr->preMigrateOut();
using MigrateMsgType = MigrateMsg<ColT, IndexT>;

vt_debug_print(
verbose, vrt_coll,
"migrateOut: col_proxy={:x}, idx={}, dest={}: serializing collection elm\n",
col_proxy, print_index(idx), dest
);
auto msg = makeMessage<MigrateMsgType>(proxy, this_node, dest, &typed_col_ref);

using MigrateMsgType = MigrateMsg<ColT, IndexT>;
theMsg()->sendMsg<
MigrateMsgType, MigrateHandlers::migrateInHandler<ColT, IndexT>
>(dest, msg);

auto msg = makeMessage<MigrateMsgType>(proxy, this_node, dest, &typed_col_ref);
theLocMan()->getCollectionLM<IndexT>(col_proxy)->entityEmigrated(idx, dest);

theMsg()->sendMsg<
MigrateMsgType, MigrateHandlers::migrateInHandler<ColT, IndexT>
>(dest, msg);
/*
* Invoke the virtual epilog migrate out function
*/
col_unique_ptr->epiMigrateOut();

theLocMan()->getCollectionLM<IndexT>(col_proxy)->entityEmigrated(idx, dest);
vt_debug_print(
verbose, vrt_coll,
"migrateOut: col_proxy={:x}, idx={}, dest={}: invoking destroy()\n",
col_proxy, print_index(idx), dest
);

/*
* Invoke the virtual epilog migrate out function
*/
col_unique_ptr->epiMigrateOut();
/*
* Invoke the virtual destroy function and then null std::unique_ptr<ColT>,
* which should cause the destructor to fire
*/
col_unique_ptr->destroy();
col_unique_ptr = nullptr;

vt_debug_print(
verbose, vrt_coll,
"migrateOut: col_proxy={:x}, idx={}, dest={}: invoking destroy()\n",
col_proxy, print_index(idx), dest
);
auto const home_node = getMappedNode<IndexT>(col_proxy, idx);
elm_holder->applyListeners(
listener::ElementEventEnum::ElementMigratedOut, idx, home_node
);

/*
* Invoke the virtual destroy function and then null std::unique_ptr<ColT>,
* which should cause the destructor to fire
*/
col_unique_ptr->destroy();
col_unique_ptr = nullptr;

auto const home_node = getMappedNode<IndexT>(col_proxy, idx);
elm_holder->applyListeners(
listener::ElementEventEnum::ElementMigratedOut, idx, home_node
);
return MigrateStatus::MigratedToRemote;
}

#if vt_check_enabled(runtime_checks)
vtAssert(false, "Migration should only be called when to_node is != this_node");
#endif

return MigrateStatus::MigratedToRemote;
return MigrateStatus::NoMigrationNecessary;
}

template <typename ColT, typename IndexT>
Expand Down

0 comments on commit 5fd8f5e

Please sign in to comment.