From 34296377e7fdd162abfa9eb8809c8925e771362b Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Mon, 8 Feb 2021 12:01:06 -0800 Subject: [PATCH 01/30] #1279: gossiplb: Use best of multiple trials --- .../collection/balance/gossiplb/gossiplb.cc | 223 ++++++++++++++---- .../collection/balance/gossiplb/gossiplb.h | 32 ++- 2 files changed, 213 insertions(+), 42 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index eb247327c8..c392a10b5f 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -74,15 +74,16 @@ bool GossipLB::isOverloaded(LoadType load) const { } void GossipLB::inputParams(balance::SpecEntry* spec) { - std::vector allowed{"f", "k", "i", "c"}; + std::vector allowed{"f", "k", "i", "c", "trials"}; spec->checkAllowedKeys(allowed); using CriterionEnumUnder = typename std::underlying_type::type; auto default_c = static_cast(criterion_); - f_ = spec->getOrDefault("f", f_); - k_max_ = spec->getOrDefault("k", k_max_); - num_iters_ = spec->getOrDefault("i", num_iters_); - int32_t c = spec->getOrDefault("c", default_c); - criterion_ = static_cast(c); + f_ = spec->getOrDefault("f", f_); + k_max_ = spec->getOrDefault("k", k_max_); + num_iters_ = spec->getOrDefault("i", num_iters_); + num_trials_ = spec->getOrDefault("trials", num_trials_); + int32_t c = spec->getOrDefault("c", default_c); + criterion_ = static_cast(c); } void GossipLB::runLB() { @@ -90,6 +91,7 @@ void GossipLB::runLB() { auto const avg = stats.at(lb::Statistic::P_l).at(lb::StatisticQuantity::avg); auto const max = stats.at(lb::Statistic::P_l).at(lb::StatisticQuantity::max); + auto const imb = stats.at(lb::Statistic::P_l).at(lb::StatisticQuantity::imb); auto const load = this_load; if (avg > 0.0000000001) { @@ -106,59 +108,185 @@ void GossipLB::runLB() { } if (should_lb) { - doLBStages(); + doLBStages(imb); } } -void GossipLB::doLBStages() { - for (iter_ = 0; iter_ < num_iters_; iter_++) { - bool first_iter = iter_ == 0; +void GossipLB::doLBStages(TimeType start_imb) { + std::unordered_map best_objs; + LoadType best_load; + TimeType best_imb = start_imb+1; + uint16_t best_trial = 0; - vt_debug_print( - terse, gossiplb, - "GossipLB::doLBStages: (before) running iter_={}, num_iters_={}, load={}, new_load={}\n", - iter_, num_iters_, this_load, this_new_load_ - ); + auto this_node = theContext()->getNode(); - if (first_iter) { - // Copy this node's object assignments to a local, mutable copy - cur_objs_.clear(); - for (auto obj : *load_model_) - cur_objs_[obj] = load_model_->getWork(obj, {balance::PhaseOffset::NEXT_PHASE, balance::PhaseOffset::WHOLE_PHASE}); - this_new_load_ = this_load; - } else { - // Clear out data structures from previous iteration - selected_.clear(); - underloaded_.clear(); - load_info_.clear(); - k_cur_ = 0; - is_overloaded_ = is_underloaded_ = false; + for (uint16_t trial = 0; trial < num_trials_; ++trial) { + // Clear out data structures + selected_.clear(); + underloaded_.clear(); + load_info_.clear(); + k_cur_ = 0; + is_overloaded_ = is_underloaded_ = false; + + for (iter_ = 0; iter_ < num_iters_; iter_++) { + bool first_iter = iter_ == 0; + + vt_debug_print( + normal, gossiplb, + "GossipLB::doLBStages: (before) running iter_={}, num_iters_={}, load={}, new_load={}\n", + iter_, num_iters_, this_load, this_new_load_ + ); + + if (first_iter) { + // Copy this node's object assignments to a local, mutable copy + cur_objs_.clear(); + for (auto obj : *load_model_) + cur_objs_[obj] = load_model_->getWork(obj, {balance::PhaseOffset::NEXT_PHASE, balance::PhaseOffset::WHOLE_PHASE}); + this_new_load_ = this_load; + } else { + // Clear out data structures from previous iteration + selected_.clear(); + underloaded_.clear(); + load_info_.clear(); + k_cur_ = 0; + is_overloaded_ = is_underloaded_ = false; + } + + if (isOverloaded(this_new_load_)) { + is_overloaded_ = true; + } else if (isUnderloaded(this_new_load_)) { + is_underloaded_ = true; + } + + inform(); + decide(); + + vt_debug_print( + normal, gossiplb, + "GossipLB::doLBStages: (after) running iter_={}, num_iters_={}, load={}, new_load={}\n", + iter_, num_iters_, this_load, this_new_load_ + ); + + runInEpochCollective([=] { + using StatsMsgType = balance::NodeStatsMsg; + using ReduceOp = collective::PlusOp; + auto cb = vt::theCB()->makeBcast< + GossipLB, StatsMsgType, &GossipLB::gossipStatsHandler + >(this->proxy_); + // Perform the reduction for P_l -> processor load only + auto msg = makeMessage(Statistic::P_l, this_new_load_); + this->proxy_.template reduce(msg,cb); + }); + + if (this_node == 0) { + vt_print( + gossiplb, + "GossipLB::doLBStages: trial={} iter={} imb={:0.2f}\n", + trial, iter_, new_imbalance_ + ); + } } - if (isOverloaded(this_new_load_)) { - is_overloaded_ = true; - } else if (isUnderloaded(this_new_load_)) { - is_underloaded_ = true; + if (this_node == 0) { + vt_print( + gossiplb, + "GossipLB::doLBStages: trial={} imb={:0.2f}\n", + trial, new_imbalance_ + ); } - inform(); - decide(); + if (cur_objs_.size() == 0) { + vt_print( + gossiplb, + "GossipLB::doLBStages: trial={} local_objs={}\n", + trial, cur_objs_.size() + ); + } - vt_debug_print( - normal, gossiplb, - "GossipLB::doLBStages: (after) running iter_={}, num_iters_={}, load={}, new_load={}\n", - iter_, num_iters_, this_load, this_new_load_ - ); + if (new_imbalance_ <= start_imb && new_imbalance_ < best_imb) { + best_load = this_new_load_; + best_objs = cur_objs_; + best_imb = new_imbalance_; + best_trial = trial; + } + + // Clear out for next try or for not migrating by default + cur_objs_.clear(); + this_new_load_ = this_load; } - // Update the load based on new object assignments - this_load = this_new_load_; + if (best_imb <= start_imb) { + cur_objs_ = best_objs; + this_load = this_new_load_ = best_load; + new_imbalance_ = best_imb; + + // Update the load based on new object assignments + if (this_node == 0) { + vt_print( + gossiplb, + "GossipLB::doLBStages: chose trial={} with imb={:0.2f}\n", + best_trial, new_imbalance_ + ); + } + } else if (this_node == 0) { + vt_print( + gossiplb, + "GossipLB::doLBStages: rejected all trials because they would increase imbalance\n" + ); + } // Concretize lazy migrations by invoking the BaseLB object migration on new // object node assignments thunkMigrations(); } +void GossipLB::gossipStatsHandler(StatsMsgType* msg) { + auto in = msg->getConstVal(); + new_imbalance_ = in.I(); + + auto this_node = theContext()->getNode(); + if (this_node == 0) { + vt_print( + gossiplb, + "GossipLB::gossipStatsHandler: max={:0.2f} min={:0.2f} avg={:0.2f} imb={:0.2f}\n", + in.max(), in.min(), in.avg(), in.I() + ); + } +/* + if (this_new_load_ <= in.min() * 1.01) { + vt_print( + gossiplb, + "GossipLB::gossipStatsHandler: new_load={:0.2f} min={:0.2f} count={}\n", + this_new_load_, in.min(), cur_objs_.size() + ); + } + if (this_new_load_ >= in.max() * 0.99) { + vt_print( + gossiplb, + "GossipLB::gossipStatsHandler: new_load={:0.2f} max={:0.2f} count={}\n", + this_new_load_, in.max(), cur_objs_.size() + ); + } +*/ +} + +void GossipLB::gossipRejectionStatsHandler(GossipRejectionStatsMsg* msg) { + auto in = msg->getConstVal(); + + auto n_rejected = in.n_rejected_; + auto n_transfers = in.n_transfers_; + double rej = static_cast(n_rejected) / static_cast(n_rejected + n_transfers) * 100.0; + + auto this_node = theContext()->getNode(); + if (this_node == 0) { + vt_print( + gossiplb, + "GossipLB::gossipRejectionStatsHandler: n_transfers={} n_rejected={} rejection_rate={:0.1f}%\n", + n_transfers, n_rejected, rej + ); + } +} + void GossipLB::inform() { vt_debug_print( normal, gossiplb, @@ -378,6 +506,8 @@ void GossipLB::decide() { auto lazy_epoch = theTerm()->makeEpochCollective("GossipLB: decide"); + int n_transfers = 0, n_rejected = 0; + if (is_overloaded_) { std::vector under = makeUnderloaded(); std::unordered_map migrate_objs; @@ -434,6 +564,7 @@ void GossipLB::decide() { ); if (eval) { + ++n_transfers; migrate_objs[selected_node][obj_id] = obj_load; this_new_load_ -= obj_load; @@ -441,6 +572,7 @@ void GossipLB::decide() { iter = cur_objs_.erase(iter); } else { + ++n_rejected; iter++; } @@ -464,6 +596,15 @@ void GossipLB::decide() { theTerm()->finishedEpoch(lazy_epoch); vt::runSchedulerThrough(lazy_epoch); + + runInEpochCollective([=] { + using ReduceOp = collective::PlusOp; + auto cb = vt::theCB()->makeBcast< + GossipLB, GossipRejectionStatsMsg, &GossipLB::gossipRejectionStatsHandler + >(this->proxy_); + auto msg = makeMessage(n_rejected, n_transfers); + this->proxy_.template reduce(msg,cb); + }); } void GossipLB::thunkMigrations() { diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h index 48d0393a57..9d81c3c000 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h @@ -57,6 +57,32 @@ namespace vt { namespace vrt { namespace collection { namespace lb { +struct RejectionStats { + RejectionStats() = default; + RejectionStats(int n_rejected, int n_transfers) + : n_rejected_(n_rejected), n_transfers_(n_transfers) { } + + friend RejectionStats operator+(RejectionStats a1, RejectionStats const& a2) { + a1.n_rejected_ += a2.n_rejected_; + a1.n_transfers_ += a2.n_transfers_; + + return a1; + } + + int n_rejected_ = 0; + int n_transfers_ = 0; +}; + +struct GossipRejectionStatsMsg : collective::ReduceTMsg { + GossipRejectionStatsMsg() = default; + GossipRejectionStatsMsg(int n_rejected, int n_transfers) + : ReduceTMsg(RejectionStats(n_rejected, n_transfers)) + { } + GossipRejectionStatsMsg(RejectionStats&& rs) + : ReduceTMsg(std::move(rs)) + { } +}; + struct GossipLB : BaseLB { using GossipMsg = balance::GossipMsg; using NodeSetType = std::vector; @@ -74,7 +100,7 @@ struct GossipLB : BaseLB { void inputParams(balance::SpecEntry* spec) override; protected: - void doLBStages(); + void doLBStages(TimeType start_imb); void inform(); void decide(); void migrate(); @@ -94,6 +120,8 @@ struct GossipLB : BaseLB { void lazyMigrateObjsTo(EpochType epoch, NodeType node, ObjsType const& objs); void inLazyMigrations(balance::LazyMigrationMsg* msg); + void gossipStatsHandler(StatsMsgType* msg); + void gossipRejectionStatsHandler(GossipRejectionStatsMsg* msg); void thunkMigrations(); void setupDone(ReduceMsgType* msg); @@ -104,6 +132,7 @@ struct GossipLB : BaseLB { uint8_t k_cur_ = 0; uint16_t iter_ = 0; uint16_t num_iters_ = 4; + uint16_t num_trials_ = 3; std::random_device seed_; std::unordered_map load_info_ = {}; objgroup::proxy::Proxy proxy_ = {}; @@ -113,6 +142,7 @@ struct GossipLB : BaseLB { std::unordered_set underloaded_ = {}; std::unordered_map cur_objs_ = {}; LoadType this_new_load_ = 0.0; + TimeType new_imbalance_ = 0.0; CriterionEnum criterion_ = CriterionEnum::ModifiedGrapevine; bool setup_done_ = false; }; From 23945c5b4324aeae75831f35d14cad59adcb6b0c Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Mon, 22 Feb 2021 15:19:46 -0800 Subject: [PATCH 02/30] #1279: gossiplb: select unique nodes --- src/vt/vrt/collection/balance/gossiplb/gossiplb.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index c392a10b5f..0e9b03e84c 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -352,6 +352,9 @@ void GossipLB::propagateRound(EpochType epoch) { auto& selected = selected_; selected = underloaded_; + if (selected.find(this_node) == selected.end()) { + selected.insert(this_node); + } auto const fanout = std::min(f_, static_cast(num_nodes - 1)); @@ -363,7 +366,7 @@ void GossipLB::propagateRound(EpochType epoch) { for (int i = 0; i < fanout; i++) { // This implies full knowledge of all processors - if (selected.size() >= static_cast(num_nodes - 1)) { + if (selected.size() >= static_cast(num_nodes)) { return; } @@ -374,9 +377,9 @@ void GossipLB::propagateRound(EpochType epoch) { do { random_node = dist(gen); } while ( - selected.find(random_node) != selected.end() or - random_node == this_node + selected.find(random_node) != selected.end() ); + selected.insert(random_node); vt_debug_print( verbose, gossiplb, From b749bddb577da4a04c5846f674073360cecd125f Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Mon, 22 Feb 2021 15:20:11 -0800 Subject: [PATCH 03/30] #1279: gossiplb: pass load instead of rank --- src/vt/vrt/collection/balance/gossiplb/gossiplb.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 0e9b03e84c..aa1344921a 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -411,7 +411,7 @@ void GossipLB::propagateIncoming(GossipMsg* msg) { if (load_info_.find(elm.first) == load_info_.end()) { load_info_[elm.first] = elm.second; - if (isUnderloaded(elm.first)) { + if (isUnderloaded(elm.second)) { underloaded_.insert(elm.first); } } @@ -479,7 +479,7 @@ NodeType GossipLB::sampleFromCMF( std::vector GossipLB::makeUnderloaded() const { std::vector under = {}; for (auto&& elm : load_info_) { - if (isUnderloaded(elm.first)) { + if (isUnderloaded(elm.second)) { under.push_back(elm.first); } } From 5489aabb2fb4c6323c430a126efe990a201e94d1 Mon Sep 17 00:00:00 2001 From: Jonathan Lifflander Date: Mon, 8 Feb 2021 12:07:05 -0800 Subject: [PATCH 04/30] #1279: gossiplb: init variable --- src/vt/vrt/collection/balance/gossiplb/gossiplb.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index aa1344921a..a0c748f061 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -114,7 +114,7 @@ void GossipLB::runLB() { void GossipLB::doLBStages(TimeType start_imb) { std::unordered_map best_objs; - LoadType best_load; + LoadType best_load = 0; TimeType best_imb = start_imb+1; uint16_t best_trial = 0; From 52decfaf958f749aa8dbcb7ff59e6862cfb5d818 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Mon, 1 Mar 2021 16:29:21 -0800 Subject: [PATCH 05/30] #1279: gossiplb: improve debugging output --- .../collection/balance/gossiplb/gossiplb.cc | 95 ++++++++----------- .../collection/balance/gossiplb/gossiplb.h | 1 + 2 files changed, 41 insertions(+), 55 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index a0c748f061..0be3096d57 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -120,7 +120,7 @@ void GossipLB::doLBStages(TimeType start_imb) { auto this_node = theContext()->getNode(); - for (uint16_t trial = 0; trial < num_trials_; ++trial) { + for (trial_ = 0; trial_ < num_trials_; ++trial_) { // Clear out data structures selected_.clear(); underloaded_.clear(); @@ -133,8 +133,9 @@ void GossipLB::doLBStages(TimeType start_imb) { vt_debug_print( normal, gossiplb, - "GossipLB::doLBStages: (before) running iter_={}, num_iters_={}, load={}, new_load={}\n", - iter_, num_iters_, this_load, this_new_load_ + "GossipLB::doLBStages: (before) running trial={}, iter={}, " + "num_iters={}, load={}, new_load={}\n", + trial_, iter_, num_iters_, this_load, this_new_load_ ); if (first_iter) { @@ -163,8 +164,9 @@ void GossipLB::doLBStages(TimeType start_imb) { vt_debug_print( normal, gossiplb, - "GossipLB::doLBStages: (after) running iter_={}, num_iters_={}, load={}, new_load={}\n", - iter_, num_iters_, this_load, this_new_load_ + "GossipLB::doLBStages: (after) running trial={}, iter={}, " + "num_iters={}, load={}, new_load={}\n", + trial_, iter_, num_iters_, this_load, this_new_load_ ); runInEpochCollective([=] { @@ -182,7 +184,7 @@ void GossipLB::doLBStages(TimeType start_imb) { vt_print( gossiplb, "GossipLB::doLBStages: trial={} iter={} imb={:0.2f}\n", - trial, iter_, new_imbalance_ + trial_, iter_, new_imbalance_ ); } } @@ -191,15 +193,7 @@ void GossipLB::doLBStages(TimeType start_imb) { vt_print( gossiplb, "GossipLB::doLBStages: trial={} imb={:0.2f}\n", - trial, new_imbalance_ - ); - } - - if (cur_objs_.size() == 0) { - vt_print( - gossiplb, - "GossipLB::doLBStages: trial={} local_objs={}\n", - trial, cur_objs_.size() + trial_, new_imbalance_ ); } @@ -207,7 +201,7 @@ void GossipLB::doLBStages(TimeType start_imb) { best_load = this_new_load_; best_objs = cur_objs_; best_imb = new_imbalance_; - best_trial = trial; + best_trial = trial_; } // Clear out for next try or for not migrating by default @@ -252,22 +246,6 @@ void GossipLB::gossipStatsHandler(StatsMsgType* msg) { in.max(), in.min(), in.avg(), in.I() ); } -/* - if (this_new_load_ <= in.min() * 1.01) { - vt_print( - gossiplb, - "GossipLB::gossipStatsHandler: new_load={:0.2f} min={:0.2f} count={}\n", - this_new_load_, in.min(), cur_objs_.size() - ); - } - if (this_new_load_ >= in.max() * 0.99) { - vt_print( - gossiplb, - "GossipLB::gossipStatsHandler: new_load={:0.2f} max={:0.2f} count={}\n", - this_new_load_, in.max(), cur_objs_.size() - ); - } -*/ } void GossipLB::gossipRejectionStatsHandler(GossipRejectionStatsMsg* msg) { @@ -290,8 +268,9 @@ void GossipLB::gossipRejectionStatsHandler(GossipRejectionStatsMsg* msg) { void GossipLB::inform() { vt_debug_print( normal, gossiplb, - "GossipLB::inform: starting inform phase: k_max_={}, k_cur_={}\n", - k_max_, k_cur_ + "GossipLB::inform: starting inform phase: trial={}, iter={}, k_max={}, " + "k_cur={}, is_underloaded={}, is_overloaded={}, load={}\n", + trial_, iter_, k_max_, k_cur_, is_underloaded_, is_overloaded_, this_new_load_ ); vtAssert(k_max_ > 0, "Number of rounds (k) must be greater than zero"); @@ -301,13 +280,6 @@ void GossipLB::inform() { underloaded_.insert(this_node); } - vt_debug_print( - verbose, gossiplb, - "GossipLB::inform: starting inform phase: k_max_={}, k_cur_={}, " - "is_underloaded={}, is_overloaded={}, load={}\n", - k_max_, k_cur_, is_underloaded_, is_overloaded_, this_new_load_ - ); - setup_done_ = false; auto cb = theCB()->makeBcast(proxy_); @@ -327,10 +299,19 @@ void GossipLB::inform() { vt::runSchedulerThrough(propagate_epoch); + if (is_overloaded_) { + vt_print( + gossiplb, + "GossipLB::inform: trial={}, iter={}, known underloaded={}\n", + trial_, iter_, underloaded_.size() + ); + } + vt_debug_print( verbose, gossiplb, - "GossipLB::inform: finished inform phase: k_max_={}, k_cur_={}\n", - k_max_, k_cur_ + "GossipLB::inform: finished inform phase: trial={}, iter={}, " + "k_max={}, k_cur={}\n", + trial_, iter_, k_max_, k_cur_ ); } @@ -341,8 +322,8 @@ void GossipLB::setupDone(ReduceMsgType* msg) { void GossipLB::propagateRound(EpochType epoch) { vt_debug_print( normal, gossiplb, - "GossipLB::propagateRound: k_max_={}, k_cur_={}\n", - k_max_, k_cur_ + "GossipLB::propagateRound: trial={}, iter={}, k_max={}, k_cur={}\n", + trial_, iter_, k_max_, k_cur_ ); auto const this_node = theContext()->getNode(); @@ -360,8 +341,9 @@ void GossipLB::propagateRound(EpochType epoch) { vt_debug_print( verbose, gossiplb, - "GossipLB::propagateRound: k_max_={}, k_cur_={}, selected.size()={}, fanout={}\n", - k_max_, k_cur_, selected.size(), fanout + "GossipLB::propagateRound: trial={}, iter={}, k_max={}, k_cur={}, " + "selected.size()={}, fanout={}\n", + trial_, iter_, k_max_, k_cur_, selected.size(), fanout ); for (int i = 0; i < fanout; i++) { @@ -383,8 +365,9 @@ void GossipLB::propagateRound(EpochType epoch) { vt_debug_print( verbose, gossiplb, - "GossipLB::propagateRound: k_max_={}, k_cur_={}, sending={}\n", - k_max_, k_cur_, random_node + "GossipLB::propagateRound: trial={}, iter={}, k_max={}, k_cur={}, " + "sending={}\n", + trial_, iter_, k_max_, k_cur_, random_node ); // Send message with load @@ -402,9 +385,9 @@ void GossipLB::propagateIncoming(GossipMsg* msg) { vt_debug_print( normal, gossiplb, - "GossipLB::propagateIncoming: k_max_={}, k_cur_={}, from_node={}, " - "load info size={}\n", - k_max_, k_cur_, from_node, msg->getNodeLoad().size() + "GossipLB::propagateIncoming: trial={}, iter={}, k_max={}, k_cur={}, " + "from_node={}, load info size={}\n", + trial_, iter_, k_max_, k_cur_, from_node, msg->getNodeLoad().size() ); for (auto&& elm : msg->getNodeLoad()) { @@ -552,9 +535,11 @@ void GossipLB::decide() { vt_debug_print( verbose, gossiplb, - "GossipLB::decide: under.size()={}, selected_node={}, selected_load={}," - "obj_id={:x}, home={}, obj_load={}, avg={}, this_new_load_={}, " - "criterion={}\n", + "GossipLB::decide: trial={}, iter={}, under.size()={}, " + "selected_node={}, selected_load={:e}, obj_id={:x}, home={}, " + "obj_load={:e}, avg={:e}, this_new_load_={:e}, criterion={}\n", + trial_, + iter_, under.size(), selected_node, selected_load, diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h index 9d81c3c000..3fab2d3a62 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h @@ -131,6 +131,7 @@ struct GossipLB : BaseLB { uint8_t k_max_ = 4; uint8_t k_cur_ = 0; uint16_t iter_ = 0; + uint16_t trial_ = 0; uint16_t num_iters_ = 4; uint16_t num_trials_ = 3; std::random_device seed_; From 82dcc7493db550703eaf56161aa949e73dfbac74 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Mon, 1 Mar 2021 16:36:43 -0800 Subject: [PATCH 06/30] #1279: gossiplb: corrected async informs --- .../collection/balance/gossiplb/gossip_msg.h | 25 ++++++++++ .../collection/balance/gossiplb/gossiplb.cc | 50 +++++++++++-------- .../collection/balance/gossiplb/gossiplb.h | 15 +++--- 3 files changed, 62 insertions(+), 28 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossip_msg.h b/src/vt/vrt/collection/balance/gossiplb/gossip_msg.h index a44dc93ed4..6fd3a3e13e 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossip_msg.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossip_msg.h @@ -85,6 +85,31 @@ struct GossipMsg : vt::Message { NodeLoadType node_load_ = {}; }; +struct GossipMsgAsync : GossipMsg { + using MessageParentType = GossipMsg; + vt_msg_serialize_if_needed_by_parent(); + + GossipMsgAsync() = default; + GossipMsgAsync( + NodeType in_from_node, NodeLoadType const& in_node_load, int round + ) + : GossipMsg(in_from_node, in_node_load), round_(round) + { } + + uint8_t getRound() const { + return round_; + } + + template + void serialize(SerializerT& s) { + MessageParentType::serialize(s); + s | round_; + } + +private: + int round_; +}; + struct LazyMigrationMsg : SerializeRequired< vt::Message, LazyMigrationMsg diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 0be3096d57..7998e30743 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -159,7 +159,7 @@ void GossipLB::doLBStages(TimeType start_imb) { is_underloaded_ = true; } - inform(); + informAsync(); decide(); vt_debug_print( @@ -265,11 +265,14 @@ void GossipLB::gossipRejectionStatsHandler(GossipRejectionStatsMsg* msg) { } } -void GossipLB::inform() { +void GossipLB::informAsync() { + propagated_k_.assign(k_max_, false); + uint8_t k_cur_async = 0; + vt_debug_print( normal, gossiplb, - "GossipLB::inform: starting inform phase: trial={}, iter={}, k_max={}, " - "k_cur={}, is_underloaded={}, is_overloaded={}, load={}\n", + "GossipLB::informAsync: starting inform phase: trial={}, iter={}, " + "k_max={}, k_cur={}, is_underloaded={}, is_overloaded={}, load={}\n", trial_, iter_, k_max_, k_cur_, is_underloaded_, is_overloaded_, this_new_load_ ); @@ -288,11 +291,11 @@ void GossipLB::inform() { theSched()->runSchedulerWhile([this]{ return not setup_done_; }); - auto propagate_epoch = theTerm()->makeEpochCollective("GossipLB: inform"); + auto propagate_epoch = theTerm()->makeEpochCollective("GossipLB: informAsync"); // Underloaded start the round if (is_underloaded_) { - propagateRound(propagate_epoch); + propagateRoundAsync(k_cur_async, propagate_epoch); } theTerm()->finishedEpoch(propagate_epoch); @@ -302,14 +305,14 @@ void GossipLB::inform() { if (is_overloaded_) { vt_print( gossiplb, - "GossipLB::inform: trial={}, iter={}, known underloaded={}\n", + "GossipLB::informAsync: trial={}, iter={}, known underloaded={}\n", trial_, iter_, underloaded_.size() ); } vt_debug_print( verbose, gossiplb, - "GossipLB::inform: finished inform phase: trial={}, iter={}, " + "GossipLB::informAsync: finished inform phase: trial={}, iter={}, " "k_max={}, k_cur={}\n", trial_, iter_, k_max_, k_cur_ ); @@ -319,10 +322,10 @@ void GossipLB::setupDone(ReduceMsgType* msg) { setup_done_ = true; } -void GossipLB::propagateRound(EpochType epoch) { +void GossipLB::propagateRoundAsync(uint8_t k_cur_async, EpochType epoch) { vt_debug_print( normal, gossiplb, - "GossipLB::propagateRound: trial={}, iter={}, k_max={}, k_cur={}\n", + "GossipLB::propagateRoundAsync: trial={}, iter={}, k_max={}, k_cur={}\n", trial_, iter_, k_max_, k_cur_ ); @@ -341,7 +344,7 @@ void GossipLB::propagateRound(EpochType epoch) { vt_debug_print( verbose, gossiplb, - "GossipLB::propagateRound: trial={}, iter={}, k_max={}, k_cur={}, " + "GossipLB::propagateRoundAsync: trial={}, iter={}, k_max={}, k_cur={}, " "selected.size()={}, fanout={}\n", trial_, iter_, k_max_, k_cur_, selected.size(), fanout ); @@ -365,28 +368,31 @@ void GossipLB::propagateRound(EpochType epoch) { vt_debug_print( verbose, gossiplb, - "GossipLB::propagateRound: trial={}, iter={}, k_max={}, k_cur={}, " - "sending={}\n", + "GossipLB::propagateRoundAsync: trial={}, iter={}, k_max={}, " + "k_cur={}, sending={}\n", trial_, iter_, k_max_, k_cur_, random_node ); // Send message with load - auto msg = makeMessage(this_node, load_info_); + auto msg = makeMessage(this_node, load_info_, k_cur_async); if (epoch != no_epoch) { envelopeSetEpoch(msg->env, epoch); } msg->addNodeLoad(this_node, this_new_load_); - proxy_[random_node].sendMsg(msg.get()); + proxy_[random_node].sendMsg< + GossipMsgAsync, &GossipLB::propagateIncomingAsync + >(msg.get()); } } -void GossipLB::propagateIncoming(GossipMsg* msg) { +void GossipLB::propagateIncomingAsync(GossipMsgAsync* msg) { auto const from_node = msg->getFromNode(); + auto k_cur_async = msg->getRound(); vt_debug_print( normal, gossiplb, - "GossipLB::propagateIncoming: trial={}, iter={}, k_max={}, k_cur={}, " - "from_node={}, load info size={}\n", + "GossipLB::propagateIncomingAsync: trial={}, iter={}, k_max={}, " + "k_cur={}, from_node={}, load info size={}\n", trial_, iter_, k_max_, k_cur_, from_node, msg->getNodeLoad().size() ); @@ -400,12 +406,14 @@ void GossipLB::propagateIncoming(GossipMsg* msg) { } } - if (k_cur_ == k_max_ - 1) { + if (k_cur_async == k_max_ - 1) { // nothing to do but wait for termination to be detected + } else if (propagated_k_[k_cur_async]) { + // we already propagated this round before receiving this message } else { // send out another round - propagateRound(); - k_cur_++; + propagated_k_[k_cur_async] = true; + propagateRoundAsync(k_cur_async + 1); } } diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h index 3fab2d3a62..a458178552 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h @@ -84,10 +84,10 @@ struct GossipRejectionStatsMsg : collective::ReduceTMsg { }; struct GossipLB : BaseLB { - using GossipMsg = balance::GossipMsg; - using NodeSetType = std::vector; - using ObjsType = std::unordered_map; - using ReduceMsgType = vt::collective::ReduceNoneMsg; + using GossipMsgAsync = balance::GossipMsgAsync; + using NodeSetType = std::vector; + using ObjsType = std::unordered_map; + using ReduceMsgType = vt::collective::ReduceNoneMsg; GossipLB() = default; GossipLB(GossipLB const&) = delete; @@ -101,12 +101,12 @@ struct GossipLB : BaseLB { protected: void doLBStages(TimeType start_imb); - void inform(); + void informAsync(); void decide(); void migrate(); - void propagateRound(EpochType epoch = no_epoch); - void propagateIncoming(GossipMsg* msg); + void propagateRoundAsync(uint8_t k_cur_async, EpochType epoch = no_epoch); + void propagateIncomingAsync(GossipMsgAsync* msg); bool isUnderloaded(LoadType load) const; bool isUnderloadedRelaxed(LoadType over, LoadType under) const; bool isOverloaded(LoadType load) const; @@ -146,6 +146,7 @@ struct GossipLB : BaseLB { TimeType new_imbalance_ = 0.0; CriterionEnum criterion_ = CriterionEnum::ModifiedGrapevine; bool setup_done_ = false; + std::vector propagated_k_; }; }}}} /* end namespace vt::vrt::collection::lb */ From 66092e10ecc227886f7b2d34dc9ed1bbaaf9c7d6 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Mon, 1 Mar 2021 17:09:41 -0800 Subject: [PATCH 07/30] #1279: gossiplb: added sync informs that match lbaf --- .../collection/balance/gossiplb/gossiplb.cc | 178 +++++++++++++++++- .../collection/balance/gossiplb/gossiplb.h | 15 ++ 2 files changed, 186 insertions(+), 7 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 7998e30743..714b47bb50 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -77,13 +77,19 @@ void GossipLB::inputParams(balance::SpecEntry* spec) { std::vector allowed{"f", "k", "i", "c", "trials"}; spec->checkAllowedKeys(allowed); using CriterionEnumUnder = typename std::underlying_type::type; + using InformTypeEnumUnder = typename std::underlying_type::type; + auto default_c = static_cast(criterion_); - f_ = spec->getOrDefault("f", f_); - k_max_ = spec->getOrDefault("k", k_max_); - num_iters_ = spec->getOrDefault("i", num_iters_); - num_trials_ = spec->getOrDefault("trials", num_trials_); - int32_t c = spec->getOrDefault("c", default_c); - criterion_ = static_cast(c); + auto default_inform = static_cast(inform_type_); + + f_ = spec->getOrDefault("f", f_); + k_max_ = spec->getOrDefault("k", k_max_); + num_iters_ = spec->getOrDefault("i", num_iters_); + num_trials_ = spec->getOrDefault("trials", num_trials_); + int32_t c = spec->getOrDefault("c", default_c); + criterion_ = static_cast(c); + int32_t inf = spec->getOrDefault("inform", default_inform); + inform_type_ = static_cast(inf); } void GossipLB::runLB() { @@ -159,7 +165,17 @@ void GossipLB::doLBStages(TimeType start_imb) { is_underloaded_ = true; } - informAsync(); + switch (inform_type_) { + case InformTypeEnum::SyncInform: + informSync(); + break; + case InformTypeEnum::AsyncInform: + informAsync(); + break; + default: + vtAbort("GossipLB:: Unsupported inform type"); + } + decide(); vt_debug_print( @@ -318,6 +334,70 @@ void GossipLB::informAsync() { ); } +void GossipLB::informSync() { + vt_debug_print( + normal, gossiplb, + "GossipLB::informSync: starting inform phase: trial={}, iter={}, " + "k_max={}, k_cur={}, is_underloaded={}, is_overloaded={}, load={}\n", + trial_, iter_, k_max_, k_cur_, is_underloaded_, is_overloaded_, this_new_load_ + ); + + vtAssert(k_max_ > 0, "Number of rounds (k) must be greater than zero"); + + auto const this_node = theContext()->getNode(); + if (is_underloaded_) { + underloaded_.insert(this_node); + } + + auto propagate_this_round = is_underloaded_; + propagate_next_round_ = false; + new_underloaded_ = underloaded_; + new_load_info_ = load_info_; + + setup_done_ = false; + + auto cb = theCB()->makeBcast(proxy_); + auto msg = makeMessage(); + proxy_.reduce(msg.get(), cb); + + theSched()->runSchedulerWhile([this]{ return not setup_done_; }); + + for (; k_cur_ < k_max_; ++k_cur_) { + auto name = fmt::format("GossipLB: informSync k_cur={}", k_cur_); + auto propagate_epoch = theTerm()->makeEpochCollective(name); + + // Underloaded start the first round; ranks that received on some round + // start subsequent rounds + if (propagate_this_round) { + propagateRoundSync(propagate_epoch); + } + + theTerm()->finishedEpoch(propagate_epoch); + + vt::runSchedulerThrough(propagate_epoch); + + propagate_this_round = propagate_next_round_; + propagate_next_round_ = false; + underloaded_ = new_underloaded_; + load_info_ = new_load_info_; + } + + if (is_overloaded_) { + vt_print( + gossiplb, + "GossipLB::informSync: trial={}, iter={}, known underloaded={}\n", + trial_, iter_, underloaded_.size() + ); + } + + vt_debug_print( + verbose, gossiplb, + "GossipLB::informSync: finished inform phase: trial={}, iter={}, " + "k_max={}, k_cur={}\n", + trial_, iter_, k_max_, k_cur_ + ); +} + void GossipLB::setupDone(ReduceMsgType* msg) { setup_done_ = true; } @@ -385,6 +465,66 @@ void GossipLB::propagateRoundAsync(uint8_t k_cur_async, EpochType epoch) { } } +void GossipLB::propagateRoundSync(EpochType epoch) { + vt_debug_print( + normal, gossiplb, + "GossipLB::propagateRoundSync: trial={}, iter={}, k_max={}, k_cur={}\n", + trial_, iter_, k_max_, k_cur_ + ); + + auto const this_node = theContext()->getNode(); + auto const num_nodes = theContext()->getNumNodes(); + std::uniform_int_distribution dist(0, num_nodes - 1); + std::mt19937 gen(seed_()); + + auto& selected = selected_; + selected = underloaded_; + if (selected.find(this_node) == selected.end()) { + selected.insert(this_node); + } + + auto const fanout = std::min(f_, static_cast(num_nodes - 1)); + + vt_debug_print( + verbose, gossiplb, + "GossipLB::propagateRoundSync: trial={}, iter={}, k_max={}, k_cur={}, " + "selected.size()={}, fanout={}\n", + trial_, iter_, k_max_, k_cur_, selected.size(), fanout + ); + + for (int i = 0; i < fanout; i++) { + // This implies full knowledge of all processors + if (selected.size() >= static_cast(num_nodes)) { + return; + } + + // First, randomly select a node + NodeType random_node = uninitialized_destination; + + // Keep generating until we have a unique node for this round + do { + random_node = dist(gen); + } while ( + selected.find(random_node) != selected.end() + ); + selected.insert(random_node); + + vt_debug_print( + verbose, gossiplb, + "GossipLB::propagateRoundSync: k_max_={}, k_cur_={}, sending={}\n", + k_max_, k_cur_, random_node + ); + + // Send message with load + auto msg = makeMessage(this_node, load_info_); + if (epoch != no_epoch) { + envelopeSetEpoch(msg->env, epoch); + } + msg->addNodeLoad(this_node, this_new_load_); + proxy_[random_node].sendMsg(msg.get()); + } +} + void GossipLB::propagateIncomingAsync(GossipMsgAsync* msg) { auto const from_node = msg->getFromNode(); auto k_cur_async = msg->getRound(); @@ -417,6 +557,30 @@ void GossipLB::propagateIncomingAsync(GossipMsgAsync* msg) { } } +void GossipLB::propagateIncomingSync(GossipMsgSync* msg) { + auto const from_node = msg->getFromNode(); + + // we collected more info that should be propagated on the next round + propagate_next_round_ = true; + + vt_debug_print( + normal, gossiplb, + "GossipLB::propagateIncomingSync: trial={}, iter={}, k_max={}, " + "k_cur={}, from_node={}, load info size={}\n", + trial_, iter_, k_max_, k_cur_, from_node, msg->getNodeLoad().size() + ); + + for (auto&& elm : msg->getNodeLoad()) { + if (new_load_info_.find(elm.first) == new_load_info_.end()) { + new_load_info_[elm.first] = elm.second; + + if (isUnderloaded(elm.second)) { + new_underloaded_.insert(elm.first); + } + } + } +} + std::vector GossipLB::createCMF(NodeSetType const& under) { double const avg = stats.at(lb::Statistic::P_l).at(lb::StatisticQuantity::avg); diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h index a458178552..0df17cd16a 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h @@ -83,8 +83,16 @@ struct GossipRejectionStatsMsg : collective::ReduceTMsg { { } }; +enum struct InformTypeEnum : uint8_t { + // synchronized rounds propagate info faster but have sync cost + SyncInform = 0, + // async rounds propagate before round has completed, omitting some info + AsyncInform = 1 +}; + struct GossipLB : BaseLB { using GossipMsgAsync = balance::GossipMsgAsync; + using GossipMsgSync = balance::GossipMsg; using NodeSetType = std::vector; using ObjsType = std::unordered_map; using ReduceMsgType = vt::collective::ReduceNoneMsg; @@ -102,11 +110,14 @@ struct GossipLB : BaseLB { protected: void doLBStages(TimeType start_imb); void informAsync(); + void informSync(); void decide(); void migrate(); void propagateRoundAsync(uint8_t k_cur_async, EpochType epoch = no_epoch); + void propagateRoundSync(EpochType epoch = no_epoch); void propagateIncomingAsync(GossipMsgAsync* msg); + void propagateIncomingSync(GossipMsgSync* msg); bool isUnderloaded(LoadType load) const; bool isUnderloadedRelaxed(LoadType over, LoadType under) const; bool isOverloaded(LoadType load) const; @@ -136,16 +147,20 @@ struct GossipLB : BaseLB { uint16_t num_trials_ = 3; std::random_device seed_; std::unordered_map load_info_ = {}; + std::unordered_map new_load_info_ = {}; objgroup::proxy::Proxy proxy_ = {}; bool is_overloaded_ = false; bool is_underloaded_ = false; std::unordered_set selected_ = {}; std::unordered_set underloaded_ = {}; + std::unordered_set new_underloaded_ = {}; std::unordered_map cur_objs_ = {}; LoadType this_new_load_ = 0.0; TimeType new_imbalance_ = 0.0; CriterionEnum criterion_ = CriterionEnum::ModifiedGrapevine; + InformTypeEnum inform_type_ = InformTypeEnum::SyncInform; bool setup_done_ = false; + bool propagate_next_round_ = false; std::vector propagated_k_; }; From 5c7db6fabb512c65c4156a3f080f04559bffcc9f Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Mon, 1 Mar 2021 17:16:11 -0800 Subject: [PATCH 08/30] #1279: gossiplb: cleanup --- .../collection/balance/gossiplb/gossip_msg.h | 26 ++++++++++++++++ .../collection/balance/gossiplb/gossiplb.cc | 8 ++--- .../collection/balance/gossiplb/gossiplb.h | 31 ++----------------- 3 files changed, 33 insertions(+), 32 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossip_msg.h b/src/vt/vrt/collection/balance/gossiplb/gossip_msg.h index 6fd3a3e13e..9e536a46bc 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossip_msg.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossip_msg.h @@ -143,6 +143,32 @@ struct LazyMigrationMsg : SerializeRequired< ObjsType objs_ = {}; }; +struct RejectionStats { + RejectionStats() = default; + RejectionStats(int n_rejected, int n_transfers) + : n_rejected_(n_rejected), n_transfers_(n_transfers) { } + + friend RejectionStats operator+(RejectionStats a1, RejectionStats const& a2) { + a1.n_rejected_ += a2.n_rejected_; + a1.n_transfers_ += a2.n_transfers_; + + return a1; + } + + int n_rejected_ = 0; + int n_transfers_ = 0; +}; + +struct GossipRejectionStatsMsg : collective::ReduceTMsg { + GossipRejectionStatsMsg() = default; + GossipRejectionStatsMsg(int n_rejected, int n_transfers) + : ReduceTMsg(RejectionStats(n_rejected, n_transfers)) + { } + GossipRejectionStatsMsg(RejectionStats&& rs) + : ReduceTMsg(std::move(rs)) + { } +}; + }}}} /* end namespace vt::vrt::collection::balance */ #endif /*INCLUDED_VT_VRT_COLLECTION_BALANCE_GOSSIPLB_GOSSIP_MSG_H*/ diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 714b47bb50..eb319c4218 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -264,7 +264,7 @@ void GossipLB::gossipStatsHandler(StatsMsgType* msg) { } } -void GossipLB::gossipRejectionStatsHandler(GossipRejectionStatsMsg* msg) { +void GossipLB::gossipRejectionStatsHandler(GossipRejectionMsgType* msg) { auto in = msg->getConstVal(); auto n_rejected = in.n_rejected_; @@ -758,11 +758,11 @@ void GossipLB::decide() { vt::runSchedulerThrough(lazy_epoch); runInEpochCollective([=] { - using ReduceOp = collective::PlusOp; + using ReduceOp = collective::PlusOp; auto cb = vt::theCB()->makeBcast< - GossipLB, GossipRejectionStatsMsg, &GossipLB::gossipRejectionStatsHandler + GossipLB, GossipRejectionMsgType, &GossipLB::gossipRejectionStatsHandler >(this->proxy_); - auto msg = makeMessage(n_rejected, n_transfers); + auto msg = makeMessage(n_rejected, n_transfers); this->proxy_.template reduce(msg,cb); }); } diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h index 0df17cd16a..58607c632e 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h @@ -57,32 +57,6 @@ namespace vt { namespace vrt { namespace collection { namespace lb { -struct RejectionStats { - RejectionStats() = default; - RejectionStats(int n_rejected, int n_transfers) - : n_rejected_(n_rejected), n_transfers_(n_transfers) { } - - friend RejectionStats operator+(RejectionStats a1, RejectionStats const& a2) { - a1.n_rejected_ += a2.n_rejected_; - a1.n_transfers_ += a2.n_transfers_; - - return a1; - } - - int n_rejected_ = 0; - int n_transfers_ = 0; -}; - -struct GossipRejectionStatsMsg : collective::ReduceTMsg { - GossipRejectionStatsMsg() = default; - GossipRejectionStatsMsg(int n_rejected, int n_transfers) - : ReduceTMsg(RejectionStats(n_rejected, n_transfers)) - { } - GossipRejectionStatsMsg(RejectionStats&& rs) - : ReduceTMsg(std::move(rs)) - { } -}; - enum struct InformTypeEnum : uint8_t { // synchronized rounds propagate info faster but have sync cost SyncInform = 0, @@ -92,10 +66,11 @@ enum struct InformTypeEnum : uint8_t { struct GossipLB : BaseLB { using GossipMsgAsync = balance::GossipMsgAsync; - using GossipMsgSync = balance::GossipMsg; + using GossipMsgSync = balance::GossipMsg; using NodeSetType = std::vector; using ObjsType = std::unordered_map; using ReduceMsgType = vt::collective::ReduceNoneMsg; + using GossipRejectionMsgType = balance::GossipRejectionStatsMsg; GossipLB() = default; GossipLB(GossipLB const&) = delete; @@ -132,7 +107,7 @@ struct GossipLB : BaseLB { void lazyMigrateObjsTo(EpochType epoch, NodeType node, ObjsType const& objs); void inLazyMigrations(balance::LazyMigrationMsg* msg); void gossipStatsHandler(StatsMsgType* msg); - void gossipRejectionStatsHandler(GossipRejectionStatsMsg* msg); + void gossipRejectionStatsHandler(GossipRejectionMsgType* msg); void thunkMigrations(); void setupDone(ReduceMsgType* msg); From f28fd97aa6359035d0334b6bd54d07c600bb24c8 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Tue, 2 Mar 2021 09:41:50 -0800 Subject: [PATCH 09/30] #1279: gossiplb: order objects by object id --- src/vt/vrt/collection/balance/gossiplb/gossiplb.cc | 2 +- src/vt/vrt/collection/balance/gossiplb/gossiplb.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index eb319c4218..ce8bdbe995 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -119,7 +119,7 @@ void GossipLB::runLB() { } void GossipLB::doLBStages(TimeType start_imb) { - std::unordered_map best_objs; + decltype(this->cur_objs_) best_objs; LoadType best_load = 0; TimeType best_imb = start_imb+1; uint16_t best_trial = 0; diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h index 58607c632e..1332e18257 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h @@ -129,7 +129,7 @@ struct GossipLB : BaseLB { std::unordered_set selected_ = {}; std::unordered_set underloaded_ = {}; std::unordered_set new_underloaded_ = {}; - std::unordered_map cur_objs_ = {}; + std::map cur_objs_ = {}; LoadType this_new_load_ = 0.0; TimeType new_imbalance_ = 0.0; CriterionEnum criterion_ = CriterionEnum::ModifiedGrapevine; From c497a809787f59a119ec30565affaa83b2e2832f Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Thu, 4 Mar 2021 09:42:36 -0800 Subject: [PATCH 10/30] #1279: gossiplb: add deterministic mode --- .../collection/balance/gossiplb/gossiplb.cc | 51 +++++++++++++------ .../collection/balance/gossiplb/gossiplb.h | 4 ++ 2 files changed, 40 insertions(+), 15 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index ce8bdbe995..d5bdc44c2d 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -61,6 +61,9 @@ namespace vt { namespace vrt { namespace collection { namespace lb { void GossipLB::init(objgroup::proxy::Proxy in_proxy) { proxy_ = in_proxy; + auto const this_node = theContext()->getNode(); + gen_propagate_.seed(this_node + 12345); + gen_sample_.seed(this_node + 54321); } bool GossipLB::isUnderloaded(LoadType load) const { @@ -74,7 +77,7 @@ bool GossipLB::isOverloaded(LoadType load) const { } void GossipLB::inputParams(balance::SpecEntry* spec) { - std::vector allowed{"f", "k", "i", "c", "trials"}; + std::vector allowed{"f", "k", "i", "c", "trials", "deterministic"}; spec->checkAllowedKeys(allowed); using CriterionEnumUnder = typename std::underlying_type::type; using InformTypeEnumUnder = typename std::underlying_type::type; @@ -82,14 +85,20 @@ void GossipLB::inputParams(balance::SpecEntry* spec) { auto default_c = static_cast(criterion_); auto default_inform = static_cast(inform_type_); - f_ = spec->getOrDefault("f", f_); - k_max_ = spec->getOrDefault("k", k_max_); - num_iters_ = spec->getOrDefault("i", num_iters_); - num_trials_ = spec->getOrDefault("trials", num_trials_); - int32_t c = spec->getOrDefault("c", default_c); - criterion_ = static_cast(c); - int32_t inf = spec->getOrDefault("inform", default_inform); - inform_type_ = static_cast(inf); + f_ = spec->getOrDefault("f", f_); + k_max_ = spec->getOrDefault("k", k_max_); + num_iters_ = spec->getOrDefault("i", num_iters_); + num_trials_ = spec->getOrDefault("trials", num_trials_); + deterministic_ = spec->getOrDefault("deterministic", deterministic_); + int32_t c = spec->getOrDefault("c", default_c); + criterion_ = static_cast(c); + int32_t inf = spec->getOrDefault("inform", default_inform); + inform_type_ = static_cast(inf); + + vtAbortIf( + inform_type_ == InformTypeEnum::AsyncInform && deterministic_, + "Asynchronous informs allow race conditions and thus are not deterministic" + ); } void GossipLB::runLB() { @@ -412,7 +421,10 @@ void GossipLB::propagateRoundAsync(uint8_t k_cur_async, EpochType epoch) { auto const this_node = theContext()->getNode(); auto const num_nodes = theContext()->getNumNodes(); std::uniform_int_distribution dist(0, num_nodes - 1); - std::mt19937 gen(seed_()); + + if (!deterministic_) { + gen_propagate_.seed(seed_()); + } auto& selected = selected_; selected = underloaded_; @@ -440,7 +452,7 @@ void GossipLB::propagateRoundAsync(uint8_t k_cur_async, EpochType epoch) { // Keep generating until we have a unique node for this round do { - random_node = dist(gen); + random_node = dist(gen_propagate_); } while ( selected.find(random_node) != selected.end() ); @@ -475,7 +487,10 @@ void GossipLB::propagateRoundSync(EpochType epoch) { auto const this_node = theContext()->getNode(); auto const num_nodes = theContext()->getNumNodes(); std::uniform_int_distribution dist(0, num_nodes - 1); - std::mt19937 gen(seed_()); + + if (!deterministic_) { + gen_propagate_.seed(seed_()); + } auto& selected = selected_; selected = underloaded_; @@ -503,7 +518,7 @@ void GossipLB::propagateRoundSync(EpochType epoch) { // Keep generating until we have a unique node for this round do { - random_node = dist(gen); + random_node = dist(gen_propagate_); } while ( selected.find(random_node) != selected.end() ); @@ -613,12 +628,15 @@ NodeType GossipLB::sampleFromCMF( ) { // Create the distribution std::uniform_real_distribution dist(0.0, 1.0); - std::mt19937 gen(seed_()); + + if (!deterministic_) { + gen_sample_.seed(seed_()); + } NodeType selected_node = uninitialized_destination; // Pick from the CMF - auto const u = dist(gen); + auto const u = dist(gen_sample_); std::size_t i = 0; for (auto&& x : cmf) { if (x >= u) { @@ -638,6 +656,9 @@ std::vector GossipLB::makeUnderloaded() const { under.push_back(elm.first); } } + if (deterministic_) { + std::sort(under.begin(), under.end()); + } return under; } diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h index 1332e18257..fbb7d786e2 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h @@ -52,6 +52,7 @@ #include #include +#include #include #include @@ -120,6 +121,7 @@ struct GossipLB : BaseLB { uint16_t trial_ = 0; uint16_t num_iters_ = 4; uint16_t num_trials_ = 3; + bool deterministic_ = false; std::random_device seed_; std::unordered_map load_info_ = {}; std::unordered_map new_load_info_ = {}; @@ -137,6 +139,8 @@ struct GossipLB : BaseLB { bool setup_done_ = false; bool propagate_next_round_ = false; std::vector propagated_k_; + std::mt19937 gen_propagate_; + std::mt19937 gen_sample_; }; }}}} /* end namespace vt::vrt::collection::lb */ From 6f47e4998f38f23a509bde3e4583952b8db063a3 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Fri, 5 Mar 2021 14:49:40 -0800 Subject: [PATCH 11/30] #1279: gossiplb: fix loads that ended up in microseconds --- .../collection/balance/gossiplb/gossiplb.cc | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index d5bdc44c2d..9a2892d146 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -694,6 +694,12 @@ void GossipLB::decide() { if (under.size() > 0) { // Iterate through all the objects for (auto iter = cur_objs_.begin(); iter != cur_objs_.end(); ) { + auto obj_load = cur_objs_[iter->first]; + + // the object stats are in seconds but the processor stats are in + // milliseconds; for now, convert the object loads to milliseconds + auto obj_load_ms = loadMilli(obj_load); + // Rebuild the relaxed underloaded set based on updated load of this node under = makeUnderloaded(); if (under.size() == 0) { @@ -719,18 +725,13 @@ void GossipLB::decide() { //auto max_obj_size = avg - selected_load; auto obj_id = iter->first; - // @todo: for now, convert to milliseconds due to the stats framework all - // computing in milliseconds; should be converted to seconds along with - // the rest of the stats framework - auto obj_load = loadMilli(iter->second); - - bool eval = Criterion(criterion_)(this_new_load_, selected_load, obj_load, avg); + bool eval = Criterion(criterion_)(this_new_load_, selected_load, obj_load_ms, avg); vt_debug_print( verbose, gossiplb, "GossipLB::decide: trial={}, iter={}, under.size()={}, " "selected_node={}, selected_load={:e}, obj_id={:x}, home={}, " - "obj_load={:e}, avg={:e}, this_new_load_={:e}, criterion={}\n", + "obj_load_ms={:e}, avg={:e}, this_new_load_={:e}, criterion={}\n", trial_, iter_, under.size(), @@ -738,7 +739,7 @@ void GossipLB::decide() { selected_load, obj_id.id, obj_id.home_node, - obj_load, + obj_load_ms, avg, this_new_load_, eval @@ -746,10 +747,12 @@ void GossipLB::decide() { if (eval) { ++n_transfers; + // transfer the object load in seconds, not milliseconds, + // to match the object load units on the receiving end migrate_objs[selected_node][obj_id] = obj_load; - this_new_load_ -= obj_load; - selected_load += obj_load; + this_new_load_ -= obj_load_ms; + selected_load += obj_load_ms; iter = cur_objs_.erase(iter); } else { @@ -808,7 +811,8 @@ void GossipLB::inLazyMigrations(balance::LazyMigrationMsg* msg) { auto iter = cur_objs_.find(obj.first); vtAssert(iter == cur_objs_.end(), "Incoming object should not exist"); cur_objs_.insert(obj); - this_new_load_ += obj.second; + // need to convert to milliseconds because we received seconds + this_new_load_ += loadMilli(obj.second); } } From e76aed393bdf5d0646a9b04c3614cd501284cecc Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Fri, 5 Mar 2021 14:53:25 -0800 Subject: [PATCH 12/30] #1279: gossiplb: add object ordering options --- .../collection/balance/gossiplb/gossiplb.cc | 92 +++++++++++++++++-- .../collection/balance/gossiplb/gossiplb.h | 9 +- 2 files changed, 91 insertions(+), 10 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 9a2892d146..2745a77e59 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -77,13 +77,18 @@ bool GossipLB::isOverloaded(LoadType load) const { } void GossipLB::inputParams(balance::SpecEntry* spec) { - std::vector allowed{"f", "k", "i", "c", "trials", "deterministic"}; + std::vector allowed{ + "f", "k", "i", "c", "trials", "deterministic", "ordering" + }; spec->checkAllowedKeys(allowed); - using CriterionEnumUnder = typename std::underlying_type::type; - using InformTypeEnumUnder = typename std::underlying_type::type; - auto default_c = static_cast(criterion_); + using CriterionEnumUnder = typename std::underlying_type::type; + using InformTypeEnumUnder = typename std::underlying_type::type; + using ObjectOrderEnumUnder = typename std::underlying_type::type; + + auto default_c = static_cast(criterion_); auto default_inform = static_cast(inform_type_); + auto default_order = static_cast(obj_ordering_); f_ = spec->getOrDefault("f", f_); k_max_ = spec->getOrDefault("k", k_max_); @@ -91,14 +96,21 @@ void GossipLB::inputParams(balance::SpecEntry* spec) { num_trials_ = spec->getOrDefault("trials", num_trials_); deterministic_ = spec->getOrDefault("deterministic", deterministic_); int32_t c = spec->getOrDefault("c", default_c); + criterion_ = static_cast(c); int32_t inf = spec->getOrDefault("inform", default_inform); inform_type_ = static_cast(inf); + int32_t ord = spec->getOrDefault("ordering", default_order); + obj_ordering_ = static_cast(ord); vtAbortIf( inform_type_ == InformTypeEnum::AsyncInform && deterministic_, "Asynchronous informs allow race conditions and thus are not deterministic" ); + vtAbortIf( + obj_ordering_ == ObjectOrderEnum::Arbitrary && deterministic_, + "Arbitrary object ordering is not deterministic" + ); } void GossipLB::runLB() { @@ -692,9 +704,71 @@ void GossipLB::decide() { std::unordered_map migrate_objs; if (under.size() > 0) { + std::vector ordered_obj_ids(cur_objs_.size()); + + // define the iteration order + int i = 0; + for (auto &obj : cur_objs_) { + ordered_obj_ids[i++] = obj.first; + } + switch (obj_ordering_) { + case ObjectOrderEnum::ElmID: + std::sort( + ordered_obj_ids.begin(), ordered_obj_ids.end(), std::less() + ); + break; + case ObjectOrderEnum::Marginal: + { + // first find marginal object's load + auto over_avg = this_new_load_ - avg; + // if no objects are larger than over_avg, then marginal will still + // (incorrectly) reflect the total load, which will not be a problem + auto marginal = this_new_load_; + for (auto &obj : cur_objs_) { + // the object stats are in seconds but the processor stats are in + // milliseconds; for now, convert the object loads to milliseconds + auto obj_load_ms = loadMilli(obj.second); + if (obj_load_ms > over_avg && obj_load_ms < marginal) { + marginal = obj_load_ms; + } + } + // sort largest to smallest if <= marginal + // sort smallest to largest if > marginal + std::sort( + ordered_obj_ids.begin(), ordered_obj_ids.end(), + [=](const ObjIDType &left, const ObjIDType &right) { + auto left_load = loadMilli(this->cur_objs_[left]); + auto right_load = loadMilli(this->cur_objs_[right]); + if (left_load <= marginal && right_load <= marginal) { + // we're in the sort load descending regime (first section) + return left_load > right_load; + } + // else + // EITHER + // a) both are above the cut, and we're in the sort ascending + // regime (second section), so return left < right + // OR + // b) one is above the cut and one is at or below, and the one + // that is at or below the cut needs to come first, so + // also return left < right + return left_load < right_load; + } + ); + vt_debug_print( + normal, gossiplb, + "GossipLB::decide: over_avg={}, marginal={}\n", + over_avg, loadMilli(cur_objs_[ordered_obj_ids[0]]) + ); + } + break; + default: + break; + } + // Iterate through all the objects - for (auto iter = cur_objs_.begin(); iter != cur_objs_.end(); ) { - auto obj_load = cur_objs_[iter->first]; + for (auto iter = ordered_obj_ids.begin(); iter != ordered_obj_ids.end(); ) { + auto obj_id = *iter; + auto obj_load = cur_objs_[obj_id]; // the object stats are in seconds but the processor stats are in // milliseconds; for now, convert the object loads to milliseconds @@ -712,7 +786,7 @@ void GossipLB::decide() { vt_debug_print( verbose, gossiplb, - "GossipLB::decide: selected_node={}, load_info_.size()\n", + "GossipLB::decide: selected_node={}, load_info_.size()={}\n", selected_node, load_info_.size() ); @@ -723,7 +797,6 @@ void GossipLB::decide() { auto& selected_load = load_iter->second; //auto max_obj_size = avg - selected_load; - auto obj_id = iter->first; bool eval = Criterion(criterion_)(this_new_load_, selected_load, obj_load_ms, avg); @@ -754,7 +827,8 @@ void GossipLB::decide() { this_new_load_ -= obj_load_ms; selected_load += obj_load_ms; - iter = cur_objs_.erase(iter); + iter = ordered_obj_ids.erase(iter); + cur_objs_.erase(obj_id); } else { ++n_rejected; iter++; diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h index fbb7d786e2..69f7585a21 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h @@ -65,6 +65,12 @@ enum struct InformTypeEnum : uint8_t { AsyncInform = 1 }; +enum struct ObjectOrderEnum : uint8_t { + Arbitrary = 0, + ElmID = 1, + Marginal = 2 +}; + struct GossipLB : BaseLB { using GossipMsgAsync = balance::GossipMsgAsync; using GossipMsgSync = balance::GossipMsg; @@ -131,11 +137,12 @@ struct GossipLB : BaseLB { std::unordered_set selected_ = {}; std::unordered_set underloaded_ = {}; std::unordered_set new_underloaded_ = {}; - std::map cur_objs_ = {}; + std::unordered_map cur_objs_ = {}; LoadType this_new_load_ = 0.0; TimeType new_imbalance_ = 0.0; CriterionEnum criterion_ = CriterionEnum::ModifiedGrapevine; InformTypeEnum inform_type_ = InformTypeEnum::SyncInform; + ObjectOrderEnum obj_ordering_ = ObjectOrderEnum::Arbitrary; bool setup_done_ = false; bool propagate_next_round_ = false; std::vector propagated_k_; From 26775fe01aebeb0a76ec9f11c9f91d2c1ae5c3f2 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Fri, 5 Mar 2021 15:07:32 -0800 Subject: [PATCH 13/30] #1279: gossiplb: cleanup --- src/vt/vrt/collection/balance/gossiplb/gossiplb.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 2745a77e59..ad604b0e34 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -128,9 +128,8 @@ void GossipLB::runLB() { if (theContext()->getNode() == 0) { vt_print( gossiplb, - "GossipLB::runLB: avg={}, max={}, load={}," - " overloaded_={}, underloaded_={}, should_lb={}\n", - avg, max, load, is_overloaded_, is_underloaded_, should_lb + "GossipLB::runLB: avg={}, max={}, load={}, should_lb={}\n", + avg, max, load, should_lb ); } From ec4de3dd83070db2413a88cfcd73589e2c8549fe Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Fri, 5 Mar 2021 16:52:42 -0800 Subject: [PATCH 14/30] #1279: gossiplb: increase precision of imbalance --- src/vt/vrt/collection/balance/gossiplb/gossiplb.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index ad604b0e34..82135c9f10 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -219,7 +219,7 @@ void GossipLB::doLBStages(TimeType start_imb) { if (this_node == 0) { vt_print( gossiplb, - "GossipLB::doLBStages: trial={} iter={} imb={:0.2f}\n", + "GossipLB::doLBStages: trial={} iter={} imb={:0.4f}\n", trial_, iter_, new_imbalance_ ); } @@ -228,7 +228,7 @@ void GossipLB::doLBStages(TimeType start_imb) { if (this_node == 0) { vt_print( gossiplb, - "GossipLB::doLBStages: trial={} imb={:0.2f}\n", + "GossipLB::doLBStages: trial={} imb={:0.4f}\n", trial_, new_imbalance_ ); } @@ -254,7 +254,7 @@ void GossipLB::doLBStages(TimeType start_imb) { if (this_node == 0) { vt_print( gossiplb, - "GossipLB::doLBStages: chose trial={} with imb={:0.2f}\n", + "GossipLB::doLBStages: chose trial={} with imb={:0.4f}\n", best_trial, new_imbalance_ ); } @@ -278,7 +278,7 @@ void GossipLB::gossipStatsHandler(StatsMsgType* msg) { if (this_node == 0) { vt_print( gossiplb, - "GossipLB::gossipStatsHandler: max={:0.2f} min={:0.2f} avg={:0.2f} imb={:0.2f}\n", + "GossipLB::gossipStatsHandler: max={:0.2f} min={:0.2f} avg={:0.2f} imb={:0.4f}\n", in.max(), in.min(), in.avg(), in.I() ); } From f3f4f72e6085c096d78b3b7ccd7c0bb93a77893b Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Fri, 5 Mar 2021 17:08:54 -0800 Subject: [PATCH 15/30] #1279: gossiplb: add options for cmf --- .../collection/balance/gossiplb/gossiplb.cc | 43 ++++++++++++++++--- .../collection/balance/gossiplb/gossiplb.h | 7 +++ 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 82135c9f10..d8a1ae2d62 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -78,17 +78,19 @@ bool GossipLB::isOverloaded(LoadType load) const { void GossipLB::inputParams(balance::SpecEntry* spec) { std::vector allowed{ - "f", "k", "i", "c", "trials", "deterministic", "ordering" + "f", "k", "i", "c", "trials", "deterministic", "ordering", "cmf" }; spec->checkAllowedKeys(allowed); using CriterionEnumUnder = typename std::underlying_type::type; using InformTypeEnumUnder = typename std::underlying_type::type; using ObjectOrderEnumUnder = typename std::underlying_type::type; + using CMFTypeEnumUnder = typename std::underlying_type::type; auto default_c = static_cast(criterion_); auto default_inform = static_cast(inform_type_); auto default_order = static_cast(obj_ordering_); + auto default_cmf = static_cast(cmf_type_); f_ = spec->getOrDefault("f", f_); k_max_ = spec->getOrDefault("k", k_max_); @@ -102,6 +104,8 @@ void GossipLB::inputParams(balance::SpecEntry* spec) { inform_type_ = static_cast(inf); int32_t ord = spec->getOrDefault("ordering", default_order); obj_ordering_ = static_cast(ord); + int32_t cmf = spec->getOrDefault("cmf", default_cmf); + cmf_type_ = static_cast(cmf); vtAbortIf( inform_type_ == InformTypeEnum::AsyncInform && deterministic_, @@ -612,15 +616,40 @@ std::vector GossipLB::createCMF(NodeSetType const& under) { // Build the CMF double sum_p = 0.0; - double inv_l_avg = 1.0 / avg; - std::vector cmf = {}; + double factor = 1.0; + + switch (cmf_type_) { + case CMFTypeEnum::Original: + factor = 1.0 / avg; + break; + case CMFTypeEnum::NormBySelf: + factor = 1.0 / this_new_load_; + break; + case CMFTypeEnum::NormByMax: + { + double l_max = 0.0; + for (auto&& pe : under) { + auto iter = load_info_.find(pe); + vtAssert(iter != load_info_.end(), "Node must be in load_info_"); + auto load = iter->second; + if (load > l_max) { + l_max = load; + } + } + factor = 1.0 / (l_max > avg ? l_max : avg); + } + break; + default: + vtAbort("This CMF type is not supported"); + } + std::vector cmf = {}; for (auto&& pe : under) { auto iter = load_info_.find(pe); vtAssert(iter != load_info_.end(), "Node must be in load_info_"); auto load = iter->second; - sum_p += 1. - inv_l_avg * load; + sum_p += 1. - factor * load; cmf.push_back(sum_p); } @@ -773,8 +802,10 @@ void GossipLB::decide() { // milliseconds; for now, convert the object loads to milliseconds auto obj_load_ms = loadMilli(obj_load); - // Rebuild the relaxed underloaded set based on updated load of this node - under = makeUnderloaded(); + if (cmf_type_ == CMFTypeEnum::Original) { + // Rebuild the relaxed underloaded set based on updated load of this node + under = makeUnderloaded(); + } if (under.size() == 0) { break; } diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h index 69f7585a21..78a669987b 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h @@ -71,6 +71,12 @@ enum struct ObjectOrderEnum : uint8_t { Marginal = 2 }; +enum struct CMFTypeEnum : uint8_t { + Original = 0, + NormByMax = 1, + NormBySelf = 2 +}; + struct GossipLB : BaseLB { using GossipMsgAsync = balance::GossipMsgAsync; using GossipMsgSync = balance::GossipMsg; @@ -143,6 +149,7 @@ struct GossipLB : BaseLB { CriterionEnum criterion_ = CriterionEnum::ModifiedGrapevine; InformTypeEnum inform_type_ = InformTypeEnum::SyncInform; ObjectOrderEnum obj_ordering_ = ObjectOrderEnum::Arbitrary; + CMFTypeEnum cmf_type_ = CMFTypeEnum::Original; bool setup_done_ = false; bool propagate_next_round_ = false; std::vector propagated_k_; From 45ce2d5bff64ed216e6bded16b9dc1a0617ee092 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Mon, 8 Mar 2021 11:57:27 -0800 Subject: [PATCH 16/30] #1279: gossiplb: tune print verbosity --- .../collection/balance/gossiplb/gossiplb.cc | 49 +++++++++---------- 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index d8a1ae2d62..610525a175 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -130,8 +130,8 @@ void GossipLB::runLB() { } if (theContext()->getNode() == 0) { - vt_print( - gossiplb, + vt_debug_print( + terse, gossiplb, "GossipLB::runLB: avg={}, max={}, load={}, should_lb={}\n", avg, max, load, should_lb ); @@ -203,7 +203,7 @@ void GossipLB::doLBStages(TimeType start_imb) { decide(); vt_debug_print( - normal, gossiplb, + verbose, gossiplb, "GossipLB::doLBStages: (after) running trial={}, iter={}, " "num_iters={}, load={}, new_load={}\n", trial_, iter_, num_iters_, this_load, this_new_load_ @@ -219,20 +219,12 @@ void GossipLB::doLBStages(TimeType start_imb) { auto msg = makeMessage(Statistic::P_l, this_new_load_); this->proxy_.template reduce(msg,cb); }); - - if (this_node == 0) { - vt_print( - gossiplb, - "GossipLB::doLBStages: trial={} iter={} imb={:0.4f}\n", - trial_, iter_, new_imbalance_ - ); - } } if (this_node == 0) { vt_print( gossiplb, - "GossipLB::doLBStages: trial={} imb={:0.4f}\n", + "GossipLB::doLBStages: trial={} final imb={:0.4f}\n", trial_, new_imbalance_ ); } @@ -280,10 +272,11 @@ void GossipLB::gossipStatsHandler(StatsMsgType* msg) { auto this_node = theContext()->getNode(); if (this_node == 0) { - vt_print( - gossiplb, - "GossipLB::gossipStatsHandler: max={:0.2f} min={:0.2f} avg={:0.2f} imb={:0.4f}\n", - in.max(), in.min(), in.avg(), in.I() + vt_debug_print( + terse, gossiplb, + "GossipLB::gossipStatsHandler: trial={} iter={} max={:0.2f} min={:0.2f} " + "avg={:0.2f} imb={:0.4f}\n", + trial_, iter_, in.max(), in.min(), in.avg(), in.I() ); } } @@ -293,13 +286,15 @@ void GossipLB::gossipRejectionStatsHandler(GossipRejectionMsgType* msg) { auto n_rejected = in.n_rejected_; auto n_transfers = in.n_transfers_; - double rej = static_cast(n_rejected) / static_cast(n_rejected + n_transfers) * 100.0; + double rej = static_cast(n_rejected) / + static_cast(n_rejected + n_transfers) * 100.0; auto this_node = theContext()->getNode(); if (this_node == 0) { - vt_print( - gossiplb, - "GossipLB::gossipRejectionStatsHandler: n_transfers={} n_rejected={} rejection_rate={:0.1f}%\n", + vt_debug_print( + terse, gossiplb, + "GossipLB::gossipRejectionStatsHandler: n_transfers={} n_rejected={} " + "rejection_rate={:0.1f}%\n", n_transfers, n_rejected, rej ); } @@ -310,7 +305,7 @@ void GossipLB::informAsync() { uint8_t k_cur_async = 0; vt_debug_print( - normal, gossiplb, + normal, gossiplb, node, "GossipLB::informAsync: starting inform phase: trial={}, iter={}, " "k_max={}, k_cur={}, is_underloaded={}, is_overloaded={}, load={}\n", trial_, iter_, k_max_, k_cur_, is_underloaded_, is_overloaded_, this_new_load_ @@ -343,8 +338,8 @@ void GossipLB::informAsync() { vt::runSchedulerThrough(propagate_epoch); if (is_overloaded_) { - vt_print( - gossiplb, + vt_debug_print( + verbose, gossiplb, "GossipLB::informAsync: trial={}, iter={}, known underloaded={}\n", trial_, iter_, underloaded_.size() ); @@ -407,8 +402,8 @@ void GossipLB::informSync() { } if (is_overloaded_) { - vt_print( - gossiplb, + vt_debug_print( + terse, gossiplb, "GossipLB::informSync: trial={}, iter={}, known underloaded={}\n", trial_, iter_, underloaded_.size() ); @@ -783,7 +778,7 @@ void GossipLB::decide() { } ); vt_debug_print( - normal, gossiplb, + verbose, gossiplb, "GossipLB::decide: over_avg={}, marginal={}\n", over_avg, loadMilli(cur_objs_[ordered_obj_ids[0]]) ); @@ -897,7 +892,7 @@ void GossipLB::decide() { void GossipLB::thunkMigrations() { vt_debug_print( - normal, gossiplb, + terse, gossiplb, "thunkMigrations, total num_objs={}\n", cur_objs_.size() ); From 0b7dbc5cbaf258b513cac89722e531ddfb2716b9 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Mon, 8 Mar 2021 13:08:41 -0800 Subject: [PATCH 17/30] #1279: gossiplb: clean up redundant code --- .../collection/balance/gossiplb/gossiplb.cc | 110 +++++------------- .../collection/balance/gossiplb/gossiplb.h | 3 +- 2 files changed, 30 insertions(+), 83 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 610525a175..ae1249ece1 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -305,7 +305,7 @@ void GossipLB::informAsync() { uint8_t k_cur_async = 0; vt_debug_print( - normal, gossiplb, node, + normal, gossiplb, "GossipLB::informAsync: starting inform phase: trial={}, iter={}, " "k_max={}, k_cur={}, is_underloaded={}, is_overloaded={}, load={}\n", trial_, iter_, k_max_, k_cur_, is_underloaded_, is_overloaded_, this_new_load_ @@ -330,7 +330,7 @@ void GossipLB::informAsync() { // Underloaded start the round if (is_underloaded_) { - propagateRoundAsync(k_cur_async, propagate_epoch); + propagateRound(k_cur_async, false, propagate_epoch); } theTerm()->finishedEpoch(propagate_epoch); @@ -388,7 +388,7 @@ void GossipLB::informSync() { // Underloaded start the first round; ranks that received on some round // start subsequent rounds if (propagate_this_round) { - propagateRoundSync(propagate_epoch); + propagateRound(k_cur_, propagate_epoch, true); } theTerm()->finishedEpoch(propagate_epoch); @@ -421,11 +421,11 @@ void GossipLB::setupDone(ReduceMsgType* msg) { setup_done_ = true; } -void GossipLB::propagateRoundAsync(uint8_t k_cur_async, EpochType epoch) { +void GossipLB::propagateRound(uint8_t k_cur, bool sync, EpochType epoch) { vt_debug_print( normal, gossiplb, - "GossipLB::propagateRoundAsync: trial={}, iter={}, k_max={}, k_cur={}\n", - trial_, iter_, k_max_, k_cur_ + "GossipLB::propagateRound: trial={}, iter={}, k_max={}, k_cur={}\n", + trial_, iter_, k_max_, k_cur ); auto const this_node = theContext()->getNode(); @@ -446,9 +446,9 @@ void GossipLB::propagateRoundAsync(uint8_t k_cur_async, EpochType epoch) { vt_debug_print( verbose, gossiplb, - "GossipLB::propagateRoundAsync: trial={}, iter={}, k_max={}, k_cur={}, " + "GossipLB::propagateRound: trial={}, iter={}, k_max={}, k_cur={}, " "selected.size()={}, fanout={}\n", - trial_, iter_, k_max_, k_cur_, selected.size(), fanout + trial_, iter_, k_max_, k_cur, selected.size(), fanout ); for (int i = 0; i < fanout; i++) { @@ -470,83 +470,31 @@ void GossipLB::propagateRoundAsync(uint8_t k_cur_async, EpochType epoch) { vt_debug_print( verbose, gossiplb, - "GossipLB::propagateRoundAsync: trial={}, iter={}, k_max={}, " + "GossipLB::propagateRound: trial={}, iter={}, k_max={}, " "k_cur={}, sending={}\n", - trial_, iter_, k_max_, k_cur_, random_node + trial_, iter_, k_max_, k_cur, random_node ); // Send message with load - auto msg = makeMessage(this_node, load_info_, k_cur_async); - if (epoch != no_epoch) { - envelopeSetEpoch(msg->env, epoch); - } - msg->addNodeLoad(this_node, this_new_load_); - proxy_[random_node].sendMsg< - GossipMsgAsync, &GossipLB::propagateIncomingAsync - >(msg.get()); - } -} - -void GossipLB::propagateRoundSync(EpochType epoch) { - vt_debug_print( - normal, gossiplb, - "GossipLB::propagateRoundSync: trial={}, iter={}, k_max={}, k_cur={}\n", - trial_, iter_, k_max_, k_cur_ - ); - - auto const this_node = theContext()->getNode(); - auto const num_nodes = theContext()->getNumNodes(); - std::uniform_int_distribution dist(0, num_nodes - 1); - - if (!deterministic_) { - gen_propagate_.seed(seed_()); - } - - auto& selected = selected_; - selected = underloaded_; - if (selected.find(this_node) == selected.end()) { - selected.insert(this_node); - } - - auto const fanout = std::min(f_, static_cast(num_nodes - 1)); - - vt_debug_print( - verbose, gossiplb, - "GossipLB::propagateRoundSync: trial={}, iter={}, k_max={}, k_cur={}, " - "selected.size()={}, fanout={}\n", - trial_, iter_, k_max_, k_cur_, selected.size(), fanout - ); - - for (int i = 0; i < fanout; i++) { - // This implies full knowledge of all processors - if (selected.size() >= static_cast(num_nodes)) { - return; - } - - // First, randomly select a node - NodeType random_node = uninitialized_destination; - - // Keep generating until we have a unique node for this round - do { - random_node = dist(gen_propagate_); - } while ( - selected.find(random_node) != selected.end() - ); - selected.insert(random_node); - - vt_debug_print( - verbose, gossiplb, - "GossipLB::propagateRoundSync: k_max_={}, k_cur_={}, sending={}\n", - k_max_, k_cur_, random_node - ); - - // Send message with load - auto msg = makeMessage(this_node, load_info_); - if (epoch != no_epoch) { - envelopeSetEpoch(msg->env, epoch); + if (sync) { + auto msg = makeMessage(this_node, load_info_); + if (epoch != no_epoch) { + envelopeSetEpoch(msg->env, epoch); + } + msg->addNodeLoad(this_node, this_new_load_); + proxy_[random_node].sendMsg< + GossipMsgSync, &GossipLB::propagateIncomingSync + >(msg.get()); + } else { + auto msg = makeMessage(this_node, load_info_, k_cur); + if (epoch != no_epoch) { + envelopeSetEpoch(msg->env, epoch); + } + msg->addNodeLoad(this_node, this_new_load_); + proxy_[random_node].sendMsg< + GossipMsgAsync, &GossipLB::propagateIncomingAsync + >(msg.get()); } - msg->addNodeLoad(this_node, this_new_load_); - proxy_[random_node].sendMsg(msg.get()); } } @@ -578,7 +526,7 @@ void GossipLB::propagateIncomingAsync(GossipMsgAsync* msg) { } else { // send out another round propagated_k_[k_cur_async] = true; - propagateRoundAsync(k_cur_async + 1); + propagateRound(k_cur_async + 1, false); } } diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h index 78a669987b..44eafebbbd 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h @@ -102,8 +102,7 @@ struct GossipLB : BaseLB { void decide(); void migrate(); - void propagateRoundAsync(uint8_t k_cur_async, EpochType epoch = no_epoch); - void propagateRoundSync(EpochType epoch = no_epoch); + void propagateRound(uint8_t k_cur_async, bool sync, EpochType epoch = no_epoch); void propagateIncomingAsync(GossipMsgAsync* msg); void propagateIncomingSync(GossipMsgSync* msg); bool isUnderloaded(LoadType load) const; From 3a168e5d592ca68f6480251d1f24e1dc74266192 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Mon, 8 Mar 2021 14:35:04 -0800 Subject: [PATCH 18/30] #1279: gossiplb: minimize reductions when not debugging --- .../collection/balance/gossiplb/gossiplb.cc | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index ae1249ece1..90cd1f7812 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -209,16 +209,18 @@ void GossipLB::doLBStages(TimeType start_imb) { trial_, iter_, num_iters_, this_load, this_new_load_ ); - runInEpochCollective([=] { - using StatsMsgType = balance::NodeStatsMsg; - using ReduceOp = collective::PlusOp; - auto cb = vt::theCB()->makeBcast< - GossipLB, StatsMsgType, &GossipLB::gossipStatsHandler - >(this->proxy_); - // Perform the reduction for P_l -> processor load only - auto msg = makeMessage(Statistic::P_l, this_new_load_); - this->proxy_.template reduce(msg,cb); - }); + if (theConfig()->vt_debug_gossiplb || (iter_ == num_iters_ - 1)) { + runInEpochCollective([=] { + using StatsMsgType = balance::NodeStatsMsg; + using ReduceOp = collective::PlusOp; + auto cb = vt::theCB()->makeBcast< + GossipLB, StatsMsgType, &GossipLB::gossipStatsHandler + >(this->proxy_); + // Perform the reduction for P_l -> processor load only + auto msg = makeMessage(Statistic::P_l, this_new_load_); + this->proxy_.template reduce(msg,cb); + }); + } } if (this_node == 0) { @@ -828,14 +830,17 @@ void GossipLB::decide() { vt::runSchedulerThrough(lazy_epoch); - runInEpochCollective([=] { - using ReduceOp = collective::PlusOp; - auto cb = vt::theCB()->makeBcast< - GossipLB, GossipRejectionMsgType, &GossipLB::gossipRejectionStatsHandler - >(this->proxy_); - auto msg = makeMessage(n_rejected, n_transfers); - this->proxy_.template reduce(msg,cb); - }); + if (theConfig()->vt_debug_gossiplb) { + // compute rejection rate because it will be printed + runInEpochCollective([=] { + using ReduceOp = collective::PlusOp; + auto cb = vt::theCB()->makeBcast< + GossipLB, GossipRejectionMsgType, &GossipLB::gossipRejectionStatsHandler + >(this->proxy_); + auto msg = makeMessage(n_rejected, n_transfers); + this->proxy_.template reduce(msg,cb); + }); + } } void GossipLB::thunkMigrations() { From fadcc8bdd0eba22fb4ad96dd78521947caf45e47 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Mon, 15 Mar 2021 12:58:42 -0700 Subject: [PATCH 19/30] #1279: gossiplb: fix bug in cmf computation --- src/vt/vrt/collection/balance/gossiplb/gossiplb.cc | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 90cd1f7812..7703057bd8 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -557,9 +557,18 @@ void GossipLB::propagateIncomingSync(GossipMsgSync* msg) { } std::vector GossipLB::createCMF(NodeSetType const& under) { + // Build the CMF + std::vector cmf = {}; + + if (under.size() == 1) { + // trying to compute the cmf for only a single object can result + // in nan for some cmf types below, so do it the easy way instead + cmf.push_back(1.0); + return cmf; + } + double const avg = stats.at(lb::Statistic::P_l).at(lb::StatisticQuantity::avg); - // Build the CMF double sum_p = 0.0; double factor = 1.0; @@ -588,7 +597,6 @@ std::vector GossipLB::createCMF(NodeSetType const& under) { vtAbort("This CMF type is not supported"); } - std::vector cmf = {}; for (auto&& pe : under) { auto iter = load_info_.find(pe); vtAssert(iter != load_info_.end(), "Node must be in load_info_"); From f19ff09ef174f953421bd0cf0c4fc4341eadbcd2 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Mon, 15 Mar 2021 13:02:17 -0700 Subject: [PATCH 20/30] #1279: gossiplb: change default options --- src/vt/vrt/collection/balance/gossiplb/gossiplb.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h index 44eafebbbd..b33d73876c 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h @@ -147,8 +147,8 @@ struct GossipLB : BaseLB { TimeType new_imbalance_ = 0.0; CriterionEnum criterion_ = CriterionEnum::ModifiedGrapevine; InformTypeEnum inform_type_ = InformTypeEnum::SyncInform; - ObjectOrderEnum obj_ordering_ = ObjectOrderEnum::Arbitrary; - CMFTypeEnum cmf_type_ = CMFTypeEnum::Original; + ObjectOrderEnum obj_ordering_ = ObjectOrderEnum::Marginal; + CMFTypeEnum cmf_type_ = CMFTypeEnum::NormByMax; bool setup_done_ = false; bool propagate_next_round_ = false; std::vector propagated_k_; From 3e9536b953f4dd27c90cbf3da8d325b92aebf72e Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Tue, 16 Mar 2021 08:30:50 -0700 Subject: [PATCH 21/30] #1279: gossiplb: fix missing allowed key --- src/vt/vrt/collection/balance/gossiplb/gossiplb.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 7703057bd8..25331111ae 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -78,7 +78,7 @@ bool GossipLB::isOverloaded(LoadType load) const { void GossipLB::inputParams(balance::SpecEntry* spec) { std::vector allowed{ - "f", "k", "i", "c", "trials", "deterministic", "ordering", "cmf" + "f", "k", "i", "c", "trials", "deterministic", "inform", "ordering", "cmf" }; spec->checkAllowedKeys(allowed); @@ -97,8 +97,8 @@ void GossipLB::inputParams(balance::SpecEntry* spec) { num_iters_ = spec->getOrDefault("i", num_iters_); num_trials_ = spec->getOrDefault("trials", num_trials_); deterministic_ = spec->getOrDefault("deterministic", deterministic_); - int32_t c = spec->getOrDefault("c", default_c); + int32_t c = spec->getOrDefault("c", default_c); criterion_ = static_cast(c); int32_t inf = spec->getOrDefault("inform", default_inform); inform_type_ = static_cast(inf); From cfccd662c55a752569c4a69a9e956bc2e334a404 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Wed, 17 Mar 2021 09:11:02 -0700 Subject: [PATCH 22/30] #1279: gossiplb: prevent informs from being received early --- src/vt/vrt/collection/balance/gossiplb/gossiplb.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 25331111ae..065d9f75e1 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -384,6 +384,8 @@ void GossipLB::informSync() { theSched()->runSchedulerWhile([this]{ return not setup_done_; }); for (; k_cur_ < k_max_; ++k_cur_) { + vt::theCollective()->barrier(); + auto name = fmt::format("GossipLB: informSync k_cur={}", k_cur_); auto propagate_epoch = theTerm()->makeEpochCollective(name); From da68ac1f1c09dc3c20ad26eb7fb39b4a8ff0cb9d Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Wed, 17 Mar 2021 09:56:30 -0700 Subject: [PATCH 23/30] #1279: gossiplb: roll back to best iter --- .../collection/balance/gossiplb/gossiplb.cc | 34 ++++++++++++------- .../collection/balance/gossiplb/gossiplb.h | 1 + 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 065d9f75e1..01b80e34dd 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -78,7 +78,8 @@ bool GossipLB::isOverloaded(LoadType load) const { void GossipLB::inputParams(balance::SpecEntry* spec) { std::vector allowed{ - "f", "k", "i", "c", "trials", "deterministic", "inform", "ordering", "cmf" + "f", "k", "i", "c", "trials", "deterministic", "inform", "ordering", "cmf", + "rollback" }; spec->checkAllowedKeys(allowed); @@ -97,6 +98,7 @@ void GossipLB::inputParams(balance::SpecEntry* spec) { num_iters_ = spec->getOrDefault("i", num_iters_); num_trials_ = spec->getOrDefault("trials", num_trials_); deterministic_ = spec->getOrDefault("deterministic", deterministic_); + rollback_ = spec->getOrDefault("rollback", rollback_); int32_t c = spec->getOrDefault("c", default_c); criterion_ = static_cast(c); @@ -145,7 +147,7 @@ void GossipLB::runLB() { void GossipLB::doLBStages(TimeType start_imb) { decltype(this->cur_objs_) best_objs; LoadType best_load = 0; - TimeType best_imb = start_imb+1; + TimeType best_imb = start_imb + 10; uint16_t best_trial = 0; auto this_node = theContext()->getNode(); @@ -158,6 +160,8 @@ void GossipLB::doLBStages(TimeType start_imb) { k_cur_ = 0; is_overloaded_ = is_underloaded_ = false; + TimeType best_imb_this_trial = start_imb + 10; + for (iter_ = 0; iter_ < num_iters_; iter_++) { bool first_iter = iter_ == 0; @@ -209,7 +213,7 @@ void GossipLB::doLBStages(TimeType start_imb) { trial_, iter_, num_iters_, this_load, this_new_load_ ); - if (theConfig()->vt_debug_gossiplb || (iter_ == num_iters_ - 1)) { + if (rollback_ || theConfig()->vt_debug_gossiplb || (iter_ == num_iters_ - 1)) { runInEpochCollective([=] { using StatsMsgType = balance::NodeStatsMsg; using ReduceOp = collective::PlusOp; @@ -221,23 +225,29 @@ void GossipLB::doLBStages(TimeType start_imb) { this->proxy_.template reduce(msg,cb); }); } + + if (rollback_ || (iter_ == num_iters_ - 1)) { + // if known, save the best iteration within any trial so we can roll back + if (new_imbalance_ < best_imb && new_imbalance_ <= start_imb) { + best_load = this_new_load_; + best_objs = cur_objs_; + best_imb = new_imbalance_; + best_trial = trial_; + } + if (new_imbalance_ < best_imb_this_trial) { + best_imb_this_trial = new_imbalance_; + } + } } if (this_node == 0) { vt_print( gossiplb, - "GossipLB::doLBStages: trial={} final imb={:0.4f}\n", - trial_, new_imbalance_ + "GossipLB::doLBStages: trial={} {} imb={:0.4f}\n", + trial_, rollback_ ? "best" : "final", best_imb_this_trial ); } - if (new_imbalance_ <= start_imb && new_imbalance_ < best_imb) { - best_load = this_new_load_; - best_objs = cur_objs_; - best_imb = new_imbalance_; - best_trial = trial_; - } - // Clear out for next try or for not migrating by default cur_objs_.clear(); this_new_load_ = this_load; diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h index b33d73876c..9cd2881b1f 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h @@ -133,6 +133,7 @@ struct GossipLB : BaseLB { uint16_t num_iters_ = 4; uint16_t num_trials_ = 3; bool deterministic_ = false; + bool rollback_ = true; std::random_device seed_; std::unordered_map load_info_ = {}; std::unordered_map new_load_info_ = {}; From e4ae8835ad2f12ec56ed4d4b0a69002f9f105df3 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Wed, 17 Mar 2021 12:58:24 -0700 Subject: [PATCH 24/30] #1279: gossiplb: add option to target long pole load instead of avg --- .../collection/balance/gossiplb/gossiplb.cc | 54 +++++++++++-------- .../collection/balance/gossiplb/gossiplb.h | 2 + 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 01b80e34dd..2c141ec1d6 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -67,19 +67,17 @@ void GossipLB::init(objgroup::proxy::Proxy in_proxy) { } bool GossipLB::isUnderloaded(LoadType load) const { - auto const avg = stats.at(lb::Statistic::P_l).at(lb::StatisticQuantity::avg); - return load < avg * gossip_threshold; + return load < target_max_load_ * gossip_threshold; } bool GossipLB::isOverloaded(LoadType load) const { - auto const avg = stats.at(lb::Statistic::P_l).at(lb::StatisticQuantity::avg); - return load > avg * gossip_threshold; + return load > target_max_load_ * gossip_threshold; } void GossipLB::inputParams(balance::SpecEntry* spec) { std::vector allowed{ "f", "k", "i", "c", "trials", "deterministic", "inform", "ordering", "cmf", - "rollback" + "rollback", "targetpole" }; spec->checkAllowedKeys(allowed); @@ -99,6 +97,7 @@ void GossipLB::inputParams(balance::SpecEntry* spec) { num_trials_ = spec->getOrDefault("trials", num_trials_); deterministic_ = spec->getOrDefault("deterministic", deterministic_); rollback_ = spec->getOrDefault("rollback", rollback_); + target_pole_ = spec->getOrDefault("targetpole", target_pole_); int32_t c = spec->getOrDefault("c", default_c); criterion_ = static_cast(c); @@ -124,18 +123,28 @@ void GossipLB::runLB() { auto const avg = stats.at(lb::Statistic::P_l).at(lb::StatisticQuantity::avg); auto const max = stats.at(lb::Statistic::P_l).at(lb::StatisticQuantity::max); + auto const pole = stats.at(lb::Statistic::O_l).at(lb::StatisticQuantity::max) * 1000; auto const imb = stats.at(lb::Statistic::P_l).at(lb::StatisticQuantity::imb); auto const load = this_load; + if (target_pole_) { + // we can't get the processor max lower than the max object load, so + // modify the algorithm to define overloaded as exceeding the max + // object load instead of the processor average load + target_max_load_ = (pole > avg ? pole : avg); + } else { + target_max_load_ = avg; + } + if (avg > 0.0000000001) { - should_lb = max > gossip_tolerance * avg; + should_lb = max > gossip_tolerance * target_max_load_; } if (theContext()->getNode() == 0) { vt_debug_print( terse, gossiplb, - "GossipLB::runLB: avg={}, max={}, load={}, should_lb={}\n", - avg, max, load, should_lb + "GossipLB::runLB: avg={}, max={}, pole={}, imb={}, load={}, should_lb={}\n", + avg, max, pole, imb, load, should_lb ); } @@ -287,8 +296,10 @@ void GossipLB::gossipStatsHandler(StatsMsgType* msg) { vt_debug_print( terse, gossiplb, "GossipLB::gossipStatsHandler: trial={} iter={} max={:0.2f} min={:0.2f} " - "avg={:0.2f} imb={:0.4f}\n", - trial_, iter_, in.max(), in.min(), in.avg(), in.I() + "avg={:0.2f} pole={:0.2f} imb={:0.4f}\n", + trial_, iter_, in.max(), in.min(), in.avg(), + stats.at(lb::Statistic::O_l).at(lb::StatisticQuantity::max) * 1000, + in.I() ); } } @@ -579,14 +590,12 @@ std::vector GossipLB::createCMF(NodeSetType const& under) { return cmf; } - double const avg = stats.at(lb::Statistic::P_l).at(lb::StatisticQuantity::avg); - double sum_p = 0.0; double factor = 1.0; switch (cmf_type_) { case CMFTypeEnum::Original: - factor = 1.0 / avg; + factor = 1.0 / target_max_load_; break; case CMFTypeEnum::NormBySelf: factor = 1.0 / this_new_load_; @@ -602,7 +611,7 @@ std::vector GossipLB::createCMF(NodeSetType const& under) { l_max = load; } } - factor = 1.0 / (l_max > avg ? l_max : avg); + factor = 1.0 / (l_max > target_max_load_ ? l_max : target_max_load_); } break; default: @@ -686,8 +695,6 @@ GossipLB::selectObject( } void GossipLB::decide() { - double const avg = stats.at(lb::Statistic::P_l).at(lb::StatisticQuantity::avg); - auto lazy_epoch = theTerm()->makeEpochCollective("GossipLB: decide"); int n_transfers = 0, n_rejected = 0; @@ -713,7 +720,7 @@ void GossipLB::decide() { case ObjectOrderEnum::Marginal: { // first find marginal object's load - auto over_avg = this_new_load_ - avg; + auto over_avg = this_new_load_ - target_max_load_; // if no objects are larger than over_avg, then marginal will still // (incorrectly) reflect the total load, which will not be a problem auto marginal = this_new_load_; @@ -791,15 +798,16 @@ void GossipLB::decide() { // The load of the node selected auto& selected_load = load_iter->second; - //auto max_obj_size = avg - selected_load; - - bool eval = Criterion(criterion_)(this_new_load_, selected_load, obj_load_ms, avg); + bool eval = Criterion(criterion_)( + this_new_load_, selected_load, obj_load_ms, target_max_load_ + ); vt_debug_print( verbose, gossiplb, "GossipLB::decide: trial={}, iter={}, under.size()={}, " "selected_node={}, selected_load={:e}, obj_id={:x}, home={}, " - "obj_load_ms={:e}, avg={:e}, this_new_load_={:e}, criterion={}\n", + "obj_load_ms={:e}, target_max_load={:e}, this_new_load_={:e}, " + "criterion={}\n", trial_, iter_, under.size(), @@ -808,7 +816,7 @@ void GossipLB::decide() { obj_id.id, obj_id.home_node, obj_load_ms, - avg, + target_max_load_, this_new_load_, eval ); @@ -829,7 +837,7 @@ void GossipLB::decide() { iter++; } - if (not (this_new_load_ > avg)) { + if (not (this_new_load_ > target_max_load_)) { break; } } diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h index 9cd2881b1f..833f948caa 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h @@ -134,6 +134,7 @@ struct GossipLB : BaseLB { uint16_t num_trials_ = 3; bool deterministic_ = false; bool rollback_ = true; + bool target_pole_ = false; std::random_device seed_; std::unordered_map load_info_ = {}; std::unordered_map new_load_info_ = {}; @@ -146,6 +147,7 @@ struct GossipLB : BaseLB { std::unordered_map cur_objs_ = {}; LoadType this_new_load_ = 0.0; TimeType new_imbalance_ = 0.0; + TimeType target_max_load_ = 0.0; CriterionEnum criterion_ = CriterionEnum::ModifiedGrapevine; InformTypeEnum inform_type_ = InformTypeEnum::SyncInform; ObjectOrderEnum obj_ordering_ = ObjectOrderEnum::Marginal; From f832796b18d19aa868ba27e018dcb50d58e29e4f Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Tue, 23 Mar 2021 15:04:32 -0700 Subject: [PATCH 25/30] #1279: gossiplb: name barrier to prevent hang --- src/vt/vrt/collection/balance/gossiplb/gossiplb.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 2c141ec1d6..62637450c4 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -405,7 +405,8 @@ void GossipLB::informSync() { theSched()->runSchedulerWhile([this]{ return not setup_done_; }); for (; k_cur_ < k_max_; ++k_cur_) { - vt::theCollective()->barrier(); + auto kbarr = theCollective()->newNamedCollectiveBarrier(); + theCollective()->barrier(nullptr, kbarr); auto name = fmt::format("GossipLB: informSync k_cur={}", k_cur_); auto propagate_epoch = theTerm()->makeEpochCollective(name); From 9d72dded7f893cc8810a1359aeeb52c9e0dc0df2 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Fri, 26 Mar 2021 09:47:50 -0700 Subject: [PATCH 26/30] #1279: gossiplb: cleanup --- .../collection/balance/gossiplb/gossiplb.cc | 132 ++++++++++-------- .../collection/balance/gossiplb/gossiplb.h | 1 + 2 files changed, 72 insertions(+), 61 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 62637450c4..63485ad4fe 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -263,11 +263,11 @@ void GossipLB::doLBStages(TimeType start_imb) { } if (best_imb <= start_imb) { + // load the configuration with the best imbalance cur_objs_ = best_objs; this_load = this_new_load_ = best_load; new_imbalance_ = best_imb; - // Update the load based on new object assignments if (this_node == 0) { vt_print( gossiplb, @@ -695,6 +695,75 @@ GossipLB::selectObject( } } +std::vector GossipLB::orderObjects() { + // define the iteration order + std::vector ordered_obj_ids(cur_objs_.size()); + + int i = 0; + for (auto &obj : cur_objs_) { + ordered_obj_ids[i++] = obj.first; + } + + switch (obj_ordering_) { + case ObjectOrderEnum::ElmID: + std::sort( + ordered_obj_ids.begin(), ordered_obj_ids.end(), std::less() + ); + break; + case ObjectOrderEnum::Marginal: + { + // first find marginal object's load + auto over_avg = this_new_load_ - target_max_load_; + // if no objects are larger than over_avg, then marginal will still + // (incorrectly) reflect the total load, which will not be a problem + auto marginal = this_new_load_; + for (auto &obj : cur_objs_) { + // the object stats are in seconds but the processor stats are in + // milliseconds; for now, convert the object loads to milliseconds + auto obj_load_ms = loadMilli(obj.second); + if (obj_load_ms > over_avg && obj_load_ms < marginal) { + marginal = obj_load_ms; + } + } + // sort largest to smallest if <= marginal + // sort smallest to largest if > marginal + std::sort( + ordered_obj_ids.begin(), ordered_obj_ids.end(), + [=](const ObjIDType &left, const ObjIDType &right) { + auto left_load = loadMilli(this->cur_objs_[left]); + auto right_load = loadMilli(this->cur_objs_[right]); + if (left_load <= marginal && right_load <= marginal) { + // we're in the sort load descending regime (first section) + return left_load > right_load; + } + // else + // EITHER + // a) both are above the cut, and we're in the sort ascending + // regime (second section), so return left < right + // OR + // b) one is above the cut and one is at or below, and the one + // that is at or below the cut needs to come first, so + // also return left < right + return left_load < right_load; + } + ); + vt_debug_print( + normal, gossiplb, + "GossipLB::decide: over_avg={}, marginal={}\n", + over_avg, loadMilli(cur_objs_[ordered_obj_ids[0]]) + ); + } + break; + case ObjectOrderEnum::Arbitrary: + break; + default: + vtAbort("GossipLB::orderObjects: ordering not supported"); + break; + } + + return ordered_obj_ids; +} + void GossipLB::decide() { auto lazy_epoch = theTerm()->makeEpochCollective("GossipLB: decide"); @@ -705,66 +774,7 @@ void GossipLB::decide() { std::unordered_map migrate_objs; if (under.size() > 0) { - std::vector ordered_obj_ids(cur_objs_.size()); - - // define the iteration order - int i = 0; - for (auto &obj : cur_objs_) { - ordered_obj_ids[i++] = obj.first; - } - switch (obj_ordering_) { - case ObjectOrderEnum::ElmID: - std::sort( - ordered_obj_ids.begin(), ordered_obj_ids.end(), std::less() - ); - break; - case ObjectOrderEnum::Marginal: - { - // first find marginal object's load - auto over_avg = this_new_load_ - target_max_load_; - // if no objects are larger than over_avg, then marginal will still - // (incorrectly) reflect the total load, which will not be a problem - auto marginal = this_new_load_; - for (auto &obj : cur_objs_) { - // the object stats are in seconds but the processor stats are in - // milliseconds; for now, convert the object loads to milliseconds - auto obj_load_ms = loadMilli(obj.second); - if (obj_load_ms > over_avg && obj_load_ms < marginal) { - marginal = obj_load_ms; - } - } - // sort largest to smallest if <= marginal - // sort smallest to largest if > marginal - std::sort( - ordered_obj_ids.begin(), ordered_obj_ids.end(), - [=](const ObjIDType &left, const ObjIDType &right) { - auto left_load = loadMilli(this->cur_objs_[left]); - auto right_load = loadMilli(this->cur_objs_[right]); - if (left_load <= marginal && right_load <= marginal) { - // we're in the sort load descending regime (first section) - return left_load > right_load; - } - // else - // EITHER - // a) both are above the cut, and we're in the sort ascending - // regime (second section), so return left < right - // OR - // b) one is above the cut and one is at or below, and the one - // that is at or below the cut needs to come first, so - // also return left < right - return left_load < right_load; - } - ); - vt_debug_print( - verbose, gossiplb, - "GossipLB::decide: over_avg={}, marginal={}\n", - over_avg, loadMilli(cur_objs_[ordered_obj_ids[0]]) - ); - } - break; - default: - break; - } + std::vector ordered_obj_ids = orderObjects(); // Iterate through all the objects for (auto iter = ordered_obj_ids.begin(); iter != ordered_obj_ids.end(); ) { diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h index 833f948caa..51f9b816e9 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h @@ -112,6 +112,7 @@ struct GossipLB : BaseLB { std::vector createCMF(NodeSetType const& under); NodeType sampleFromCMF(NodeSetType const& under, std::vector const& cmf); std::vector makeUnderloaded() const; + std::vector orderObjects(); ElementLoadType::iterator selectObject( LoadType size, ElementLoadType& load, std::set const& available ); From a0a1c265d70425bb21587a5e2232ea56b5271f83 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Fri, 26 Mar 2021 12:09:54 -0700 Subject: [PATCH 27/30] #1279: gossiplb: document lb args --- .../collection/balance/gossiplb/gossiplb.h | 34 +++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h index 51f9b816e9..0d29107c53 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h @@ -58,22 +58,43 @@ namespace vt { namespace vrt { namespace collection { namespace lb { +// gossiping approach enum struct InformTypeEnum : uint8_t { - // synchronized rounds propagate info faster but have sync cost + // synchronous: round number defined at the processor level; propagates + // after all messages for a round are received, but has sync cost SyncInform = 0, - // async rounds propagate before round has completed, omitting some info + // asynchronous: round number defined at the message level; propagates + // when the first message for a round is received, so has no sync cost AsyncInform = 1 }; +// order in which local objects are considered for transfer enum struct ObjectOrderEnum : uint8_t { + // abitrary: use the unordered_map order Arbitrary = 0, + // element id: ascending by id member of ElementIDStruct ElmID = 1, + // marginal: order by load, starting with the object of marginal load + // (the smallest object that can be transferred to drop the processor + // load below the average), then descending for objects with loads less + // than the marginal load, and finally ascending for objects with loads + // greater than the marginal load Marginal = 2 }; +// how the cmf is computed enum struct CMFTypeEnum : uint8_t { + // original: remove processors from the CMF as soon as they exceed the + // target (e.g., processor-avg) load; use a CMF factor of 1.0/x, where x + // is the target load Original = 0, + // normalize by max: do not remove processors from the CMF that exceed the + // target load until the next iteration; use a CMF factor of 1.0/x, where x + // is the maximum of the target load and the most loaded processor in the CMF NormByMax = 1, + // normalize by self: do not remove processors from the CMF that exceed the + // target load until the next iteration; use a CMF factor of 1.0/x, where x + // is the load of the processor that is computing the CMF NormBySelf = 2 }; @@ -132,9 +153,18 @@ struct GossipLB : BaseLB { uint16_t iter_ = 0; uint16_t trial_ = 0; uint16_t num_iters_ = 4; + // how many times to repeat the requested number of iterations, hoping to + // find a better imbalance (helps if it's easy to get stuck in a local + // minimum) uint16_t num_trials_ = 3; + // whether to make migration choices deterministic, assuming we're operating + // on deterministic loads bool deterministic_ = false; + // whether to roll back to the state from a previous iteration if that + // iteration had a better imbalance than the final one bool rollback_ = true; + // whether to use a target load equal to the maximum object load (the + // "longest pole") when that load exceeds the processor-average load bool target_pole_ = false; std::random_device seed_; std::unordered_map load_info_ = {}; From 4d38c6a6eae620f252c221c52d01b37180fb60b5 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Fri, 26 Mar 2021 12:32:36 -0700 Subject: [PATCH 28/30] #1279: gossiplb: fix compile errors --- .../collection/balance/gossiplb/gossip_msg.h | 20 ++++++++++++++++--- .../collection/balance/gossiplb/gossiplb.cc | 1 - 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossip_msg.h b/src/vt/vrt/collection/balance/gossiplb/gossip_msg.h index 9e536a46bc..75de33cf3b 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossip_msg.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossip_msg.h @@ -159,13 +159,27 @@ struct RejectionStats { int n_transfers_ = 0; }; -struct GossipRejectionStatsMsg : collective::ReduceTMsg { +static_assert( + vt::messaging::is_byte_copyable_t::value, + "Must be trivially copyable to avoid serialization." +); + +struct GossipRejectionStatsMsg : NonSerialized< + collective::ReduceTMsg, + GossipRejectionStatsMsg +> +{ + using MessageParentType = NonSerialized< + collective::ReduceTMsg, + GossipRejectionStatsMsg + >; + GossipRejectionStatsMsg() = default; GossipRejectionStatsMsg(int n_rejected, int n_transfers) - : ReduceTMsg(RejectionStats(n_rejected, n_transfers)) + : MessageParentType(RejectionStats(n_rejected, n_transfers)) { } GossipRejectionStatsMsg(RejectionStats&& rs) - : ReduceTMsg(std::move(rs)) + : MessageParentType(std::move(rs)) { } }; diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 63485ad4fe..1e9f4b875b 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -224,7 +224,6 @@ void GossipLB::doLBStages(TimeType start_imb) { if (rollback_ || theConfig()->vt_debug_gossiplb || (iter_ == num_iters_ - 1)) { runInEpochCollective([=] { - using StatsMsgType = balance::NodeStatsMsg; using ReduceOp = collective::PlusOp; auto cb = vt::theCB()->makeBcast< GossipLB, StatsMsgType, &GossipLB::gossipStatsHandler From da2e611c5bde97319799af92de9b787f934607f3 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Fri, 26 Mar 2021 12:50:40 -0700 Subject: [PATCH 29/30] #1279: gossiplb: clean up sync round number --- .../collection/balance/gossiplb/gossiplb.cc | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc index 1e9f4b875b..a97152ea16 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.cc @@ -166,7 +166,6 @@ void GossipLB::doLBStages(TimeType start_imb) { selected_.clear(); underloaded_.clear(); load_info_.clear(); - k_cur_ = 0; is_overloaded_ = is_underloaded_ = false; TimeType best_imb_this_trial = start_imb + 10; @@ -192,7 +191,6 @@ void GossipLB::doLBStages(TimeType start_imb) { selected_.clear(); underloaded_.clear(); load_info_.clear(); - k_cur_ = 0; is_overloaded_ = is_underloaded_ = false; } @@ -324,13 +322,12 @@ void GossipLB::gossipRejectionStatsHandler(GossipRejectionMsgType* msg) { void GossipLB::informAsync() { propagated_k_.assign(k_max_, false); - uint8_t k_cur_async = 0; vt_debug_print( normal, gossiplb, "GossipLB::informAsync: starting inform phase: trial={}, iter={}, " - "k_max={}, k_cur={}, is_underloaded={}, is_overloaded={}, load={}\n", - trial_, iter_, k_max_, k_cur_, is_underloaded_, is_overloaded_, this_new_load_ + "k_max={}, is_underloaded={}, is_overloaded={}, load={}\n", + trial_, iter_, k_max_, is_underloaded_, is_overloaded_, this_new_load_ ); vtAssert(k_max_ > 0, "Number of rounds (k) must be greater than zero"); @@ -352,6 +349,7 @@ void GossipLB::informAsync() { // Underloaded start the round if (is_underloaded_) { + uint8_t k_cur_async = 0; propagateRound(k_cur_async, false, propagate_epoch); } @@ -370,8 +368,8 @@ void GossipLB::informAsync() { vt_debug_print( verbose, gossiplb, "GossipLB::informAsync: finished inform phase: trial={}, iter={}, " - "k_max={}, k_cur={}\n", - trial_, iter_, k_max_, k_cur_ + "k_max={}\n", + trial_, iter_, k_max_ ); } @@ -379,8 +377,8 @@ void GossipLB::informSync() { vt_debug_print( normal, gossiplb, "GossipLB::informSync: starting inform phase: trial={}, iter={}, " - "k_max={}, k_cur={}, is_underloaded={}, is_overloaded={}, load={}\n", - trial_, iter_, k_max_, k_cur_, is_underloaded_, is_overloaded_, this_new_load_ + "k_max={}, is_underloaded={}, is_overloaded={}, load={}\n", + trial_, iter_, k_max_, is_underloaded_, is_overloaded_, this_new_load_ ); vtAssert(k_max_ > 0, "Number of rounds (k) must be greater than zero"); @@ -403,7 +401,7 @@ void GossipLB::informSync() { theSched()->runSchedulerWhile([this]{ return not setup_done_; }); - for (; k_cur_ < k_max_; ++k_cur_) { + for (k_cur_ = 0; k_cur_ < k_max_; ++k_cur_) { auto kbarr = theCollective()->newNamedCollectiveBarrier(); theCollective()->barrier(nullptr, kbarr); @@ -531,7 +529,7 @@ void GossipLB::propagateIncomingAsync(GossipMsgAsync* msg) { normal, gossiplb, "GossipLB::propagateIncomingAsync: trial={}, iter={}, k_max={}, " "k_cur={}, from_node={}, load info size={}\n", - trial_, iter_, k_max_, k_cur_, from_node, msg->getNodeLoad().size() + trial_, iter_, k_max_, k_cur_async, from_node, msg->getNodeLoad().size() ); for (auto&& elm : msg->getNodeLoad()) { From 242282cffaf0faa690b9fd4092ba75c5da8576a9 Mon Sep 17 00:00:00 2001 From: Nicole Lemaster Slattengren Date: Fri, 9 Apr 2021 11:50:11 -0700 Subject: [PATCH 30/30] #1279: gossiplb: convert comments to doxygen --- .../collection/balance/gossiplb/gossiplb.h | 111 ++++++++++++------ 1 file changed, 78 insertions(+), 33 deletions(-) diff --git a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h index 0d29107c53..6c04da8ccd 100644 --- a/src/vt/vrt/collection/balance/gossiplb/gossiplb.h +++ b/src/vt/vrt/collection/balance/gossiplb/gossiplb.h @@ -58,43 +58,71 @@ namespace vt { namespace vrt { namespace collection { namespace lb { -// gossiping approach +/// Enum for gossiping approach enum struct InformTypeEnum : uint8_t { - // synchronous: round number defined at the processor level; propagates - // after all messages for a round are received, but has sync cost + /** + * \brief Synchronous sharing of underloaded processor loads + * + * The round number is defined at the processor level. This approach + * propagates known loads after all messages for a round are received, + * maximizing the amount of information propagated per round, but has a + * synchronization cost. + */ SyncInform = 0, - // asynchronous: round number defined at the message level; propagates - // when the first message for a round is received, so has no sync cost + /** + * \brief Asynchronous sharing of underloaded processor loads + * + * The round number is defined at the message level. This approach + * propagates known loads when the first message for a round is received, + * avoiding the synchronization cost but delaying the propagation of some + * information until the following round. + */ AsyncInform = 1 }; -// order in which local objects are considered for transfer +/// Enum for the order in which local objects are considered for transfer enum struct ObjectOrderEnum : uint8_t { - // abitrary: use the unordered_map order - Arbitrary = 0, - // element id: ascending by id member of ElementIDStruct + Arbitrary = 0, //< Arbitrary order: iterate as defined by the unordered_map + /** + * \brief By element ID + * + * Sort ascending by the ID member of ElementIDStruct. + */ ElmID = 1, - // marginal: order by load, starting with the object of marginal load - // (the smallest object that can be transferred to drop the processor - // load below the average), then descending for objects with loads less - // than the marginal load, and finally ascending for objects with loads - // greater than the marginal load + /** + * \brief Order for the least migrations + * + * Order by load, starting with the smallest object that can be transferred + * to drop the processor load below the average, then descending for objects + * with smaller loads, and finally ascending for objects with larger loads. + */ Marginal = 2 }; -// how the cmf is computed +/// Enum for how the CMF is computed enum struct CMFTypeEnum : uint8_t { - // original: remove processors from the CMF as soon as they exceed the - // target (e.g., processor-avg) load; use a CMF factor of 1.0/x, where x - // is the target load + /** + * \brief Original approach + * + * Remove processors from the CMF as soon as they exceed the target (e.g., + * processor-avg) load. Use a CMF factor of 1.0/x, where x is the target load. + */ Original = 0, - // normalize by max: do not remove processors from the CMF that exceed the - // target load until the next iteration; use a CMF factor of 1.0/x, where x - // is the maximum of the target load and the most loaded processor in the CMF + /** + * \brief Compute the CMF factor using the largest processor load in the CMF + * + * Do not remove processors from the CMF that exceed the target load until the + * next iteration. Use a CMF factor of 1.0/x, where x is the greater of the + * target load and the load of the most loaded processor in the CMF. + */ NormByMax = 1, - // normalize by self: do not remove processors from the CMF that exceed the - // target load until the next iteration; use a CMF factor of 1.0/x, where x - // is the load of the processor that is computing the CMF + /** + * \brief Compute the CMF factor using the load of this processor + * + * Do not remove processors from the CMF that exceed the target load until the + * next iteration. Use a CMF factor of 1.0/x, where x is the load of the + * processor that is computing the CMF. + */ NormBySelf = 2 }; @@ -153,18 +181,35 @@ struct GossipLB : BaseLB { uint16_t iter_ = 0; uint16_t trial_ = 0; uint16_t num_iters_ = 4; - // how many times to repeat the requested number of iterations, hoping to - // find a better imbalance (helps if it's easy to get stuck in a local - // minimum) + /** + * \brief Number of trials + * + * How many times to repeat the requested number of iterations, hoping to find + * a better imbalance. This helps if it's easy to get stuck in a local minimum. + */ uint16_t num_trials_ = 3; - // whether to make migration choices deterministic, assuming we're operating - // on deterministic loads + /** + * \brief Whether to make migration choices deterministic + * + * This will only lead to reproducibility when paired with deterministic + * object loads, for example when using a driver that feeds the load balancer + * object loads read from vt stats files. + */ bool deterministic_ = false; - // whether to roll back to the state from a previous iteration if that - // iteration had a better imbalance than the final one + /** + * \brief Whether to roll back to the best iteration + * + * If the final iteration of a trial has a worse imbalance than any earier + * iteration, it will roll back to the best iteration. + */ bool rollback_ = true; - // whether to use a target load equal to the maximum object load (the - // "longest pole") when that load exceeds the processor-average load + /** + * \brief Whether to adjust the target load when we have a long pole + * + * When an object load exceeds the processor-average load (i.e., we have a + * "long pole"), adjust the target load to be the maximum object load + * ("longest pole") instead of the processor-average load. + */ bool target_pole_ = false; std::random_device seed_; std::unordered_map load_info_ = {};