
#2098: runtime: add code to generate node IDs #2109

Merged: 8 commits, Mar 28, 2023
34 changes: 28 additions & 6 deletions scripts/JSON_data_files_validator.py
@@ -38,11 +38,22 @@ def get_error_message(iterable_collection: Iterable) -> str:
def _get_valid_schema(self) -> Schema:
""" Returns representation of a valid schema
"""
allowed_types = ("LBDatafile", "LBStatsfile")
allowed_types_data = ("LBDatafile",)
valid_schema_data = Schema(
{
'type': And(str, lambda a: a in allowed_types,
error=f"{self.get_error_message(allowed_types)} must be chosen"),
Optional('type'): And(str, lambda a: a in allowed_types_data,
error=f"{self.get_error_message(allowed_types_data)} must be chosen"),
Optional('metadata'): {
Optional('type'): And(str, lambda a: a in allowed_types_data,
error=f"{self.get_error_message(allowed_types_data)} must be chosen"),
Optional('rank'): int,
Optional('shared_node'): {
'id': int,
'size': int,
'rank': int,
'num_nodes': int,
},
},
'phases': [
{
'id': int,
@@ -98,10 +109,15 @@ def _get_valid_schema(self) -> Schema:
]
}
)
allowed_types_stats = ("LBStatsfile",)
valid_schema_stats = Schema(
{
'type': And(str, lambda a: a in allowed_types,
error=f"{self.get_error_message(allowed_types)} must be chosen"),
Optional('type'): And(str, lambda a: a in allowed_types_stats,
error=f"{self.get_error_message(allowed_types_stats)} must be chosen"),
Optional('metadata'): {
Optional('type'): And(str, lambda a: a in allowed_types_stats,
error=f"{self.get_error_message(allowed_types_stats)} must be chosen"),
},
'phases': [
{
"id": int,
@@ -417,7 +433,13 @@ def __validate_file(file_path):
decompressed_dict = json.loads(compr_bytes.decode("utf-8"))

# Extracting type from JSON data
schema_type = decompressed_dict.get("type")
schema_type = None
if decompressed_dict.get("metadata") is not None:
schema_type = decompressed_dict.get("metadata").get("type")
else:
if decompressed_dict.get("type") is not None:
schema_type = decompressed_dict.get("type")

if schema_type is not None:
# Validate schema
if SchemaValidator(schema_type=schema_type).is_valid(schema_to_validate=decompressed_dict):
40 changes: 40 additions & 0 deletions src/vt/runtime/runtime.cc
@@ -201,6 +201,46 @@ Runtime::Runtime(
setupSignalHandler();
setupSignalHandlerINT();
setupTerminateHandler();

if (arg_config_->config_.vt_lb_data) {
determinePhysicalNodeIDs();
}
}

void Runtime::determinePhysicalNodeIDs() {
MPI_Comm i_comm = initial_communicator_;

MPI_Comm shm_comm;
MPI_Comm_split_type(i_comm, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shm_comm);
int shm_rank = -1;
int node_size = -1;
MPI_Comm_rank(shm_comm, &shm_rank);
MPI_Comm_size(shm_comm, &node_size);

int num_nodes = -1;
int is_rank_0 = (shm_rank == 0) ? 1 : 0;
MPI_Allreduce(&is_rank_0, &num_nodes, 1, MPI_INT, MPI_SUM, i_comm);

int starting_rank = -1;
MPI_Comm_rank(i_comm, &starting_rank);

MPI_Comm node_number_comm;
MPI_Comm_split(i_comm, shm_rank, starting_rank, &node_number_comm);

int node_id = -1;
if (shm_rank == 0) {
MPI_Comm_rank(node_number_comm, &node_id);
}
MPI_Bcast(&node_id, 1, MPI_INT, 0, shm_comm);

MPI_Comm_free(&shm_comm);
MPI_Comm_free(&node_number_comm);

has_physical_node_info = true;
physical_node_id = node_id;
physical_num_nodes = num_nodes;
physical_node_size = node_size;
physical_node_rank = shm_rank;
}

bool Runtime::hasSchedRun() const {
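The node-ID derivation above is plain MPI and can be exercised on its own. Below is an illustrative standalone sketch (not vt code; the file and variable names are made up) that runs the same sequence of calls: split by shared-memory domain, count the per-node leaders, number them, and broadcast the result node-locally.

```cpp
// node_ids.cc -- standalone sketch of the node-ID derivation technique.
// Build/run with an MPI toolchain, e.g.: mpicxx node_ids.cc -o node_ids && mpirun -np 4 ./node_ids
#include <mpi.h>
#include <cstdio>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);

  // Group the ranks that share a memory domain (i.e., live on the same node)
  MPI_Comm shm_comm;
  MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shm_comm);

  int shm_rank = -1, node_size = -1;
  MPI_Comm_rank(shm_comm, &shm_rank);
  MPI_Comm_size(shm_comm, &node_size);

  // Each node contributes exactly one 1 (its node-local rank 0), so the sum is the node count
  int is_rank_0 = (shm_rank == 0) ? 1 : 0;
  int num_nodes = -1;
  MPI_Allreduce(&is_rank_0, &num_nodes, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);

  // Split by node-local rank; in the color-0 communicator (all node-local rank 0s,
  // ordered by world rank) each leader's rank is its node ID
  int world_rank = -1;
  MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
  MPI_Comm node_number_comm;
  MPI_Comm_split(MPI_COMM_WORLD, shm_rank, world_rank, &node_number_comm);

  int node_id = -1;
  if (shm_rank == 0) {
    MPI_Comm_rank(node_number_comm, &node_id);
  }
  // The leader shares the node ID with the other ranks on its node
  MPI_Bcast(&node_id, 1, MPI_INT, 0, shm_comm);

  std::printf(
    "world rank %d: node_id=%d node_rank=%d node_size=%d num_nodes=%d\n",
    world_rank, node_id, shm_rank, node_size, num_nodes
  );

  MPI_Comm_free(&shm_comm);
  MPI_Comm_free(&node_number_comm);
  MPI_Finalize();
  return 0;
}
```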
11 changes: 11 additions & 0 deletions src/vt/runtime/runtime.h
@@ -269,6 +269,11 @@ struct Runtime {
*/
static void writeToFile(std::string const& str);

/**
* \internal \brief Determine the physical node IDs for LB data files
*/
void determinePhysicalNodeIDs();

protected:
/**
* \internal \brief Try to initialize
@@ -430,6 +435,12 @@ struct Runtime {

static bool volatile sig_user_1_;

bool has_physical_node_info = false;
int physical_node_id = -1;
int physical_num_nodes = -1;
int physical_node_size = -1;
int physical_node_rank = -1;
Member review comment: Should these all be suffixed with underscores?


protected:
bool finalize_on_term_ = false;
bool initialized_ = false, finalized_ = false, aborted_ = false;
18 changes: 9 additions & 9 deletions src/vt/utils/json/json_appender.h
@@ -69,36 +69,36 @@ struct Appender : BaseAppender {
* \brief Construct a JSON appender for a specific array with a filename
*
* \param[in] array the JSON array name
* \param[in] schema_name the JSON schema name
* \param[in] metadata the JSON metadata
* \param[in] filename the JSON filename
* \param[in] compress whether to compress the output
*/
Appender(
std::string const& array, std::string const& schema_name,
std::string const& array, jsonlib const& metadata,
std::string const& filename, bool compress
)
: Appender(array, schema_name, StreamLike{filename}, compress)
: Appender(array, metadata, StreamLike{filename}, compress)
{ }

/**
* \brief Construct a JSON appender for a specific array with a stream
*
* \param[in] array the JSON array name
* \param[in] schema_name the JSON schema name
* \param[in] metadata the JSON metadata
* \param[in] in_os the output stream
* \param[in] compress whether to compress the output
*/
Appender(
std::string const& array, std::string const& schema_name, StreamLike in_os,
std::string const& array, jsonlib const& metadata, StreamLike in_os,
bool compress
)
: os_(std::move(in_os)),
oa_(std::make_shared<AdaptorType>(os_, compress))
{
oa_->write_character('{');
auto schema_str = fmt::format("\"type\":\"{}\",", schema_name);
oa_->write_characters(schema_str.c_str(), schema_str.length());
auto str = fmt::format("\"{}\":[", array);
oa_->write_characters("{\"metadata\":", 12);
SerializerType s(oa_, ' ', jsonlib::error_handler_t::strict);
s.dump(metadata, false, true, 0);
auto str = fmt::format(",\"{}\":[", array);
oa_->write_characters(str.c_str(), str.length());
}

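Net effect of the constructor change on the serialized stream: the old prefix `{"type":"<schema name>","<array>":[` becomes `{"metadata":<metadata object>,"<array>":[`. A minimal sketch of the new framing, assuming only nlohmann/json (the real Appender writes through its compression-aware output adaptor instead):

```cpp
// framing_sketch.cc -- illustrative only; mirrors the prefix the appender now emits.
#include <nlohmann/json.hpp>
#include <iostream>

int main() {
  nlohmann::json metadata;
  metadata["type"] = "LBStatsfile";

  nlohmann::json out;
  out["metadata"] = metadata;              // written before the array
  out["phases"] = nlohmann::json::array(); // elements are appended later via addElm()

  // Prints: {"metadata":{"type":"LBStatsfile"},"phases":[]}
  std::cout << out.dump() << "\n";
  return 0;
}
```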
4 changes: 3 additions & 1 deletion src/vt/vrt/collection/balance/lb_invoke/lb_manager.cc
@@ -759,8 +759,10 @@ void LBManager::createStatisticsFile() {
using JSONAppender = util::json::Appender<std::ofstream>;

if (not statistics_writer_) {
nlohmann::json metadata;
metadata["type"] = "LBStatsfile";
statistics_writer_ = std::make_unique<JSONAppender>(
"phases", "LBStatsfile", file_name, compress
"phases", metadata, file_name, compress
);
}
}
13 changes: 12 additions & 1 deletion src/vt/vrt/collection/balance/node_lb_data.cc
@@ -194,8 +194,19 @@ void NodeLBData::createLBDataFile() {
using JSONAppender = util::json::Appender<std::ofstream>;

if (not lb_data_writer_) {
nlohmann::json metadata;
if (curRT->has_physical_node_info) {
nlohmann::json node_metadata;
node_metadata["id"] = curRT->physical_node_id;
node_metadata["size"] = curRT->physical_node_size;
node_metadata["rank"] = curRT->physical_node_rank;
node_metadata["num_nodes"] = curRT->physical_num_nodes;
metadata["shared_node"] = node_metadata;
}
metadata["type"] = "LBDatafile";
metadata["rank"] = theContext()->getNode();
lb_data_writer_ = std::make_unique<JSONAppender>(
"phases", "LBDatafile", file_name, compress
"phases", metadata, file_name, compress
);
}
}
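Putting the pieces together: with the physical node information gathered by Runtime::determinePhysicalNodeIDs(), the metadata block each rank writes now matches the Optional('metadata') and Optional('shared_node') schema added to JSON_data_files_validator.py above. A sketch with made-up values (rank 5 of 8, node-local rank 1 on the second of two 4-rank nodes):

```cpp
// metadata_sketch.cc -- illustrative per-rank metadata block; values are invented.
#include <nlohmann/json.hpp>
#include <iostream>

int main() {
  nlohmann::json metadata;
  metadata["type"] = "LBDatafile";
  metadata["rank"] = 5;             // this rank's ID (theContext()->getNode() in vt)
  metadata["shared_node"] = {
    {"id", 1},                      // physical node ID
    {"size", 4},                    // number of ranks on this node
    {"rank", 1},                    // this rank's node-local rank
    {"num_nodes", 2}                // total physical nodes in the job
  };

  // Keys print in nlohmann::json's default sorted order:
  // {"rank":5,"shared_node":{"id":1,"num_nodes":2,"rank":1,"size":4},"type":"LBDatafile"}
  std::cout << metadata.dump() << "\n";
  return 0;
}
```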
8 changes: 6 additions & 2 deletions tests/unit/collection/test_lb.extended.cc
@@ -499,8 +499,10 @@ getLBDataForPhase(
using vt::vrt::collection::balance::LBDataHolder;
using json = nlohmann::json;
std::stringstream ss{std::ios_base::out | std::ios_base::in};
nlohmann::json metadata;
metadata["type"] = "LBDatafile";
auto ap = std::make_unique<JSONAppender>(
"phases", "LBDatafile", std::move(ss), false
"phases", metadata, std::move(ss), false
);
auto j = in.toJson(phase);
ap->addElm(*j);
@@ -689,8 +691,10 @@ getJsonStringForPhase(
using vt::vrt::collection::balance::LBDataHolder;
using JSONAppender = vt::util::json::Appender<std::stringstream>;
std::stringstream ss{std::ios_base::out | std::ios_base::in};
nlohmann::json metadata;
metadata["type"] = "LBDatafile";
auto ap = std::make_unique<JSONAppender>(
"phases", "LBDatafile", std::move(ss), false
"phases", metadata, std::move(ss), false
);
auto j = in.toJson(phase);
ap->addElm(*j);
4 changes: 3 additions & 1 deletion tests/unit/lb/test_lb_data_comm.cc
@@ -77,8 +77,10 @@ LBDataHolder getLBDataForPhase(vt::PhaseType phase) {
using vt::vrt::collection::balance::LBDataHolder;
using json = nlohmann::json;
std::stringstream ss{std::ios_base::out | std::ios_base::in};
nlohmann::json metadata;
metadata["type"] = "LBDatafile";
auto ap = std::make_unique<JSONAppender>(
"phases", "LBDatafile", std::move(ss), true
"phases", metadata, std::move(ss), true
);
auto j = vt::theNodeLBData()->getLBData()->toJson(phase);
ap->addElm(*j);
4 changes: 3 additions & 1 deletion tests/unit/lb/test_offlinelb.cc
@@ -118,8 +118,10 @@ TEST_F(TestOfflineLB, test_offlinelb_1) {

using JSONAppender = util::json::Appender<std::stringstream>;
std::stringstream stream{std::ios_base::out | std::ios_base::in};
nlohmann::json metadata;
metadata["type"] = "LBDatafile";
auto w = std::make_unique<JSONAppender>(
"phases", "LBDatafile", std::move(stream), true
"phases", metadata, std::move(stream), true
);
for (PhaseType i = 0; i < num_phases; i++) {
auto j = dh.toJson(i);