diff --git a/scripts/JSON_data_files_validator.py b/scripts/JSON_data_files_validator.py
index 8f81933559..436f17d14b 100644
--- a/scripts/JSON_data_files_validator.py
+++ b/scripts/JSON_data_files_validator.py
@@ -38,11 +38,22 @@ def get_error_message(iterable_collection: Iterable) -> str:
     def _get_valid_schema(self) -> Schema:
         """ Returns representation of a valid schema
         """
-        allowed_types = ("LBDatafile", "LBStatsfile")
+        allowed_types_data = ("LBDatafile",)
         valid_schema_data = Schema(
             {
-                'type': And(str, lambda a: a in allowed_types,
-                            error=f"{self.get_error_message(allowed_types)} must be chosen"),
+                Optional('type'): And(str, lambda a: a in allowed_types_data,
+                                      error=f"{self.get_error_message(allowed_types_data)} must be chosen"),
+                Optional('metadata'): {
+                    Optional('type'): And(str, lambda a: a in allowed_types_data,
+                                          error=f"{self.get_error_message(allowed_types_data)} must be chosen"),
+                    Optional('rank'): int,
+                    Optional('shared_node'): {
+                        'id': int,
+                        'size': int,
+                        'rank': int,
+                        'num_nodes': int,
+                    },
+                },
                 'phases': [
                     {
                         'id': int,
@@ -98,10 +109,15 @@ def _get_valid_schema(self) -> Schema:
                 ]
             }
         )
+        allowed_types_stats = ("LBStatsfile",)
         valid_schema_stats = Schema(
             {
-                'type': And(str, lambda a: a in allowed_types,
-                            error=f"{self.get_error_message(allowed_types)} must be chosen"),
+                Optional('type'): And(str, lambda a: a in allowed_types_stats,
+                                      error=f"{self.get_error_message(allowed_types_stats)} must be chosen"),
+                Optional('metadata'): {
+                    Optional('type'): And(str, lambda a: a in allowed_types_stats,
+                                          error=f"{self.get_error_message(allowed_types_stats)} must be chosen"),
+                },
                 'phases': [
                     {
                         "id": int,
@@ -417,7 +433,13 @@ def __validate_file(file_path):
             decompressed_dict = json.loads(compr_bytes.decode("utf-8"))
 
         # Extracting type from JSON data
-        schema_type = decompressed_dict.get("type")
+        schema_type = None
+        if decompressed_dict.get("metadata") is not None:
+            schema_type = decompressed_dict.get("metadata").get("type")
+        else:
+            if decompressed_dict.get("type") is not None:
+                schema_type = decompressed_dict.get("type")
+
         if schema_type is not None:
             # Validate schema
             if SchemaValidator(schema_type=schema_type).is_valid(schema_to_validate=decompressed_dict):
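Note: the type-extraction fallback added to __validate_file above can be exercised standalone. A minimal Python sketch; the sample documents and the extract_schema_type helper are illustrative, not part of the patch:

    # Hypothetical sample documents showing the two layouts the updated
    # schema accepts: legacy top-level "type" and the new "metadata" block.
    legacy_doc = {"type": "LBDatafile", "phases": []}
    new_doc = {
        "metadata": {
            "type": "LBDatafile",
            "rank": 0,
            "shared_node": {"id": 0, "size": 2, "rank": 0, "num_nodes": 1},
        },
        "phases": [],
    }

    def extract_schema_type(decompressed_dict):
        # Mirrors __validate_file: prefer metadata.type, fall back to type.
        if decompressed_dict.get("metadata") is not None:
            return decompressed_dict.get("metadata").get("type")
        return decompressed_dict.get("type")

    assert extract_schema_type(legacy_doc) == "LBDatafile"
    assert extract_schema_type(new_doc) == "LBDatafile"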
diff --git a/src/vt/runtime/runtime.cc b/src/vt/runtime/runtime.cc
index 0a3f734604..3958fed6b5 100644
--- a/src/vt/runtime/runtime.cc
+++ b/src/vt/runtime/runtime.cc
@@ -201,6 +201,46 @@ Runtime::Runtime(
   setupSignalHandler();
   setupSignalHandlerINT();
   setupTerminateHandler();
+
+  if (arg_config_->config_.vt_lb_data) {
+    determinePhysicalNodeIDs();
+  }
+}
+
+void Runtime::determinePhysicalNodeIDs() {
+  MPI_Comm i_comm = initial_communicator_;
+
+  MPI_Comm shm_comm;
+  MPI_Comm_split_type(i_comm, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &shm_comm);
+  int shm_rank = -1;
+  int node_size = -1;
+  MPI_Comm_rank(shm_comm, &shm_rank);
+  MPI_Comm_size(shm_comm, &node_size);
+
+  int num_nodes = -1;
+  int is_rank_0 = (shm_rank == 0) ? 1 : 0;
+  MPI_Allreduce(&is_rank_0, &num_nodes, 1, MPI_INT, MPI_SUM, i_comm);
+
+  int starting_rank = -1;
+  MPI_Comm_rank(i_comm, &starting_rank);
+
+  MPI_Comm node_number_comm;
+  MPI_Comm_split(i_comm, shm_rank, starting_rank, &node_number_comm);
+
+  int node_id = -1;
+  if (shm_rank == 0) {
+    MPI_Comm_rank(node_number_comm, &node_id);
+  }
+  MPI_Bcast(&node_id, 1, MPI_INT, 0, shm_comm);
+
+  MPI_Comm_free(&shm_comm);
+  MPI_Comm_free(&node_number_comm);
+
+  has_physical_node_info = true;
+  physical_node_id = node_id;
+  physical_num_nodes = num_nodes;
+  physical_node_size = node_size;
+  physical_node_rank = shm_rank;
 }
 
 bool Runtime::hasSchedRun() const {
diff --git a/src/vt/runtime/runtime.h b/src/vt/runtime/runtime.h
index ef573dfce3..00c6fed4e8 100644
--- a/src/vt/runtime/runtime.h
+++ b/src/vt/runtime/runtime.h
@@ -269,6 +269,11 @@ struct Runtime {
    */
   static void writeToFile(std::string const& str);
 
+  /**
+   * \internal \brief Determine the physical node IDs for LB data files
+   */
+  void determinePhysicalNodeIDs();
+
 protected:
   /**
    * \internal \brief Try to initialize
@@ -430,6 +435,12 @@ struct Runtime {
 
   static bool volatile sig_user_1_;
 
+  bool has_physical_node_info = false;
+  int physical_node_id = -1;
+  int physical_num_nodes = -1;
+  int physical_node_size = -1;
+  int physical_node_rank = -1;
+
 protected:
   bool finalize_on_term_ = false;
   bool initialized_ = false, finalized_ = false, aborted_ = false;
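Note: determinePhysicalNodeIDs() derives each rank's shared-memory rank, node size, node count, and node id purely from communicator splits. A pure-Python model of the same computation, to make the algorithm concrete; the rank-to-host mapping is made up and MPI is not involved here:

    # Model of the MPI logic above for a hypothetical 5-rank, 2-node layout.
    ranks_to_host = ["n0", "n0", "n1", "n1", "n1"]  # global rank -> node

    # Node "leaders" are the lowest global rank on each node (shm_rank == 0).
    leaders = sorted(ranks_to_host.index(h) for h in set(ranks_to_host))
    for rank, host in enumerate(ranks_to_host):
        peers = [r for r, h in enumerate(ranks_to_host) if h == host]
        shm_rank = peers.index(rank)      # MPI_Comm_rank on the shared split
        node_size = len(peers)            # MPI_Comm_size on the shared split
        num_nodes = len(leaders)          # MPI_Allreduce over (shm_rank == 0)
        node_id = leaders.index(peers[0]) # leader's rank in the color-0 split,
                                          # then MPI_Bcast over shm_comm
        print(f"rank {rank}: node_id={node_id} num_nodes={num_nodes} "
              f"node_size={node_size} shm_rank={shm_rank}")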
diff --git a/src/vt/utils/json/json_appender.h b/src/vt/utils/json/json_appender.h
index 3caf69f221..0f742b5581 100644
--- a/src/vt/utils/json/json_appender.h
+++ b/src/vt/utils/json/json_appender.h
@@ -69,36 +69,36 @@ struct Appender : BaseAppender {
    * \brief Construct a JSON appender for a specific array with a filename
    *
    * \param[in] array the JSON array name
-   * \param[in] schema_name the JSON schema name
+   * \param[in] metadata the JSON metadata
    * \param[in] filename the JSON filename
    * \param[in] compress whether to compress the output
    */
   Appender(
-    std::string const& array, std::string const& schema_name,
+    std::string const& array, jsonlib const& metadata,
     std::string const& filename, bool compress
-  ) : Appender(array, schema_name, StreamLike{filename}, compress)
+  ) : Appender(array, metadata, StreamLike{filename}, compress)
   { }
 
   /**
    * \brief Construct a JSON appender for a specific array with a stream
    *
    * \param[in] array the JSON array name
-   * \param[in] schema_name the JSON schema name
+   * \param[in] metadata the JSON metadata
    * \param[in] in_os the output stream
    * \param[in] compress whether to compress the output
    */
   Appender(
-    std::string const& array, std::string const& schema_name, StreamLike in_os,
+    std::string const& array, jsonlib const& metadata, StreamLike in_os,
     bool compress
   ) : os_(std::move(in_os)),
       oa_(std::make_shared<OutputAdaptor<StreamLike>>(os_, compress))
   {
-    oa_->write_character('{');
-    auto schema_str = fmt::format("\"type\":\"{}\",", schema_name);
-    oa_->write_characters(schema_str.c_str(), schema_str.length());
-    auto str = fmt::format("\"{}\":[", array);
+    oa_->write_characters("{\"metadata\":", 12);
+    SerializerType s(oa_, ' ', jsonlib::error_handler_t::strict);
+    s.dump(metadata, false, true, 0);
+    auto str = fmt::format(",\"{}\":[", array);
     oa_->write_characters(str.c_str(), str.length());
   }
diff --git a/src/vt/vrt/collection/balance/lb_invoke/lb_manager.cc b/src/vt/vrt/collection/balance/lb_invoke/lb_manager.cc
index d0389d4197..22577b6dcf 100644
--- a/src/vt/vrt/collection/balance/lb_invoke/lb_manager.cc
+++ b/src/vt/vrt/collection/balance/lb_invoke/lb_manager.cc
@@ -759,8 +759,10 @@ void LBManager::createStatisticsFile() {
   using JSONAppender = util::json::Appender<std::ofstream>;
 
   if (not statistics_writer_) {
+    nlohmann::json metadata;
+    metadata["type"] = "LBStatsfile";
     statistics_writer_ = std::make_unique<JSONAppender>(
-      "phases", "LBStatsfile", file_name, compress
+      "phases", metadata, file_name, compress
     );
   }
 }
diff --git a/src/vt/vrt/collection/balance/node_lb_data.cc b/src/vt/vrt/collection/balance/node_lb_data.cc
index bc363f32ae..6a5cd55182 100644
--- a/src/vt/vrt/collection/balance/node_lb_data.cc
+++ b/src/vt/vrt/collection/balance/node_lb_data.cc
@@ -194,8 +194,19 @@ void NodeLBData::createLBDataFile() {
   using JSONAppender = util::json::Appender<std::ofstream>;
 
   if (not lb_data_writer_) {
+    nlohmann::json metadata;
+    if (curRT->has_physical_node_info) {
+      nlohmann::json node_metadata;
+      node_metadata["id"] = curRT->physical_node_id;
+      node_metadata["size"] = curRT->physical_node_size;
+      node_metadata["rank"] = curRT->physical_node_rank;
+      node_metadata["num_nodes"] = curRT->physical_num_nodes;
+      metadata["shared_node"] = node_metadata;
+    }
+    metadata["type"] = "LBDatafile";
+    metadata["rank"] = theContext()->getNode();
     lb_data_writer_ = std::make_unique<JSONAppender>(
-      "phases", "LBDatafile", file_name, compress
+      "phases", metadata, file_name, compress
     );
   }
 }
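Note: with the Appender rework, an emitted file now begins with a metadata object instead of a bare "type" key. A rough Python emulation of the new framing; the frame helper and the sample values are illustrative, not the library's API:

    import json

    # Emulates '{"metadata":' + dump(metadata) + ',"<array>":[' + ... + ']}',
    # the byte layout the reworked C++ Appender writes. Values are made up
    # for a single node with two ranks.
    def frame(array, metadata, elements):
        body = ",".join(json.dumps(e) for e in elements)
        return ('{"metadata":' + json.dumps(metadata)
                + ',"' + array + '":[' + body + "]}")

    metadata = {
        "type": "LBDatafile",
        "rank": 0,
        "shared_node": {"id": 0, "size": 2, "rank": 0, "num_nodes": 1},
    }
    out = frame("phases", metadata, [{"id": 0}])
    assert json.loads(out)["metadata"]["shared_node"]["num_nodes"] == 1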
diff --git a/tests/unit/collection/test_lb.extended.cc b/tests/unit/collection/test_lb.extended.cc
index 409b3a591c..bb92395840 100644
--- a/tests/unit/collection/test_lb.extended.cc
+++ b/tests/unit/collection/test_lb.extended.cc
@@ -499,8 +499,10 @@ getLBDataForPhase(
   using vt::vrt::collection::balance::LBDataHolder;
   using json = nlohmann::json;
   std::stringstream ss{std::ios_base::out | std::ios_base::in};
+  nlohmann::json metadata;
+  metadata["type"] = "LBDatafile";
   auto ap = std::make_unique<JSONAppender>(
-    "phases", "LBDatafile", std::move(ss), false
+    "phases", metadata, std::move(ss), false
   );
   auto j = in.toJson(phase);
   ap->addElm(*j);
@@ -689,8 +691,10 @@ getJsonStringForPhase(
   using vt::vrt::collection::balance::LBDataHolder;
   using JSONAppender = vt::util::json::Appender<std::stringstream>;
   std::stringstream ss{std::ios_base::out | std::ios_base::in};
+  nlohmann::json metadata;
+  metadata["type"] = "LBDatafile";
   auto ap = std::make_unique<JSONAppender>(
-    "phases", "LBDatafile", std::move(ss), false
+    "phases", metadata, std::move(ss), false
   );
   auto j = in.toJson(phase);
   ap->addElm(*j);
diff --git a/tests/unit/lb/test_lb_data_comm.cc b/tests/unit/lb/test_lb_data_comm.cc
index cdea44d670..9fa555fadf 100644
--- a/tests/unit/lb/test_lb_data_comm.cc
+++ b/tests/unit/lb/test_lb_data_comm.cc
@@ -77,8 +77,10 @@ LBDataHolder getLBDataForPhase(vt::PhaseType phase) {
   using vt::vrt::collection::balance::LBDataHolder;
   using json = nlohmann::json;
   std::stringstream ss{std::ios_base::out | std::ios_base::in};
+  nlohmann::json metadata;
+  metadata["type"] = "LBDatafile";
   auto ap = std::make_unique<JSONAppender>(
-    "phases", "LBDatafile", std::move(ss), true
+    "phases", metadata, std::move(ss), true
   );
   auto j = vt::theNodeLBData()->getLBData()->toJson(phase);
   ap->addElm(*j);
diff --git a/tests/unit/lb/test_offlinelb.cc b/tests/unit/lb/test_offlinelb.cc
index b73efd1b7b..981e09d801 100644
--- a/tests/unit/lb/test_offlinelb.cc
+++ b/tests/unit/lb/test_offlinelb.cc
@@ -118,8 +118,10 @@ TEST_F(TestOfflineLB, test_offlinelb_1) {
 
   using JSONAppender = util::json::Appender<std::stringstream>;
   std::stringstream stream{std::ios_base::out | std::ios_base::in};
+  nlohmann::json metadata;
+  metadata["type"] = "LBDatafile";
   auto w = std::make_unique<JSONAppender>(
-    "phases", "LBDatafile", std::move(stream), true
+    "phases", metadata, std::move(stream), true
   );
   for (PhaseType i = 0; i < num_phases; i++) {
     auto j = dh.toJson(i);
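Note: because both 'type' and 'metadata' are Optional in the updated schemas, files written before this change remain valid. A standalone backward-compatibility check against an assumed, reduced subset of the LBDatafile schema, using the same schema package the validator imports:

    from schema import And, Optional, Schema

    # Reduced sketch of the LBDatafile schema: only the keys touched by this
    # change, with 'phases' loosened to a bare list for brevity.
    allowed_types_data = ("LBDatafile",)
    subset = Schema({
        Optional('type'): And(str, lambda a: a in allowed_types_data),
        Optional('metadata'): dict,
        'phases': list,
    })
    subset.validate({"type": "LBDatafile", "phases": []})                # legacy
    subset.validate({"metadata": {"type": "LBDatafile"}, "phases": []})  # new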