diff --git a/CMakeLists.txt b/CMakeLists.txt index a141f79cdf..7c1e8e55c3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -56,6 +56,7 @@ set(PROJECT_BIN_DIR ${CMAKE_CURRENT_BINARY_DIR}) set(PROJECT_BASE_DIR ${CMAKE_CURRENT_SOURCE_DIR}) set(PROJECT_LIB_DIR ${CMAKE_CURRENT_SOURCE_DIR}/lib) set(PROJECT_EXAMPLE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/examples) +set(PROJECT_TOOLS_DIR ${CMAKE_CURRENT_SOURCE_DIR}/tools) # Import the linking macros for VT-related targets include(cmake/link_vt.cmake) @@ -114,6 +115,25 @@ if (VT_BUILD_TESTS include(CTest) endif() +# +# Tools +# +option(vt_build_tools "Build VT tools" ON) + +if (vt_build_tools) + message( + STATUS + "VT: building tools" + ) + + add_custom_target(tools) + add_subdirectory(tools) +else() + message( + STATUS "VT: NOT building tools because vt_build_tools is not set." + ) +endif() + # # Examples # diff --git a/scripts/check_license.sh b/scripts/check_license.sh index 2c573ad9c0..8ab58bad59 100755 --- a/scripts/check_license.sh +++ b/scripts/check_license.sh @@ -3,7 +3,7 @@ path_to_vt=${1} cd "$path_to_vt" || exit 1 -for sub_dir in "src" "tests/unit" "tests/perf" "tutorial" "examples" +for sub_dir in "src" "tests/unit" "tests/perf" "tutorial" "examples" "tools" do "$path_to_vt/scripts/add-license-perl.pl" "$path_to_vt/$sub_dir" "$path_to_vt/scripts/license-template" done diff --git a/src/vt/configs/arguments/app_config.h b/src/vt/configs/arguments/app_config.h index 5572500a5e..bdabfed535 100644 --- a/src/vt/configs/arguments/app_config.h +++ b/src/vt/configs/arguments/app_config.h @@ -225,6 +225,7 @@ struct AppConfig { bool vt_debug_phase = false; bool vt_debug_context = false; bool vt_debug_epoch = false; + bool vt_debug_replay = false; bool vt_debug_print_flush = false; @@ -386,6 +387,7 @@ struct AppConfig { | vt_debug_phase | vt_debug_context | vt_debug_epoch + | vt_debug_replay | vt_debug_print_flush diff --git a/src/vt/configs/arguments/args.cc b/src/vt/configs/arguments/args.cc index 6cb0a03345..628f4fd88f 
100644 --- a/src/vt/configs/arguments/args.cc +++ b/src/vt/configs/arguments/args.cc @@ -374,6 +374,7 @@ void addDebugPrintArgs(CLI::App& app, AppConfig& appConfig) { auto dcp = "Enable debug_phase = \"" debug_pp(phase) "\""; auto ddp = "Enable debug_context = \"" debug_pp(context) "\""; auto dep = "Enable debug_epoch = \"" debug_pp(epoch) "\""; + auto dfp = "Enable debug_replay = \"" debug_pp(replay) "\""; auto r1 = app.add_option("--vt_debug_level", appConfig.vt_debug_level, rq); @@ -410,6 +411,7 @@ void addDebugPrintArgs(CLI::App& app, AppConfig& appConfig) { auto dc = app.add_flag("--vt_debug_phase", appConfig.vt_debug_phase, dcp); auto dd = app.add_flag("--vt_debug_context", appConfig.vt_debug_context, ddp); auto de = app.add_flag("--vt_debug_epoch", appConfig.vt_debug_epoch, dep); + auto df = app.add_flag("--vt_debug_replay", appConfig.vt_debug_replay, dfp); auto debugGroup = "Debug Print Configuration (must be compile-time enabled)"; r->group(debugGroup); @@ -446,6 +448,7 @@ void addDebugPrintArgs(CLI::App& app, AppConfig& appConfig) { dc->group(debugGroup); dd->group(debugGroup); de->group(debugGroup); + df->group(debugGroup); auto dbq = "Always flush VT runtime prints"; auto eb = app.add_flag("--vt_debug_print_flush", appConfig.vt_debug_print_flush, dbq); diff --git a/src/vt/configs/debug/debug_config.h b/src/vt/configs/debug/debug_config.h index 999586b1fc..bde85b0620 100644 --- a/src/vt/configs/debug/debug_config.h +++ b/src/vt/configs/debug/debug_config.h @@ -80,7 +80,8 @@ enum CatEnum : uint64_t { phase = 1ull<<28, context = 1ull<<29, epoch = 1ull<<30, - temperedwmin = 1ull<<31 + temperedwmin = 1ull<<31, + replay = 1ull<<32 }; enum CtxEnum : uint64_t { @@ -138,6 +139,7 @@ vt_option_category_pretty_print(reduce, "reduce") vt_option_category_pretty_print(rdma, "RDMA") vt_option_category_pretty_print(rdma_channel, "RDMA Channel") vt_option_category_pretty_print(rdma_state, "RDMA State") +vt_option_category_pretty_print(replay, "replay") 
vt_option_category_pretty_print(runtime, "runtime") vt_option_category_pretty_print(scatter, "scatter") vt_option_category_pretty_print(serial_msg, "serialized-msg") diff --git a/src/vt/phase/phase_manager.cc b/src/vt/phase/phase_manager.cc index 99089d10c2..cdb56585d3 100644 --- a/src/vt/phase/phase_manager.cc +++ b/src/vt/phase/phase_manager.cc @@ -301,7 +301,7 @@ void PhaseManager::printSummary(vrt::collection::lb::PhaseInfo* last_phase_info) phase, "phase={}, duration={}, rank_max_compute_time={}, rank_avg_compute_time={}, imbalance={:.3f}, " "grain_max_time={}, migration count={}, lb_name={}\n", - cur_phase_, + last_phase_info->phase, total_time, TimeType(last_phase_info->max_load), TimeType(last_phase_info->avg_load), @@ -313,7 +313,7 @@ void PhaseManager::printSummary(vrt::collection::lb::PhaseInfo* last_phase_info) // vt_print( // phase, // "POST phase={}, total time={}, max_load={}, avg_load={}, imbalance={:.3f}, migration count={}\n", - // cur_phase_, + // last_phase_info->phase, // total_time, // TimeType(last_phase_info->max_load_post_lb), // TimeType(last_phase_info->avg_load_post_lb), @@ -336,7 +336,7 @@ void PhaseManager::printSummary(vrt::collection::lb::PhaseInfo* last_phase_info) auto percent_improvement = compute_percent_improvement( last_phase_info->max_load, last_phase_info->avg_load ); - if (percent_improvement > 3.0 and cur_phase_ > 0) { + if (percent_improvement > 3.0 and last_phase_info->phase > 0) { if (grain_percent_improvement < 0.5) { // grain size is blocking improvement vt_print( @@ -395,7 +395,7 @@ void PhaseManager::printSummary(vrt::collection::lb::PhaseInfo* last_phase_info) } } } - } else if (cur_phase_ == 0) { + } else if (last_phase_info->phase == 0) { // ran the lb on a phase that may have included initialization costs vt_print( phase, diff --git a/src/vt/runtime/runtime_banner.cc b/src/vt/runtime/runtime_banner.cc index ee02c32387..260417fd86 100644 --- a/src/vt/runtime/runtime_banner.cc +++ b/src/vt/runtime/runtime_banner.cc 
@@ -904,6 +904,7 @@ void Runtime::printStartupBanner() { vt_runtime_debug_warn_compile(phase) vt_runtime_debug_warn_compile(context) vt_runtime_debug_warn_compile(epoch) + vt_runtime_debug_warn_compile(replay) auto arg_str = [](std::vector const& args) -> std::string { std::stringstream ss; diff --git a/src/vt/vrt/collection/balance/lb_invoke/lb_manager.h b/src/vt/vrt/collection/balance/lb_invoke/lb_manager.h index a7c8257d73..020a689b03 100644 --- a/src/vt/vrt/collection/balance/lb_invoke/lb_manager.h +++ b/src/vt/vrt/collection/balance/lb_invoke/lb_manager.h @@ -261,6 +261,10 @@ struct LBManager : runtime::component::Component { void statsHandler(std::vector const& in_stat_vec); + lb::PhaseInfo *getPhaseInfo() { return last_phase_info_.get(); } + + void setComputingBeforeLBStats(bool before_lb) { before_lb_stats_ = before_lb; } + private: bool isCollectiveComm(elm::CommCategory cat) const; diff --git a/src/vt/vrt/collection/balance/workload_replay.cc b/src/vt/vrt/collection/balance/workload_replay.cc new file mode 100644 index 0000000000..ebe5e072bd --- /dev/null +++ b/src/vt/vrt/collection/balance/workload_replay.cc @@ -0,0 +1,459 @@ +/* +//@HEADER +// ***************************************************************************** +// +// workload_replay.cc +// DARMA/vt => Virtual Transport +// +// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. 
+// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// Questions? 
Contact darma@sandia.gov +// +// ***************************************************************************** +//@HEADER +*/ + +#include "vt/config.h" +#include "vt/vrt/collection/balance/workload_replay.h" +#include "vt/vrt/collection/balance/lb_data_holder.h" +#include "vt/vrt/collection/balance/lb_invoke/lb_manager.h" +#include "vt/phase/phase_manager.h" +#include "vt/utils/json/json_reader.h" + +#include + +#include + +namespace vt { namespace vrt { namespace collection { +namespace balance { namespace replay { + +void replayWorkloads( + PhaseType initial_phase, PhaseType phases_to_run +) { + // read in object loads from json files + auto const filename = theConfig()->getLBDataFileIn(); + auto workloads = readInWorkloads(filename); + + // use the default stats handler + auto stats_cb = vt::theCB()->makeBcast< + &LBManager::statsHandler + >(theLBManager()->getProxy()); + + replayWorkloads(initial_phase, phases_to_run, workloads, stats_cb); +} + +void replayWorkloads( + PhaseType initial_phase, PhaseType phases_to_run, + std::shared_ptr workloads, + Callback> stats_cb +) { + using ObjIDType = elm::ElementIDStruct; + + auto const this_rank = theContext()->getNode(); + + // remember vt's base load model + auto base_load_model = theLBManager()->getBaseLoadModel(); + // force it to use our given workloads, not anything it may have collected + base_load_model->setLoads(&(workloads->node_data_), &(workloads->node_comm_)); + // point the load model at the workloads for the relevant phase + runInEpochCollective("WorkloadReplayDriver -> updateLoads", [=] { + base_load_model->updateLoads(initial_phase); + }); + + // allow remembering what objects are here after the load balancer migrates + std::set migratable_objects_here; + for (auto workload_id : *base_load_model) { + if (workload_id.isMigratable()) { + migratable_objects_here.insert(workload_id); + } + } + + // simulate the given number of phases + auto stop_phase = initial_phase + phases_to_run; + for (PhaseType phase 
= initial_phase; phase < stop_phase; phase++) { + // reapply the base load model if in case we overwrote it on a previous iter + theLBManager()->setLoadModel(base_load_model); + + // force it to use our given workloads, not anything it may have collected + base_load_model->setLoads( + &(workloads->node_data_), &(workloads->node_comm_) + ); + + // point the load model at the workloads for the relevant phase + runInEpochCollective("WorkloadReplayDriver -> updateLoads", [=] { + base_load_model->updateLoads(phase); + }); + + if (theConfig()->vt_debug_replay) { + size_t count = 0; + for (auto workload_id : *base_load_model) { + if (workload_id.isMigratable()) { + ++count; + vt_debug_print( + normal, replay, + "workload for element {} is here on phase {}\n", workload_id, phase + ); + } + } + vt_debug_print( + terse, replay, + "Number of known workloads: {}\n", count + ); + } + + auto pre_lb_load_model = base_load_model; + + // if this isn't the initial phase, then the workload may exist on a rank + // other than where the objects are currently meant to exist; we will + // use a Reassignment object to get those workloads where they need to be + if (phase > initial_phase) { + if (this_rank == 0) { + vt_print( + replay, "Migrating object workloads to phase {} ranks...\n", phase + ); + } + + // get the workloads to the ranks where the objects currently exist + pre_lb_load_model = WorkloadDataMigrator::relocateWorkloadsForReplay( + base_load_model, migratable_objects_here + ); + + // update the load model that will be used by the real load balancer + theLBManager()->setLoadModel(pre_lb_load_model); + + // force it to use our given workloads, not anything it may have collected + pre_lb_load_model->setLoads( + &(workloads->node_data_), &(workloads->node_comm_) + ); + } + + if (this_rank == 0) { + vt_print(replay, "Simulating phase {}...\n", phase); + } + + if (theConfig()->vt_debug_replay) { + size_t count = 0; + for (auto workload_id : *pre_lb_load_model) { + if 
(workload_id.isMigratable()) { + ++count; + vt_debug_print( + normal, replay, + "element {} is here on phase {} before LB\n", workload_id, phase + ); + } + } + vt_debug_print( + terse, replay, + "Number of objects before LB: {}\n", count + ); + } + + vt_debug_print( + terse, replay, + "constructing load model from real load balancer\n" + ); + + runInEpochCollective("WorkloadReplayDriver -> runRealLB", [&] { + // run the load balancer but don't let it automatically migrate; + // instead, remember where the LB wanted to migrate objects + + std::shared_ptr proposed_model = nullptr; + auto postLBWork = [&](ReassignmentMsg *msg) { + auto lb_reassignment = msg->reassignment; + if (lb_reassignment) { + proposed_model = std::make_shared( + pre_lb_load_model, + WorkloadDataMigrator::updateCurrentNodes(lb_reassignment) + ); + migratable_objects_here.clear(); + for (auto it = proposed_model->begin(); it.isValid(); ++it) { + if ((*it).isMigratable()) { + migratable_objects_here.insert(*it); + vt_debug_print( + normal, replay, + "element {} is here on phase {} after LB\n", *it, phase + ); + } + } + auto last_phase_info = theLBManager()->getPhaseInfo(); + last_phase_info->migration_count = lb_reassignment->global_migration_count; + last_phase_info->ran_lb = true; + last_phase_info->phase = phase; + } + vt_debug_print( + terse, replay, + "Number of objects after LB: {}\n", migratable_objects_here.size() + ); + runInEpochCollective("postLBWorkForReplay -> computeStats", [=] { + theLBManager()->setComputingBeforeLBStats(false); + theLBManager()->computeStatistics( + proposed_model, false, phase, stats_cb + ); + }); + }; + auto cb = theCB()->makeFunc( + vt::pipe::LifetimeEnum::Once, postLBWork + ); + theLBManager()->selectStartLB(phase, cb); + }); + runInEpochCollective("WorkloadReplayDriver -> destroyLB", [&] { + theLBManager()->destroyLB(); + }); + auto last_phase_info = theLBManager()->getPhaseInfo(); + thePhase()->printSummary(last_phase_info); + } +} + +std::shared_ptr 
+readInWorkloads(const std::string &filename) { + using util::json::Reader; + + Reader r(filename); + auto json = r.readFile(); + auto sd = std::make_shared(*json); + + for (auto &phase_data : sd->node_data_) { + vt_debug_print( + normal, replay, + "found {} loads for phase {}\n", + phase_data.second.size(), phase_data.first + ); + } + + for (auto &phase_data : sd->node_comm_) { + vt_debug_print( + normal, replay, + "found {} comms for phase {}\n", + phase_data.second.size(), phase_data.first + ); + } + + return sd; +} + + +/*static*/ +objgroup::proxy::Proxy +WorkloadDataMigrator::construct(std::shared_ptr model_base) { + auto my_proxy = theObjGroup()->makeCollective( + "WorkloadDataMigrator" + ); + auto strat = my_proxy.get(); + auto base_proxy = my_proxy.template castToBase(); + vt_debug_print( + verbose, replay, + "WorkloadDataMigrator proxy={} base_proxy={}\n", + my_proxy.getProxy(), base_proxy.getProxy() + ); + strat->proxy_ = base_proxy; + strat->load_model_ = model_base.get(); + return my_proxy; +} + +void WorkloadDataMigrator::runLB(LoadType) { } + +void WorkloadDataMigrator::inputParams(ConfigEntry* spec) { } + +std::unordered_map +WorkloadDataMigrator::getInputKeysWithHelp() { + std::unordered_map const keys_help; + return keys_help; +} + +/*static*/ +std::shared_ptr +WorkloadDataMigrator::updateCurrentNodes( + std::shared_ptr lb_reassignment +) { + auto modified_reassignment = std::make_shared(); + modified_reassignment->node_ = lb_reassignment->node_; + modified_reassignment->global_migration_count = + lb_reassignment->global_migration_count; + for (auto &dep : lb_reassignment->depart_) { + ObjIDType id = dep.first; + NodeType dest = dep.second; + id.curr_node = dest; + modified_reassignment->depart_[id] = dest; + } + auto const this_rank = vt::theContext()->getNode(); + for (auto &arr : lb_reassignment->arrive_) { + ObjIDType id = arr.first; + id.curr_node = this_rank; + modified_reassignment->arrive_[id] = arr.second; + } + return
modified_reassignment; +} + +/*static*/ +std::shared_ptr +WorkloadDataMigrator::relocateWorkloadsForReplay( + std::shared_ptr model_base, + std::set migratable_objects_here +) { + // Object workloads may exist on arbitrary ranks instead of being colocated + // with the objects themselves. Relocate the workloads to where the objects + // themselves exist. Do this by first migrating home all workloads that are + // neither at home nor colocated with the object. Finally, migrate from home + // all workloads not already colocated with the object. + + std::shared_ptr move_home_model = + relocateMisplacedWorkloadsHome(model_base, migratable_objects_here); + + std::shared_ptr move_here_model = + relocateMisplacedWorkloadsHere(move_home_model, migratable_objects_here); + + return move_here_model; +} + +/*static*/ +std::shared_ptr +WorkloadDataMigrator::relocateMisplacedWorkloadsHome( + std::shared_ptr model_base, + std::set migratable_objects_here +) { + std::shared_ptr move_home_model = nullptr; + + runInEpochCollective("WorkloadDataMigrator -> migrateLBDataHome", [&] { + auto norm_lb_proxy = WorkloadDataMigrator::construct(model_base); + auto normalizer = norm_lb_proxy.get(); + move_home_model = normalizer->createModelToMoveWorkloadsHome( + model_base, migratable_objects_here + ); + norm_lb_proxy.destroyCollective(); + }); + + return move_home_model; +} + +/*static*/ +std::shared_ptr +WorkloadDataMigrator::relocateMisplacedWorkloadsHere( + std::shared_ptr model_base, + std::set migratable_objects_here +) { + std::shared_ptr move_here_model = nullptr; + + runInEpochCollective("WorkloadDataMigrator -> migrateLBDataHere", [&] { + auto norm_lb_proxy = WorkloadDataMigrator::construct(model_base); + auto normalizer = norm_lb_proxy.get(); + move_here_model = normalizer->createModelToMoveWorkloadsHere( + model_base, migratable_objects_here + ); + norm_lb_proxy.destroyCollective(); + }); + + return move_here_model; +} + +std::shared_ptr 
+WorkloadDataMigrator::createModelToMoveWorkloadsHome( + std::shared_ptr model_base, + std::set migratable_objects_here +) { + auto const this_rank = vt::theContext()->getNode(); + vt_debug_print( + terse, replay, + "constructing load model to get loads from file location to home\n" + ); + + runInEpochCollective("WorkloadDataMigrator -> transferLBDataHome", [&] { + for (auto workload_id : *model_base) { + if (workload_id.isMigratable()) { + // if the object belongs here, do nothing; otherwise, "transfer" it to + // the home rank so that it can later be sent to the rank holding the + // object + if (workload_id.getHomeNode() != this_rank) { + if (migratable_objects_here.count(workload_id) == 0) { + vt_debug_print( + verbose, replay, + "will transfer load of {} home to {}\n", + workload_id, workload_id.getHomeNode() + ); + migrateObjectTo(workload_id, workload_id.getHomeNode()); + } + } + } + } + }); + + auto tmp_assignment = normalizeReassignments(); + auto home_assignment = updateCurrentNodes(tmp_assignment); + return std::make_shared(model_base, home_assignment); +} + +std::shared_ptr +WorkloadDataMigrator::createModelToMoveWorkloadsHere( + std::shared_ptr model_base, + std::set migratable_objects_here +) { + auto const this_rank = vt::theContext()->getNode(); + vt_debug_print( + terse, replay, + "constructing load model to get loads from home to here\n" + ); + + runInEpochCollective("WorkloadDataMigrator -> transferLBDataHere", [&] { + for (auto workload_id : migratable_objects_here) { + // if the object is already here, do nothing; otherwise, "transfer" it + // from the home rank so that we will have the needed workload data + bool workloads_here = false; + for (auto other_id : *model_base) { + if (workload_id == other_id) { + workloads_here = true; + break; + } + } + if (!workloads_here) { + // check that this isn't something that should already have been here + assert(workload_id.getHomeNode() != this_rank); + + vt_debug_print( + verbose, replay, + "will 
transfer load of {} from home {}\n", + workload_id, workload_id.getHomeNode() + ); + ObjIDType mod_id = workload_id; + // Override curr_node to force retrieval from the home rank + mod_id.curr_node = workload_id.getHomeNode(); + migrateObjectTo(mod_id, this_rank); + } + } + }); + + auto tmp_assignment = normalizeReassignments(); + // now restore the curr_node values to reflect the placement of the "real" + // object + auto here_assignment = updateCurrentNodes(tmp_assignment); + + return std::make_shared(model_base, here_assignment); +} + +}}}}} /* end namespace vt::vrt::collection::balance::replay */ diff --git a/src/vt/vrt/collection/balance/workload_replay.h b/src/vt/vrt/collection/balance/workload_replay.h new file mode 100644 index 0000000000..2e36b39231 --- /dev/null +++ b/src/vt/vrt/collection/balance/workload_replay.h @@ -0,0 +1,231 @@ +/* +//@HEADER +// ***************************************************************************** +// +// workload_replay.h +// DARMA/vt => Virtual Transport +// +// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. 
+// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// Questions? Contact darma@sandia.gov +// +// ***************************************************************************** +//@HEADER +*/ + +#if !defined INCLUDED_VT_VRT_COLLECTION_BALANCE_WORKLOAD_REPLAY_H +#define INCLUDED_VT_VRT_COLLECTION_BALANCE_WORKLOAD_REPLAY_H + +#include "vt/config.h" +#include "vt/elm/elm_id.h" +#include "vt/vrt/collection/balance/stats_msg.h" +#include "vt/vrt/collection/balance/lb_data_holder.h" +#include "vt/vrt/collection/balance/baselb/baselb.h" +#include "vt/vrt/collection/balance/model/load_model.h" +#include "vt/vrt/collection/balance/model/proposed_reassignment.h" + +#include +#include +#include + +namespace vt { namespace vrt { namespace collection { +namespace balance { namespace replay { + +/** + * \brief Simulate replaying the object workloads as recorded in the json file, + * but allow new load balancing decisions to be made. 
+ * + * \param[in] initial_phase the first phase to replay + * \param[in] phases_to_run how many phases to replay + * + * The json files specified by the command-line arguments --vt_lb_data_file_in + * and --vt_lb_data_dir_in will be imported and the LB data contained within + * will be fed through the load balancer(s) specified on the vt command-line + * on each requested phase, allowing new load balancing decisions to happen. + * There is no requirement to colocate the LB data on the same rank as the + * object exists during any given phase. + */ +void replayWorkloads( + PhaseType initial_phase, PhaseType phases_to_run +); + +/** + * \brief Simulate replaying the object workloads passed in, but allow new load + * balancing decisions to be made. + * + * \param[in] initial_phase the first phase to replay + * \param[in] phases_to_run how many phases to replay + * \param[in] workloads the workload data to simulate + * \param[in] stats_cb callback for post-lb statistics + * + * LB data passed in will be fed through the load balancer(s) specified on the + * vt command-line on each requested phase, allowing new load balancing + * decisions to happen. There is no requirement to colocate the LB data on the + * same rank as the object exists during any given phase. + */ +void replayWorkloads( + PhaseType initial_phase, PhaseType phases_to_run, + std::shared_ptr workloads, + Callback> stats_cb +); + +/** + * \brief Build a LBDataHolder object from the LB data in a json file + * + * \param[in] filename read in LB data from the specified json file + * + * \return the LBDataHolder object built from the LB data + */ +std::shared_ptr +readInWorkloads(const std::string &filename); + + +/** + * \struct WorkloadDataMigrator + * + * \brief A helper objgroup for workload replay. Derives from + * \c vt::Vrt::collection::lb::BaseLB in order to gain access to + * normalizeReassignments but is not a load balancer in the traditional sense. 
+ * A new instance should be created for each call to normalizeReassignments. + */ +struct WorkloadDataMigrator : lb::BaseLB { + + using ObjIDType = elm::ElementIDStruct; + + WorkloadDataMigrator() = default; + + /** + * \brief Construct an objgroup and configure it + * + * \param[in] model_base the load model that reflects the known workloads + * + * \return the objgroup proxy to use for exchanging workload information + */ + static objgroup::proxy::Proxy + construct(std::shared_ptr model_base); + + void runLB(LoadType) override; + + void inputParams(ConfigEntry* spec) override; + + static std::unordered_map getInputKeysWithHelp(); + + using BaseLB::normalizeReassignments; + + /** + * \brief Update the current locations of objects so that ProposedReassignment + * load models can be composed + * + * \param[in] lb_reassignment the Reassignment returned by a load balancer + * + * \return a new Reassignment that reflects the updated locations of objects + */ + static std::shared_ptr + updateCurrentNodes( + std::shared_ptr lb_reassignment + ); + + /** + * \brief Relocate object workloads to the rank where the objects are supposed + * to exist during this phase + * + * \param[in] model_base the load model for the phase we are simulating + * \param[in] migratable_objects_here migratable objects here on this phase + * + * \return load model that makes the necessary object workloads available + */ + static std::shared_ptr + relocateWorkloadsForReplay( + std::shared_ptr model_base, + std::set migratable_objects_here + ); + + /** + * \brief Instantiate objgroup and relocate applicable object workloads home + * + * \param[in] model_base the load model for the phase we are simulating + * \param[in] migratable_objects_here migratable objects here on this phase + * + * \return load model that makes the necessary object workloads available + */ + static std::shared_ptr + relocateMisplacedWorkloadsHome( + std::shared_ptr model_base, + std::set migratable_objects_here + ); + + /** 
+ * \brief Instantiate objgroup and relocate applicable workloads here + * + * \param[in] model_base the load model for the phase we are simulating + * \param[in] migratable_objects_here migratable objects here on this phase + * + * \return load model that makes the necessary object workloads available + */ + static std::shared_ptr + relocateMisplacedWorkloadsHere( + std::shared_ptr model_base, + std::set migratable_objects_here + ); + +private: + /** + * \brief Relocate object workloads home if the object is not on this rank + * + * \param[in] model_base the load model for the phase we are simulating + * \param[in] migratable_objects_here migratable objects here on this phase + * + * \return load model that makes the necessary object workloads available + */ + std::shared_ptr + createModelToMoveWorkloadsHome( + std::shared_ptr model_base, + std::set migratable_objects_here + ); + + /** + * \brief Relocate workloads here for objects on this rank + * + * \param[in] model_base the load model for the phase we are simulating + * \param[in] migratable_objects_here migratable objects here on this phase + * + * \return load model that makes the necessary object workloads available + */ + std::shared_ptr + createModelToMoveWorkloadsHere( + std::shared_ptr model_base, + std::set migratable_objects_here + ); +}; + +}}}}} /* end namespace vt::vrt::collection::balance::replay */ + +#endif /*INCLUDED_VT_VRT_COLLECTION_BALANCE_WORKLOAD_REPLAY_H*/ diff --git a/tests/unit/collection/test_workload_data_migrator.cc b/tests/unit/collection/test_workload_data_migrator.cc new file mode 100644 index 0000000000..7d20356fff --- /dev/null +++ b/tests/unit/collection/test_workload_data_migrator.cc @@ -0,0 +1,799 @@ +/* +//@HEADER +// ***************************************************************************** +// +// test_workload_data_migrator.cc +// DARMA/vt => Virtual Transport +// +// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). 
Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// Questions? 
Contact darma@sandia.gov +// +// ***************************************************************************** +//@HEADER +*/ + +#include + +#include "test_parallel_harness.h" +#include "test_collection_common.h" + +#include "vt/elm/elm_id.h" +#include "vt/elm/elm_id_bits.h" +#include "vt/vrt/collection/balance/stats_msg.h" +#include "vt/vrt/collection/balance/lb_common.h" +#include "vt/vrt/collection/balance/lb_data_holder.h" +#include "vt/vrt/collection/balance/lb_invoke/lb_manager.h" +#include "vt/vrt/collection/balance/workload_replay.h" +#include "vt/vrt/collection/balance/model/proposed_reassignment.h" + +#if vt_check_enabled(lblite) + +namespace vt { namespace tests { namespace unit { namespace replay { + +using namespace vt::tests::unit; + +using vt::vrt::collection::balance::LBDataHolder; +using vt::vrt::collection::balance::LoadModel; +using vt::vrt::collection::balance::ProposedReassignment; +using vt::vrt::collection::balance::ReassignmentMsg; +using vt::vrt::collection::balance::replay::WorkloadDataMigrator; + +struct TestWorkloadDataMigrator : TestParallelHarness { }; + +std::shared_ptr +setupWorkloads(PhaseType phase, size_t numElements) { + auto const& this_node = vt::theContext()->getNode(); + + if (this_node == 0) { + vt_print(replay, "Generating workloads to replay...\n"); + } + + using vt::vrt::collection::balance::ElementIDStruct; + + std::vector myElemList(numElements); + + for (size_t ii = 0; ii < numElements; ++ii) { + myElemList[ii] = elm::ElmIDBits::createCollectionImpl( + true, ii+1, this_node, this_node + ); + } + + auto lbdh = std::make_shared(); + + for (auto&& elmID : myElemList) { + double tval = elmID.id * 2; + lbdh->node_data_[phase][elmID].whole_phase_load = tval; + auto &subphase_loads = lbdh->node_data_[phase][elmID].subphase_loads; + subphase_loads.push_back(elmID.id % 2 ? tval : 0); + subphase_loads.push_back(elmID.id % 2 ? 
0 : tval); + } + + return lbdh; +} + +std::shared_ptr +setupBaseModel(PhaseType phase, std::shared_ptr lbdh) { + auto base_load_model = vt::theLBManager()->getBaseLoadModel(); + // force it to use our json workloads, not anything it may have collected + base_load_model->setLoads(&lbdh->node_data_, &lbdh->node_comm_); + + vt::runInEpochCollective("updateLoads", [&]{ + base_load_model->updateLoads(phase); + }); + + return base_load_model; +} + +std::shared_ptr +migrateObjects( + std::shared_ptr base_load_model, + vt::PhaseType phase, + vt::vrt::collection::balance::LBType balancer +) { + std::shared_ptr new_model = nullptr; + + vt::runInEpochCollective("migrate", [&]{ + auto postLBWork = [&](ReassignmentMsg *msg) { + auto lb_reassignment = msg->reassignment; + if (lb_reassignment) { + vt_debug_print( + normal, replay, + "global_mig={}, depart={}, arrive={}\n", + lb_reassignment->global_migration_count, + lb_reassignment->depart_.size(), + lb_reassignment->arrive_.size() + ); + new_model = std::make_shared( + base_load_model, + WorkloadDataMigrator::updateCurrentNodes(lb_reassignment) + ); + } + }; + auto cb = theCB()->makeFunc( + vt::pipe::LifetimeEnum::Once, postLBWork + ); + theLBManager()->startLB(phase, balancer, cb); + }); + + runInEpochCollective("destroy lb", [&]{ + vt::theLBManager()->destroyLB(); + }); + + return new_model; +} + +std::shared_ptr +shiftObjectsRight( + std::shared_ptr base_load_model, + vt::PhaseType phase +) { + using vt::vrt::collection::balance::LBType; + return migrateObjects(base_load_model, phase, LBType::RotateLB); +} + +std::shared_ptr +shiftObjectsRandomly( + std::shared_ptr base_load_model, + vt::PhaseType phase +) { + using vt::vrt::collection::balance::LBType; + return migrateObjects(base_load_model, phase, LBType::RandomLB); +} + + +TEST_F(TestWorkloadDataMigrator, test_normalize_call) { + auto const& this_node = vt::theContext()->getNode(); + auto const& num_nodes = vt::theContext()->getNumNodes(); + + PhaseType phase = 0; + 
const size_t numElements = 5;
+
+  auto lbdh = setupWorkloads(phase, numElements);
+  auto base_load_model = setupBaseModel(phase, lbdh);
+
+  vt::objgroup::proxy::Proxy<WorkloadDataMigrator> norm_lb_proxy;
+  std::shared_ptr<LoadModel> new_model = nullptr;
+
+  // choose a set of migrations for the load model to represent
+  vt::runInEpochCollective("do_lb", [&]{
+    norm_lb_proxy = WorkloadDataMigrator::construct(base_load_model);
+    auto normalizer = norm_lb_proxy.get();
+
+    vt::runInEpochCollective("choose migrations", [&]{
+      for (auto obj_id : *base_load_model) {
+        if (obj_id.isMigratable()) {
+          vt::NodeType dest = obj_id.id % num_nodes;
+          normalizer->migrateObjectTo(obj_id, dest);
+        }
+      }
+    });
+
+    auto reassignment = normalizer->normalizeReassignments();
+    new_model = std::make_shared<ProposedReassignment>(
+      base_load_model, WorkloadDataMigrator::updateCurrentNodes(reassignment)
+    );
+  });
+  vt::runInEpochCollective("destroy lb", [&]{
+    norm_lb_proxy.destroyCollective();
+  });
+
+  // then iterate over it to make sure what shows up here is correct
+  for (auto obj_id : *new_model) {
+    if (obj_id.isMigratable()) {
+      vt::NodeType dest = obj_id.id % num_nodes;
+      EXPECT_EQ(dest, this_node);
+      EXPECT_EQ(obj_id.getCurrNode(), this_node);
+
+      using vt::vrt::collection::balance::PhaseOffset;
+      auto load = new_model->getModeledLoad(
+        obj_id, {PhaseOffset::NEXT_PHASE, PhaseOffset::WHOLE_PHASE}
+      );
+      EXPECT_EQ(load, obj_id.id * 2);
+      auto subload0 = new_model->getModeledLoad(
+        obj_id, {PhaseOffset::NEXT_PHASE, 0}
+      );
+      EXPECT_EQ(subload0, obj_id.id % 2 ? obj_id.id * 2 : 0);
+      auto subload1 = new_model->getModeledLoad(
+        obj_id, {PhaseOffset::NEXT_PHASE, 1}
+      );
+      EXPECT_EQ(subload1, obj_id.id % 2 ?
0 : obj_id.id * 2); + } + } +} + +TEST_F(TestWorkloadDataMigrator, test_move_data_home) { + auto const& this_node = vt::theContext()->getNode(); + + PhaseType phase = 0; + const size_t numElements = 5; + + auto lbdh = setupWorkloads(phase, numElements); + auto base_load_model = setupBaseModel(phase, lbdh); + + // move everything off the home node + std::shared_ptr not_home_model = shiftObjectsRight( + base_load_model, phase + ); + + // list nothing as here so that we skip the optimization + using ObjIDType = vt::elm::ElementIDStruct; + std::set no_migratable_objects_here; + + // then create a load model that restores them to homes + std::shared_ptr back_home_model = + WorkloadDataMigrator::relocateMisplacedWorkloadsHome( + not_home_model, no_migratable_objects_here + ); + + // then iterate over it to make sure what shows up here is correct + for (auto obj_id : *back_home_model) { + if (obj_id.isMigratable()) { + auto home = obj_id.getHomeNode(); + EXPECT_EQ(home, this_node); + EXPECT_EQ(obj_id.getCurrNode(), this_node); + + using vt::vrt::collection::balance::PhaseOffset; + auto load = back_home_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, PhaseOffset::WHOLE_PHASE} + ); + EXPECT_EQ(load, obj_id.id * 2); + auto subload0 = back_home_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, 0} + ); + EXPECT_EQ(subload0, obj_id.id % 2 ? obj_id.id * 2 : 0); + auto subload1 = back_home_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, 1} + ); + EXPECT_EQ(subload1, obj_id.id % 2 ? 
0 : obj_id.id * 2); + } + } +} + +TEST_F(TestWorkloadDataMigrator, test_move_some_data_home) { + auto const& this_node = vt::theContext()->getNode(); + auto const& num_nodes = vt::theContext()->getNumNodes(); + + PhaseType phase = 0; + const size_t numElements = 5; + + auto lbdh = setupWorkloads(phase, numElements); + auto base_load_model = setupBaseModel(phase, lbdh); + + // move everything off the home node + std::shared_ptr not_home_model = shiftObjectsRight( + base_load_model, phase + ); + using ObjIDType = vt::elm::ElementIDStruct; + std::set migratable_objects_here; + for (auto it = not_home_model->begin(); it.isValid(); ++it) { + if ((*it).isMigratable()) { + // only claim a subset of them are here (relates to an optimization in + // the code being tested) + if ((*it).id % 3 == 0) { + migratable_objects_here.insert(*it); + } + } + } + + // then create a load model that restores them to homes + std::shared_ptr back_home_if_not_here_model = + WorkloadDataMigrator::relocateMisplacedWorkloadsHome( + not_home_model, migratable_objects_here + ); + + // then iterate over it to make sure what shows up here is correct + for (auto obj_id : *back_home_if_not_here_model) { + if (obj_id.isMigratable()) { + auto home = obj_id.getHomeNode(); + if (obj_id.id % 3 == 0) { + // the optimization should have prevented these from moving home + EXPECT_EQ(home, (this_node + num_nodes - 1) % num_nodes); + } else { + // but these must be home now + EXPECT_EQ(home, this_node); + } + EXPECT_EQ(obj_id.getCurrNode(), this_node); + + using vt::vrt::collection::balance::PhaseOffset; + auto load = back_home_if_not_here_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, PhaseOffset::WHOLE_PHASE} + ); + EXPECT_EQ(load, obj_id.id * 2); + auto subload0 = back_home_if_not_here_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, 0} + ); + EXPECT_EQ(subload0, obj_id.id % 2 ? 
obj_id.id * 2 : 0); + auto subload1 = back_home_if_not_here_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, 1} + ); + EXPECT_EQ(subload1, obj_id.id % 2 ? 0 : obj_id.id * 2); + } + } +} + +TEST_F(TestWorkloadDataMigrator, test_move_data_here_from_home) { + auto const& this_node = vt::theContext()->getNode(); + auto const& num_nodes = vt::theContext()->getNumNodes(); + + PhaseType phase = 0; + const size_t numElements = 5; + + auto lbdh = setupWorkloads(phase, numElements); + auto base_load_model = setupBaseModel(phase, lbdh); + + // move everything off the home node + std::shared_ptr not_home_model = shiftObjectsRight( + base_load_model, phase + ); + using ObjIDType = vt::elm::ElementIDStruct; + std::set migratable_objects_here; + for (auto it = not_home_model->begin(); it.isValid(); ++it) { + if ((*it).isMigratable()) { + migratable_objects_here.insert(*it); + } + } + + // then create a load model that restores them to homes + std::shared_ptr here_model = + WorkloadDataMigrator::relocateMisplacedWorkloadsHere( + base_load_model, migratable_objects_here + ); + + // then iterate over it to make sure what shows up here is correct + for (auto obj_id : *here_model) { + if (obj_id.isMigratable()) { + auto home = obj_id.getHomeNode(); + EXPECT_EQ(home, (this_node + num_nodes - 1) % num_nodes); + EXPECT_EQ(obj_id.getCurrNode(), this_node); + + using vt::vrt::collection::balance::PhaseOffset; + auto load = here_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, PhaseOffset::WHOLE_PHASE} + ); + EXPECT_EQ(load, obj_id.id * 2); + auto subload0 = here_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, 0} + ); + EXPECT_EQ(subload0, obj_id.id % 2 ? obj_id.id * 2 : 0); + auto subload1 = here_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, 1} + ); + EXPECT_EQ(subload1, obj_id.id % 2 ? 
0 : obj_id.id * 2); + } + } +} + +TEST_F(TestWorkloadDataMigrator, test_move_some_data_here_from_home) { + auto const& this_node = vt::theContext()->getNode(); + auto const& num_nodes = vt::theContext()->getNumNodes(); + + PhaseType phase = 0; + const size_t numElements = 5; + + auto lbdh = setupWorkloads(phase, numElements); + auto base_load_model = setupBaseModel(phase, lbdh); + + // move everything off the home node + std::shared_ptr not_home_model = shiftObjectsRight( + base_load_model, phase + ); + using ObjIDType = vt::elm::ElementIDStruct; + std::set migratable_objects_here; + for (auto it = not_home_model->begin(); it.isValid(); ++it) { + if ((*it).isMigratable()) { + // only claim a subset of them are here (relates to an optimization in + // the code being tested) + if ((*it).id % 3 == 0) { + migratable_objects_here.insert(*it); + } + } + } + + // then create a load model that brings them here + std::shared_ptr here_model = + WorkloadDataMigrator::relocateMisplacedWorkloadsHere( + base_load_model, migratable_objects_here + ); + + // then iterate over it to make sure what shows up here is correct + for (auto obj_id : *here_model) { + if (obj_id.isMigratable()) { + auto home = obj_id.getHomeNode(); + if (obj_id.id % 3 == 0) { + // these must have moved here from home + EXPECT_EQ(home, (this_node + num_nodes - 1) % num_nodes); + } else { + // but the optimization should have prevented these from moving away + // from home + EXPECT_EQ(home, this_node); + } + EXPECT_EQ(obj_id.getCurrNode(), this_node); + + using vt::vrt::collection::balance::PhaseOffset; + auto load = here_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, PhaseOffset::WHOLE_PHASE} + ); + EXPECT_EQ(load, obj_id.id * 2); + auto subload0 = here_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, 0} + ); + EXPECT_EQ(subload0, obj_id.id % 2 ? 
obj_id.id * 2 : 0); + auto subload1 = here_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, 1} + ); + EXPECT_EQ(subload1, obj_id.id % 2 ? 0 : obj_id.id * 2); + } + } +} + +TEST_F(TestWorkloadDataMigrator, test_move_data_here_from_whereever_1) { + auto const& this_node = vt::theContext()->getNode(); + + PhaseType phase = 0; + const size_t numElements = 5; + + auto lbdh = setupWorkloads(phase, numElements); + auto base_load_model = setupBaseModel(phase, lbdh); + + // shift the workloads to not be home + std::shared_ptr workloads_not_home_model = + shiftObjectsRight(base_load_model, phase); + + // put the objects whereever + std::shared_ptr objects_whereever_model = + shiftObjectsRandomly(base_load_model, phase); + using ObjIDType = vt::elm::ElementIDStruct; + std::set migratable_objects_here; + for (auto it = objects_whereever_model->begin(); it.isValid(); ++it) { + if ((*it).isMigratable()) { + migratable_objects_here.insert(*it); + } + } + + // then create a load model that matches everything up + std::shared_ptr here_model = + WorkloadDataMigrator::relocateWorkloadsForReplay( + workloads_not_home_model, migratable_objects_here + ); + + // then iterate over it to make sure what shows up here is correct + for (auto obj_id : *here_model) { + if (obj_id.isMigratable()) { + EXPECT_EQ(migratable_objects_here.count(obj_id), 1); + + EXPECT_EQ(obj_id.getCurrNode(), this_node); + + using vt::vrt::collection::balance::PhaseOffset; + auto load = here_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, PhaseOffset::WHOLE_PHASE} + ); + EXPECT_EQ(load, obj_id.id * 2); + auto subload0 = here_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, 0} + ); + EXPECT_EQ(subload0, obj_id.id % 2 ? obj_id.id * 2 : 0); + auto subload1 = here_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, 1} + ); + EXPECT_EQ(subload1, obj_id.id % 2 ? 
0 : obj_id.id * 2); + } + } +} + +TEST_F(TestWorkloadDataMigrator, test_move_data_here_from_whereever_2) { + auto const& this_node = vt::theContext()->getNode(); + + PhaseType phase = 0; + const size_t numElements = 5; + + auto lbdh = setupWorkloads(phase, numElements); + auto base_load_model = setupBaseModel(phase, lbdh); + + // put the workloads whereever + std::shared_ptr workloads_whereever_model = + shiftObjectsRandomly(base_load_model, phase); + + // shift the objects so they aren't at home + std::shared_ptr objects_not_home_model = + shiftObjectsRight(base_load_model, phase); + using ObjIDType = vt::elm::ElementIDStruct; + std::set migratable_objects_here; + for (auto it = objects_not_home_model->begin(); it.isValid(); ++it) { + if ((*it).isMigratable()) { + migratable_objects_here.insert(*it); + } + } + + // then create a load model that matches everything up + std::shared_ptr here_model = + WorkloadDataMigrator::relocateWorkloadsForReplay( + workloads_whereever_model, migratable_objects_here + ); + + // then iterate over it to make sure what shows up here is correct + for (auto obj_id : *here_model) { + if (obj_id.isMigratable()) { + EXPECT_EQ(migratable_objects_here.count(obj_id), 1); + + EXPECT_EQ(obj_id.getCurrNode(), this_node); + + using vt::vrt::collection::balance::PhaseOffset; + auto load = here_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, PhaseOffset::WHOLE_PHASE} + ); + EXPECT_EQ(load, obj_id.id * 2); + auto subload0 = here_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, 0} + ); + EXPECT_EQ(subload0, obj_id.id % 2 ? obj_id.id * 2 : 0); + auto subload1 = here_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, 1} + ); + EXPECT_EQ(subload1, obj_id.id % 2 ? 
0 : obj_id.id * 2); + } + } +} + + +struct StatsResults { + StatsResults(PhaseType initial_phase, PhaseType lb_interval) + : save_phase_(initial_phase), + comp_phase_(initial_phase), + lb_interval_(lb_interval) { } + + PhaseType save_phase_ = 0; + PhaseType comp_phase_ = 0; + PhaseType lb_interval_ = 1; + + std::unordered_map O_min_; + std::unordered_map O_max_; + std::unordered_map O_car_; + std::unordered_map P_sum_; + + using Statistic = vt::vrt::collection::lb::Statistic; + using LoadData = vt::vrt::collection::balance::LoadData; + + void saveStatsHandler(std::vector const& in_stat_vec) { + auto const& this_node = vt::theContext()->getNode(); + + if (this_node == 0) { + vt_print(replay, "Saving subset of statistics for phase {}\n", comp_phase_); + } + + for (auto&& st : in_stat_vec) { + auto stat = st.stat_; + if (stat == Statistic::Rank_load_modeled) { + P_sum_[save_phase_] = st.sum(); + } else if (stat == Statistic::Object_load_modeled) { + O_min_[save_phase_] = st.min(); + O_max_[save_phase_] = st.max(); + O_car_[save_phase_] = st.N_; + } + } + + ++save_phase_; + } + + void compStatsHandler(std::vector const& in_stat_vec) { + auto const& this_node = vt::theContext()->getNode(); + + if (this_node == 0) { + vt_print(replay, "Comparing subset of post-LB statistics for phase {}\n", comp_phase_); + } + + for (auto&& st : in_stat_vec) { + auto stat = st.stat_; + if (stat == Statistic::Rank_load_modeled) { + EXPECT_EQ(P_sum_[comp_phase_], st.sum()); + } else if (stat == Statistic::Object_load_modeled) { + EXPECT_EQ(O_min_[comp_phase_], st.min()); + EXPECT_EQ(O_max_[comp_phase_], st.max()); + EXPECT_EQ(O_car_[comp_phase_], st.N_); + } + } + + comp_phase_ += lb_interval_; + } +}; + +std::shared_ptr +migrateObjectsAndDoStatistics( + std::shared_ptr base_load_model, + vt::PhaseType phase, + vt::vrt::collection::balance::LBType balancer, + vt::objgroup::proxy::Proxy o_proxy +) { + std::shared_ptr new_model = nullptr; + + vt::runInEpochCollective("migrate", [&]{ + auto 
postLBWork = [&](ReassignmentMsg *msg) { + auto lb_reassignment = msg->reassignment; + if (lb_reassignment) { + vt_debug_print( + normal, replay, + "global_mig={}, depart={}, arrive={}\n", + lb_reassignment->global_migration_count, + lb_reassignment->depart_.size(), + lb_reassignment->arrive_.size() + ); + new_model = std::make_shared( + base_load_model, + WorkloadDataMigrator::updateCurrentNodes(lb_reassignment) + ); + runInEpochCollective("computeAndStoreStats", [=] { + auto stats_cb = vt::theCB()->makeBcast< + &StatsResults::saveStatsHandler + >(o_proxy); + theLBManager()->computeStatistics(new_model, false, phase, stats_cb); + }); + } + }; + auto cb = theCB()->makeFunc( + vt::pipe::LifetimeEnum::Once, postLBWork + ); + theLBManager()->startLB(phase, balancer, cb); + }); + + runInEpochCollective("destroy lb", [&]{ + vt::theLBManager()->destroyLB(); + }); + + return new_model; +} + +std::shared_ptr +shiftObjectsRightAndDoStatistics( + std::shared_ptr base_load_model, + vt::PhaseType phase, vt::objgroup::proxy::Proxy o_proxy +) { + using vt::vrt::collection::balance::LBType; + return migrateObjectsAndDoStatistics( + base_load_model, phase, LBType::RotateLB, o_proxy + ); +} + +std::shared_ptr +setupManyWorkloads( + PhaseType initial_phase, PhaseType num_phases, size_t numElements, + vt::objgroup::proxy::Proxy o_proxy +) { + auto const& this_node = vt::theContext()->getNode(); + + if (this_node == 0) { + vt_print(replay, "Generating workloads to replay...\n"); + } + + using vt::vrt::collection::balance::ElementIDStruct; + + std::vector myElemList(numElements); + + for (size_t ii = 0; ii < numElements; ++ii) { + myElemList[ii] = elm::ElmIDBits::createCollectionImpl( + true, ii+1, this_node, this_node + ); + } + + auto lbdh = std::make_shared(); + + PhaseType stop_phase = initial_phase + num_phases; + for (PhaseType phase = initial_phase; phase < stop_phase; ++phase) { + for (size_t ii = 0; ii < numElements; ++ii) { + auto elmID = myElemList[ii]; + double tval = 
this_node + (ii + 10) * 2; + lbdh->node_data_[phase][elmID].whole_phase_load = tval + phase; + auto &subphase_loads = lbdh->node_data_[phase][elmID].subphase_loads; + subphase_loads.push_back(elmID.id % 2 ? tval : phase); + subphase_loads.push_back(elmID.id % 2 ? phase : tval); + } + } + + auto scrambled_lbdh = std::make_shared(); + + for (PhaseType phase = initial_phase; phase < stop_phase; ++phase) { + auto base_load_model = setupBaseModel(phase, lbdh); + + std::shared_ptr not_home_model = + shiftObjectsRightAndDoStatistics(base_load_model, phase, o_proxy); + + std::set migratable_objects_here; + for (auto it = not_home_model->begin(); it.isValid(); ++it) { + if ((*it).isMigratable()) { + migratable_objects_here.insert(*it); + } + } + + // then create a load model that matches everything up + std::shared_ptr here_model = + WorkloadDataMigrator::relocateWorkloadsForReplay( + not_home_model, migratable_objects_here + ); + + // then store them at their new locations + for (auto it = here_model->begin(); it.isValid(); ++it) { + auto obj_id = *it; + using vt::vrt::collection::balance::PhaseOffset; + scrambled_lbdh->node_data_[phase][obj_id].whole_phase_load = + here_model->getModeledLoad( + obj_id, {PhaseOffset::NEXT_PHASE, PhaseOffset::WHOLE_PHASE} + ); + scrambled_lbdh->node_data_[phase][*it].subphase_loads.push_back( + here_model->getModeledLoad(obj_id, {PhaseOffset::NEXT_PHASE, 0}) + ); + scrambled_lbdh->node_data_[phase][*it].subphase_loads.push_back( + here_model->getModeledLoad(obj_id, {PhaseOffset::NEXT_PHASE, 1}) + ); + } + } + + return scrambled_lbdh; +} + +struct TestWorkloadReplay : TestParallelHarness { +#if vt_check_enabled(lblite) + void addAdditionalArgs() override { + static char vt_lb[]{"--vt_lb"}; + static char vt_lb_name[]{"--vt_lb_name=RandomLB"}; + static char vt_lb_interval[]{"--vt_lb_interval=2"}; + addArgs(vt_lb, vt_lb_name, vt_lb_interval); + } +#endif +}; + +TEST_F(TestWorkloadReplay, test_run_replay_verify_some_stats) { + PhaseType 
initial_phase = 1; + PhaseType num_phases = 5; + const size_t numElements = 5; + const PhaseType lb_interval = 2; // make sure this matches the harness above + + auto o_proxy = vt::theObjGroup()->makeCollective( + "StatsResults", initial_phase, lb_interval + ); + + // first set up the workloads to replay, moving them around by phase + auto lbdh = setupManyWorkloads( + initial_phase, num_phases, numElements, o_proxy + ); + + // make our own stats callback so that we can check the results + auto stats_cb = vt::theCB()->makeBcast< + &StatsResults::compStatsHandler + >(o_proxy); + + // then replay them but allow the lb to place objects differently + vt::vrt::collection::balance::replay::replayWorkloads( + initial_phase, num_phases, lbdh, stats_cb + ); +} + +}}}} // end namespace vt::tests::unit::replay + +#endif /*vt_check_enabled(lblite)*/ diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt new file mode 100644 index 0000000000..079d49dad9 --- /dev/null +++ b/tools/CMakeLists.txt @@ -0,0 +1,26 @@ + +# +# Tools +# + +include(turn_on_warnings) + +macro(add_tool tool_name) + set(TOOL_FILE "${tool_name}.cc") + + add_executable(${tool_name} ${TOOL_FILE}) + add_dependencies(tools ${tool_name}) + + turn_on_warnings(${tool_name}) + + if (vt_unity_build_enabled) + set_target_properties(${tool_name} PROPERTIES UNITY_BUILD ON) + endif() + + link_target_with_vt( + TARGET ${tool_name} + DEFAULT_LINK_SET + ) +endmacro() + +add_subdirectory(workload_replay) diff --git a/tools/workload_replay/CMakeLists.txt b/tools/workload_replay/CMakeLists.txt new file mode 100644 index 0000000000..c9480ff0a9 --- /dev/null +++ b/tools/workload_replay/CMakeLists.txt @@ -0,0 +1,11 @@ + +set( + WORKLOAD_REPLAY_TOOLS + simulate_replay +) + +foreach(TOOL_NAME ${WORKLOAD_REPLAY_TOOLS}) + # message("Example: building workload replay tool >>>>> ${TOOL_NAME}") + + add_tool(${TOOL_NAME}) +endforeach() diff --git a/tools/workload_replay/simulate_replay.cc b/tools/workload_replay/simulate_replay.cc new 
file mode 100644 index 0000000000..8a7045c655 --- /dev/null +++ b/tools/workload_replay/simulate_replay.cc @@ -0,0 +1,73 @@ +/* +//@HEADER +// ***************************************************************************** +// +// simulate_replay.cc +// DARMA/vt => Virtual Transport +// +// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+//
+// Questions? Contact darma@sandia.gov
+//
+// *****************************************************************************
+//@HEADER
+*/
+
+#include <vt/transport.h>
+#include <vt/vrt/collection/balance/workload_replay.h>
+
+int main(int argc, char** argv) {
+  using vt::PhaseType;
+
+  vt::initialize(argc, argv);
+
+  vtAbortIf(
+    argc != 3,
+    "Must have two app-specific arguments: \n"
+    "The json workload files needs to be specified using\n"
+    "--vt_lb_data_file_in and --vt_lb_data_dir_in"
+  );
+
+  // initial phase to simulate
+  PhaseType initial_phase = atoi(argv[1]);
+  // number of phases to simulate
+  PhaseType phases_to_run = atoi(argv[2]);
+
+  // the workloads used will be those specified with the command-line arguments
+  // --vt_lb_data_file_in and --vt_lb_data_dir_in
+  vt::vrt::collection::balance::replay::replayWorkloads(
+    initial_phase, phases_to_run
+  );
+
+  vt::finalize();
+
+  return 0;
+}