Skip to content

Commit

Permalink
#431 Reduce the number of static variables. Move tools for reading in…
Browse files Browse the repository at this point in the history
…put files in ProcStats. Clean up variable names.
  • Loading branch information
uhetmaniuk authored and lifflander committed Feb 5, 2020
1 parent 945ed62 commit 6bcb693
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 340 deletions.
5 changes: 1 addition & 4 deletions src/vt/runtime/runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,12 @@
#include "vt/rdma/rdma_headers.h"
#include "vt/parameterization/parameterization.h"
#include "vt/sequence/sequencer_headers.h"
#include "vt/trace/trace.h"
#include "vt/pipe/pipe_manager.h"
#include "vt/objgroup/manager.h"
#include "vt/scheduler/scheduler.h"
#include "vt/termination/termination.h"
#include "vt/topos/location/location_headers.h"
#include "vt/vrt/context/context_vrtmanager.h"
#include "vt/vrt/collection/balance/lb_type.h"
#include "vt/vrt/collection/balance/stats_lb_reader.h"
#include "vt/vrt/collection/collection_headers.h"
#include "vt/worker/worker_headers.h"
#include "vt/configs/generated/vt_git_revision.h"
Expand Down Expand Up @@ -826,7 +823,7 @@ bool Runtime::initialize(bool const force_now) {
auto lbNames = vrt::collection::balance::lb_names_;
auto mapLB = vrt::collection::balance::LBType::StatsMapLB;
if (ArgType::vt_lb_name == lbNames[mapLB]) {
vrt::collection::balance::StatsLBReader::init();
vrt::collection::balance::ProcStats::readRestartInfo();
}
}
#endif
Expand Down
5 changes: 4 additions & 1 deletion src/vt/runtime/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,13 @@
#include "vt/config.h"
#include "vt/runtime/runtime_common.h"
#include "vt/runtime/runtime_component_fwd.h"
#include "vt/trace/trace.h"
#include "vt/worker/worker_headers.h"
#include "vt/configs/arguments/args.h"

#if backend_check_enabled(trace_enabled)
#include "vt/trace/trace.h"
#endif

#include <memory>
#include <functional>
#include <string>
Expand Down
1 change: 0 additions & 1 deletion src/vt/vrt/collection/balance/lb_invoke/invoke.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
#include "vt/vrt/collection/balance/rotatelb/rotatelb.h"
#include "vt/vrt/collection/balance/gossiplb/gossiplb.h"
#include "vt/vrt/collection/balance/statsmaplb/statsmaplb.h"
#include "vt/vrt/collection/balance/stats_lb_reader.h"
#include "vt/vrt/collection/messages/system_create.h"
#include "vt/vrt/collection/manager.fwd.h"

Expand Down
241 changes: 239 additions & 2 deletions src/vt/vrt/collection/balance/proc_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,15 @@

#include "vt/config.h"
#include "vt/vrt/collection/balance/proc_stats.h"
#include "vt/vrt/collection/balance/proc_stats.util.h"
#include "vt/vrt/collection/manager.h"
#include "vt/timing/timing.h"
#include "vt/configs/arguments/args.h"
#include "vt/runtime/runtime.h"

#include <vector>
#include <unordered_map>
#include <string>
#include <cstdio>
#include <unistd.h>
#include <sys/stat.h>

#include "fmt/format.h"
Expand Down Expand Up @@ -87,13 +86,251 @@ std::unordered_map<ElementIDType,ProcStats::MigrateFnType>

/*static*/ std::vector< bool > ProcStats::proc_phase_runs_LB_ = {};

/*static*/ StatsRestartReader *ProcStats::proc_reader_ = nullptr;

/// \brief Destroy the collective object group referenced by \c proxy, unless
/// the proxy still carries the \c no_obj_group sentinel.
StatsRestartReader::~StatsRestartReader() {
  auto const has_group = proxy.getProxy() != no_obj_group;
  if (!has_group) {
    // Nothing to tear down
    return;
  }
  theObjGroup()->destroyCollective(proxy);
}

/*static*/
void StatsRestartReader::readStats() {

// Read the input files
std::deque< std::set<ElementIDType> > elements_history;
inputStatsFile(elements_history);
if (elements_history.empty()) {
vtWarn("No element history provided");
return;
}

ProcStats::proc_reader_->proxy =
theObjGroup()->makeCollective<StatsRestartReader>();

const auto num_iters = elements_history.size() - 1;
ProcStats::proc_move_list_.resize(num_iters + 1);
ProcStats::proc_phase_runs_LB_.resize(num_iters, true);
if (theContext()->getNode() == 0) {
ProcStats::proc_reader_->msgsReceived.resize(num_iters, 0);
ProcStats::proc_reader_->totalMove.resize(num_iters);
}

// Communicate the migration information
createMigrationInfo(elements_history);
}

/*static*/
void StatsRestartReader::inputStatsFile(
std::deque< std::set<ElementIDType> > &element_history
)
{
using ArgType = vt::arguments::ArgConfig;
auto const node = theContext()->getNode();
const std::string &base_file = ArgType::vt_lb_stats_file_in;
const std::string &dir = ArgType::vt_lb_stats_dir_in;
auto const file = fmt::format("{}.{}.out", base_file, node);
auto const file_name = fmt::format("{}/{}", dir, file);

vt_print(lb, "inputStatFile: file={}, iter={}\n", file_name, 0);

std::FILE *pFile = std::fopen(file_name.c_str(), "r");
if (pFile == nullptr) {
vtAssert(pFile, "File opening failed");
}

std::set<ElementIDType> buffer;

// Load: Format of a line :size_t, ElementIDType, TimeType
size_t phaseID = 0, prevPhaseID = 0;
ElementIDType elmID;
TimeType tval;
CommBytesType d_buffer;
using vtCommType = typename std::underlying_type<CommCategory>::type;
vtCommType typeID;
char separator;
fpos_t pos;
bool finished = false;
while (!finished) {
if (fscanf(pFile, "%zu %c %lli %c %lf", &phaseID, &separator, &elmID,
&separator, &tval) > 0) {
fgetpos (pFile,&pos);
fscanf (pFile, "%c", &separator);
if (separator == ',') {
// COM detected, read the end of line and do nothing else
int res = fscanf (pFile, "%lf %c %hhi", &d_buffer, &separator, &typeID);
vtAssertExpr(res == 3);
} else {
// Load detected, create the new element
fsetpos (pFile,&pos);
if (prevPhaseID != phaseID) {
prevPhaseID = phaseID;
element_history.push_back(buffer);
buffer.clear();
}
buffer.insert(elmID);
}
} else {
finished = true;
}
}

if (!buffer.empty()) {
element_history.push_back(buffer);
}

std::fclose(pFile);
}

/*static*/
void StatsRestartReader::createMigrationInfo(
std::deque< std::set<ElementIDType> > &element_history
)
{
const auto num_iters = element_history.size() - 1;
const auto myNodeID = static_cast<ElementIDType>(theContext()->getNode());
auto myProxy = ProcStats::proc_reader_->proxy;

for (size_t ii = 0; ii < num_iters; ++ii) {
auto &elms = element_history[ii];
auto &elmsNext = element_history[ii + 1];
std::set<ElementIDType> diff;
std::set_difference(elmsNext.begin(), elmsNext.end(), elms.begin(),
elms.end(), std::inserter(diff, diff.begin()));
const size_t qi = diff.size();
const size_t pi = elms.size() - (elmsNext.size() - qi);
auto &myList = ProcStats::proc_move_list_[ii];
myList.reserve(3 * (pi + qi) + 1);
//--- Store the iteration number
myList.push_back(static_cast<ElementIDType>(ii));
//--- Store partial migration information (i.e. nodes moving in)
for (auto iEle : diff) {
myList.push_back(iEle); //--- permID to receive
myList.push_back(no_element_id); // node moving from
myList.push_back(myNodeID); // node moving to
}
diff.clear();
//--- Store partial migration information (i.e. nodes moving out)
std::set_difference(elms.begin(), elms.end(), elmsNext.begin(),
elmsNext.end(), std::inserter(diff, diff.begin()));
for (auto iEle : diff) {
myList.push_back(iEle); //--- permID to send
myList.push_back(myNodeID); // node migrating from
myList.push_back(no_element_id); // node migrating to
}
//
// Create a message storing the vector
//
auto msg = makeSharedMessage<VecMsg>(myList);
myProxy[0].send<VecMsg, &StatsRestartReader::gatherMsgs>(msg);
//
// Clear old distribution of elements
//
elms.clear();
}

}

/// \brief Combine the partial migration lists sent by the nodes for one
/// phase and, once every node has reported, hand each node the list of
/// elements it has to send away.
///
/// createMigrationInfo() sends these messages to node 0.
///
/// \param[in] msg vector laid out as: phase index, followed by
/// (element ID, node from, node to) triplets
///
/// The list produced for a node is laid out as: phase index, total number of
/// migrating elements in the phase, followed by (element ID, destination
/// node) pairs. Nodes other than 0 receive it through scatterMsgs(); for
/// node 0 the pairs are stored directly in ProcStats::proc_move_list_.
void StatsRestartReader::gatherMsgs(VecMsg *msg) {
  auto const &sentVec = msg->getTransfer();
  vtAssert(sentVec.size() % 3 == 1, "Expecting vector of length 3n+1");
  const ElementIDType phaseID = sentVec[0];
  //
  // --- Combine the different pieces of information
  //
  ProcStats::proc_reader_->msgsReceived[phaseID] += 1;
  auto &migrate = ProcStats::proc_reader_->totalMove[phaseID];
  for (size_t ii = 1; ii < sentVec.size(); ii += 3) {
    const auto permID = sentVec[ii];
    const auto nodeFrom = static_cast<NodeType>(sentVec[ii + 1]);
    const auto nodeTo = static_cast<NodeType>(sentVec[ii + 2]);
    auto iptr = migrate.find(permID);
    if (iptr == migrate.end()) {
      migrate.insert(std::make_pair(permID, std::make_pair(nodeFrom, nodeTo)));
    }
    else {
      // Each sender knows only one side of the move; keep the larger value.
      // NOTE(review): relies on the no_element_id placeholder comparing lower
      // than any real node once cast to NodeType -- confirm.
      auto &nodePair = iptr->second;
      nodePair.first = std::max(nodePair.first, nodeFrom);
      nodePair.second = std::max(nodePair.second, nodeTo);
    }
  }
  //
  // --- Check whether all the messages have been received
  //
  const NodeType numNodes = theContext()->getNumNodes();
  if (ProcStats::proc_reader_->msgsReceived[phaseID] < numNodes)
    return;
  //
  //--- Distribute the information when everything has been received
  //
  auto myProxy = ProcStats::proc_reader_->proxy;
  const size_t header = 2;
  for (NodeType in = 0; in < numNodes; ++in) {
    std::vector<ElementIDType> toMove;
    toMove.reserve(header);
    toMove.push_back(phaseID);
    toMove.push_back(static_cast<ElementIDType>(migrate.size()));
    // Elements leaving node `in`, each followed by its destination
    for (auto const &entry : migrate) {
      if (entry.second.first == in) {
        toMove.push_back(entry.first);
        toMove.push_back(static_cast<ElementIDType>(entry.second.second));
      }
    }
    if (in > 0) {
      auto msg2 = makeSharedMessage<VecMsg>(toMove);
      myProxy[in].send<VecMsg, &StatsRestartReader::scatterMsgs>(msg2);
    } else {
      ProcStats::proc_phase_runs_LB_[phaseID] = (!migrate.empty());
      auto &myList = ProcStats::proc_move_list_[phaseID];
      // Iterator range instead of &toMove[header]: the list may hold nothing
      // beyond the header, and indexing one past the end is not allowed.
      myList.assign(toMove.begin() + header, toMove.end());
    }
  }
  migrate.clear();
}

/// \brief Store the migration list received for one phase.
///
/// gatherMsgs() sends these messages to the nodes other than 0.
///
/// \param[in] msg vector laid out as: phase index, total number of migrating
/// elements in the phase, followed by (element ID, destination node) pairs
///
/// ProcStats::proc_phase_runs_LB_ is set for the phase according to whether
/// the total is non-zero; when it is zero the stored list is emptied.
void StatsRestartReader::scatterMsgs(VecMsg *msg) {
  const size_t header = 2;
  auto const &recvVec = msg->getTransfer();
  // Check the size against the header first so the subtraction cannot wrap
  vtAssert(recvVec.size() >= header && (recvVec.size() - header) % 2 == 0,
    "Expecting vector of length 2n+2");
  //--- Get the iteration number associated with the message
  const ElementIDType phaseID = recvVec[0];
  //--- Check whether some migration will be done
  const bool runsLB = recvVec[1] > 0;
  ProcStats::proc_phase_runs_LB_[phaseID] = runsLB;
  auto &myList = ProcStats::proc_move_list_[phaseID];
  if (!runsLB) {
    myList.clear();
    return;
  }
  //--- Copy the migration information
  // Iterator range instead of &recvVec[header]: this node may have nothing
  // to send even though the phase migrates elements elsewhere, and indexing
  // one past the end is not allowed.
  myList.assign(recvVec.begin() + header, recvVec.end());
}

/// \brief Create the restart reader on first use, then have it read the
/// restart statistics.
/*static*/ void ProcStats::readRestartInfo() {
  auto *&reader = ProcStats::proc_reader_;
  if (!reader) {
    reader = new StatsRestartReader;
  }
  reader->readStats();
}

/// \brief Reset all the statistics held by ProcStats and release the restart
/// reader.
/*static*/ void ProcStats::clearStats() {
  ProcStats::proc_comm_.clear();
  ProcStats::proc_data_.clear();
  ProcStats::proc_migrate_.clear();
  ProcStats::proc_temp_to_perm_.clear();
  ProcStats::proc_perm_to_temp_.clear();
  next_elm_ = 1;
  ProcStats::proc_move_list_.clear();
  ProcStats::proc_phase_runs_LB_.clear();
  delete ProcStats::proc_reader_;
  // Reset the pointer: readRestartInfo() tests it against nullptr before
  // allocating, and a second clearStats() must not delete it again.
  ProcStats::proc_reader_ = nullptr;
}

/*static*/ void ProcStats::startIterCleanup() {
Expand Down
13 changes: 10 additions & 3 deletions src/vt/vrt/collection/balance/proc_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ struct StatsMapLB;
namespace vt { namespace vrt { namespace collection { namespace balance {

struct LBManager;
struct StatsLBReader;
struct StatsRestartReader;

struct ProcStats {
using MigrateFnType = std::function<void(NodeType)>;
Expand All @@ -82,6 +82,8 @@ struct ProcStats {

static void outputStatsFile();

static void readRestartInfo();

private:
static void createStatsFile();
static void closeStatsFile();
Expand All @@ -102,6 +104,7 @@ struct ProcStats {
static FILE* stats_file_;
static bool created_dir_;

#if backend_check_enabled(lblite)
/// \brief Queue of migrations for each iteration.
/// \note At each iteration, a vector of length 2 times (# of migrations)
/// is specified. The vector contains the "permanent" ID of the element
Expand All @@ -112,9 +115,13 @@ struct ProcStats {
/// map migrates elements for a specific iteration.
static std::vector< bool > proc_phase_runs_LB_;

friend struct lb::StatsMapLB;
friend struct balance::StatsLBReader;
/// \brief Private object to migrate information from a (restart) input file
static StatsRestartReader *proc_reader_;
#endif

friend struct balance::LBManager;
friend struct balance::StatsRestartReader;
friend struct lb::StatsMapLB;

};

Expand Down
Loading

0 comments on commit 6bcb693

Please sign in to comment.