Skip to content

Commit

Permalink
Restructure MQTT config distribution (#234)
Browse files Browse the repository at this point in the history
* Remove serialize function from config
* Directly pass the active_modules part of a config to the parse function

Skip storing it in a temporary variable

* Only publish module_names (a mapping of module type to id) once and retain

Previously this was sent to every module individually which isn't necessary

* Publish manifest individually

Add type to parsed_config_map and remove config entry from manifest sent via MQTT

* Fix everestjs config parsing with new config entry format

* Use get_module_name instead of get_module_info if only the module name is required

* Re-order config handler in manager

---------

Signed-off-by: Kai-Uwe Hermann <[email protected]>
  • Loading branch information
hikinggrass authored Jan 31, 2025
1 parent de31429 commit 26e31ac
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 60 deletions.
8 changes: 8 additions & 0 deletions everestjs/conversions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ Everest::json convertToJson(const Napi::Value& value) {
napi_valuetype_strings[value.Type()]));
}

Everest::json convertToConfigMap(const Everest::json& json_config) {
json config_map;
for (auto& entry : json_config.items()) {
config_map[entry.key()] = entry.value().at("value");
}
return config_map;
}

Everest::TelemetryMap convertToTelemetryMap(const Napi::Object& obj) {
BOOST_LOG_FUNCTION();
Everest::TelemetryMap telemetry;
Expand Down
1 change: 1 addition & 0 deletions everestjs/conversions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ static const char* const napi_valuetype_strings[] = {
};

Everest::json convertToJson(const Napi::Value& value);
Everest::json convertToConfigMap(const Everest::json& json_config);
Everest::TelemetryMap convertToTelemetryMap(const Napi::Object& obj);
Napi::Value convertToNapiValue(const Napi::Env& env, const Everest::json& value);

Expand Down
7 changes: 4 additions & 3 deletions everestjs/everestjs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -887,14 +887,15 @@ static Napi::Value boot_module(const Napi::CallbackInfo& info) {
auto module_config_prop = Napi::Object::New(env);
auto module_config_impl_prop = Napi::Object::New(env);

for (auto& config_map : module_config.items()) {
for (const auto& config_map : module_config.items()) {
const auto& json_config_map = convertToConfigMap(config_map.value());
if (config_map.key() == "!module") {
module_config_prop.DefineProperty(Napi::PropertyDescriptor::Value(
"module", convertToNapiValue(env, config_map.value()), napi_enumerable));
"module", convertToNapiValue(env, json_config_map), napi_enumerable));
continue;
}
module_config_impl_prop.DefineProperty(Napi::PropertyDescriptor::Value(
config_map.key(), convertToNapiValue(env, config_map.value()), napi_enumerable));
config_map.key(), convertToNapiValue(env, json_config_map), napi_enumerable));
}
module_config_prop.DefineProperty(
Napi::PropertyDescriptor::Value("impl", module_config_impl_prop, napi_enumerable));
Expand Down
8 changes: 2 additions & 6 deletions include/utils/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,6 @@ class ManagerConfig : public ConfigBase {
/// \brief Create a ManagerConfig from the provided ManagerSettings \p ms
explicit ManagerConfig(const ManagerSettings& ms);

///
/// \brief Serialize the config to json
nlohmann::json serialize();

///
/// \returns a TelemetryConfig if this has been configured for the given \p module_id
std::optional<TelemetryConfig> get_telemetry_config(const std::string& module_id);
Expand Down Expand Up @@ -330,7 +326,7 @@ class Config : public ConfigBase {

///
/// \returns the commands that the modules \p module_name implements from the given implementation \p impl_id
nlohmann::json get_module_cmds(const std::string& module_name, const std::string& impl_id);
const nlohmann::json& get_module_cmds(const std::string& module_name, const std::string& impl_id);

///
/// \brief A RequirementInitialization contains everything needed to initialize a requirement in user code. This
Expand All @@ -344,7 +340,7 @@ class Config : public ConfigBase {

///
/// \returns a json object that contains the module config options
nlohmann::json get_module_json_config(const std::string& module_id);
const nlohmann::json& get_module_json_config(const std::string& module_id);

///
/// \brief assemble basic information about the module (id, name,
Expand Down
24 changes: 9 additions & 15 deletions lib/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ static ParsedConfigMap parse_config_map(const json& config_map_schema, const jso
throw ConfigParseException(ConfigParseException::SCHEMA, config_entry_name, err.what());
}

parsed_config_map[config_entry_name] = config_entry_value;
parsed_config_map[config_entry_name] =
json::object({{"value", config_entry_value}, {"type", config_entry.at("type")}});
}

return {parsed_config_map, unknown_config_entries};
Expand Down Expand Up @@ -1155,18 +1156,13 @@ ManagerConfig::ManagerConfig(const ManagerSettings& ms) : ConfigBase(ms.mqtt_set
complete_config = complete_config.patch(patch);
}

const auto config = complete_config.at("active_modules");
this->settings = this->ms.get_runtime_settings();
this->parse(config);
this->parse(complete_config.at("active_modules"));
} catch (const std::exception& e) {
EVLOG_AND_THROW(EverestConfigError(fmt::format("Failed to load and parse config file: {}", e.what())));
}
}

json ManagerConfig::serialize() {
return json::object({{"main", this->main}, {"module_names", this->module_names}});
}

std::optional<TelemetryConfig> ManagerConfig::get_telemetry_config(const std::string& module_id) {
BOOST_LOG_FUNCTION();

Expand Down Expand Up @@ -1208,7 +1204,7 @@ bool Config::module_provides(const std::string& module_name, const std::string&
return (provides.find(impl_id) != provides.end());
}

json Config::get_module_cmds(const std::string& module_name, const std::string& impl_id) {
const json& Config::get_module_cmds(const std::string& module_name, const std::string& impl_id) {
return this->module_config_cache.at(module_name).cmds.at(impl_id);
}

Expand Down Expand Up @@ -1236,14 +1232,12 @@ ModuleConfigs Config::get_module_configs(const std::string& module_id) const {
const json manifest = this->manifests.at(module_type);

for (const auto& conf_map : config_maps.items()) {
const json config_schema = (conf_map.key() == "!module")
? manifest.at("config")
: manifest.at("provides").at(conf_map.key()).at("config");
ConfigMap processed_conf_map;
for (const auto& entry : conf_map.value().items()) {
const json entry_type = config_schema.at(entry.key()).at("type");
const auto& entry_value = entry.value();
const json entry_type = entry_value.at("type");
ConfigEntry value;
const json& data = entry.value();
const json& data = entry_value.at("value");

if (data.is_string()) {
value = data.get<std::string>();
Expand Down Expand Up @@ -1273,9 +1267,9 @@ ModuleConfigs Config::get_module_configs(const std::string& module_id) const {
}

// FIXME (aw): check if module_id does not exist
json Config::get_module_json_config(const std::string& module_id) {
const json& Config::get_module_json_config(const std::string& module_id) {
BOOST_LOG_FUNCTION();
return this->main[module_id]["config_maps"];
return this->main.at(module_id).at("config_maps");
}

ModuleInfo Config::get_module_info(const std::string& module_id) const {
Expand Down
6 changes: 3 additions & 3 deletions lib/everest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,11 @@ void Everest::heartbeat() {
void Everest::publish_metadata() {
BOOST_LOG_FUNCTION();

const auto module_info = this->config.get_module_info(this->module_id);
const auto manifest = this->config.get_manifests().at(module_info.name);
const auto& module_name = this->config.get_module_name(this->module_id);
const auto& manifest = this->config.get_manifests().at(module_name);

json metadata = json({});
metadata["module"] = module_info.name;
metadata["module"] = module_name;
if (manifest.contains("provides")) {
metadata["provides"] = json({});

Expand Down
12 changes: 11 additions & 1 deletion lib/module_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,18 @@ json get_module_config(std::shared_ptr<MQTTAbstraction> mqtt, const std::string&
const auto schemas = mqtt->get(schemas_topic, QOS::QOS2);
result["schemas"] = schemas;

const auto module_names_topic = fmt::format("{}module_names", everest_prefix);
const auto module_names = mqtt->get(module_names_topic, QOS::QOS2);
result["module_names"] = module_names;

const auto manifests_topic = fmt::format("{}manifests", everest_prefix);
const auto manifests = mqtt->get(manifests_topic, QOS::QOS2);
auto manifests = json::object();
for (const auto& module_name : module_names) {
auto manifest_topic = fmt::format("{}manifests/{}", everest_prefix, module_name.get<std::string>());
auto manifest = mqtt->get(manifest_topic, QOS::QOS2);
manifests[module_name] = manifest;
}

result["manifests"] = manifests;

const auto error_types_map_topic = fmt::format("{}error_types_map", everest_prefix);
Expand Down
48 changes: 28 additions & 20 deletions src/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,11 @@ static std::map<pid_t, std::string> start_modules(ManagerConfig& config, MQTTAbs
std::vector<ModuleStartInfo> modules_to_spawn;

const auto& main_config = config.get_main_config();
const auto& module_names = config.get_module_names();
modules_to_spawn.reserve(main_config.size());
const auto number_of_modules = main_config.size();
EVLOG_info << "Starting " << number_of_modules << " modules";

const auto serialized_config = config.serialize();
const auto interface_definitions = config.get_interface_definitions();
std::vector<std::string> interface_names;
for (auto& interface_definition : interface_definitions.items()) {
Expand Down Expand Up @@ -317,7 +318,13 @@ static std::map<pid_t, std::string> start_modules(ManagerConfig& config, MQTTAbs
mqtt_abstraction.publish(fmt::format("{}schemas", ms.mqtt_settings.everest_prefix), schemas, QOS::QOS2, true);

const auto manifests = config.get_manifests();
mqtt_abstraction.publish(fmt::format("{}manifests", ms.mqtt_settings.everest_prefix), manifests, QOS::QOS2, true);

for (const auto& manifest : manifests.items()) {
auto manifest_copy = manifest.value();
manifest_copy.erase("config");
mqtt_abstraction.publish(fmt::format("{}manifests/{}", ms.mqtt_settings.everest_prefix, manifest.key()),
manifest_copy, QOS::QOS2, true);
}

const auto error_types_map = config.get_error_types();
mqtt_abstraction.publish(fmt::format("{}error_types_map", ms.mqtt_settings.everest_prefix), error_types_map,
Expand All @@ -327,12 +334,16 @@ static std::map<pid_t, std::string> start_modules(ManagerConfig& config, MQTTAbs
mqtt_abstraction.publish(fmt::format("{}module_config_cache", ms.mqtt_settings.everest_prefix), module_config_cache,
QOS::QOS2, true);

for (const auto& module : serialized_config.at("module_names").items()) {
const std::string& module_name = module.key();
json serialized_mod_config = serialized_config;
mqtt_abstraction.publish(fmt::format("{}module_names", ms.mqtt_settings.everest_prefix), module_names, QOS::QOS2,
true);

for (const auto& module_name_entry : module_names) {
const auto& module_name = module_name_entry.first;
const auto& module_type = module_name_entry.second;
json serialized_mod_config = json::object();
serialized_mod_config["module_config"] = json::object();
serialized_mod_config["module_config"][module_name] = main_config.at(module_name);
// add mappings of fulfillments
serialized_mod_config["module_config"][module_name] = serialized_config.at("main").at(module_name);
const auto fulfillments = config.get_fulfillments(module_name);
serialized_mod_config["mappings"] = json::object();
for (const auto& fulfillment_list : fulfillments) {
Expand All @@ -348,7 +359,6 @@ static std::map<pid_t, std::string> start_modules(ManagerConfig& config, MQTTAbs
if (mappings.has_value()) {
serialized_mod_config["mappings"][module_name] = mappings.value();
}
serialized_mod_config.erase("main"); // FIXME: do not put this "main" config in there in the first place
const auto telemetry_config = config.get_telemetry_config(module_name);
if (telemetry_config.has_value()) {
serialized_mod_config["telemetry_config"] = telemetry_config.value();
Expand All @@ -359,11 +369,20 @@ static std::map<pid_t, std::string> start_modules(ManagerConfig& config, MQTTAbs
continue;
}

// FIXME (aw): shall create a ref to main_confit.at(module_name)!
const std::string module_type = main_config.at(module_name).at("module");
// FIXME (aw): implicitely adding ModuleReadyInfo and setting its ready member
auto module_it = modules_ready.emplace(module_name, ModuleReadyInfo{false, nullptr, nullptr}).first;

const std::string config_topic = fmt::format("{}/config", config.mqtt_module_prefix(module_name));
const Handler module_get_config_handler = [module_name, config_topic, serialized_mod_config,
&mqtt_abstraction](const std::string&, const nlohmann::json& json) {
mqtt_abstraction.publish(config_topic, serialized_mod_config.dump(), QOS::QOS2);
};

const std::string get_config_topic = fmt::format("{}/get_config", config.mqtt_module_prefix(module_name));
module_it->second.get_config_token = std::make_shared<TypedHandler>(
HandlerType::ExternalMQTT, std::make_shared<Handler>(module_get_config_handler));
mqtt_abstraction.register_handler(get_config_topic, module_it->second.get_config_token, QOS::QOS2);

const auto capabilities = [&module_config = main_config.at(module_name)]() {
const auto cap_it = module_config.find("capabilities");
if (cap_it == module_config.end()) {
Expand Down Expand Up @@ -427,17 +446,6 @@ static std::map<pid_t, std::string> start_modules(ManagerConfig& config, MQTTAbs
std::make_shared<TypedHandler>(HandlerType::ExternalMQTT, std::make_shared<Handler>(module_ready_handler));
mqtt_abstraction.register_handler(ready_topic, module_it->second.ready_token, QOS::QOS2);

const std::string config_topic = fmt::format("{}/config", config.mqtt_module_prefix(module_name));
const Handler module_get_config_handler = [module_name, config_topic, serialized_mod_config,
&mqtt_abstraction](const std::string&, const nlohmann::json& json) {
mqtt_abstraction.publish(config_topic, serialized_mod_config.dump());
};

const std::string get_config_topic = fmt::format("{}/get_config", config.mqtt_module_prefix(module_name));
module_it->second.get_config_token = std::make_shared<TypedHandler>(
HandlerType::ExternalMQTT, std::make_shared<Handler>(module_get_config_handler));
mqtt_abstraction.register_handler(get_config_topic, module_it->second.get_config_token, QOS::QOS2);

if (std::any_of(standalone_modules.begin(), standalone_modules.end(),
[module_name](const auto& element) { return element == module_name; })) {
EVLOG_info << "Not starting standalone module: " << fmt::format(TERMINAL_STYLE_BLUE, "{}", module_name);
Expand Down
12 changes: 0 additions & 12 deletions tests/test_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,18 +171,6 @@ SCENARIO("Check Config Constructor", "[!throws]") {
}());
}
}
GIVEN("A valid config with a valid module serialized") {
auto ms =
Everest::ManagerSettings(bin_dir + "valid_module_config/", bin_dir + "valid_module_config/config.yaml");
THEN("It should not throw at all") {
CHECK_NOTHROW([&]() {
auto mc = Everest::ManagerConfig(ms);
auto serialized = mc.serialize();
CHECK(serialized.at("module_names").size() == 1);
CHECK(serialized.at("module_names").at("valid_module") == "TESTValidManifest");
}());
}
}
GIVEN("A valid config in legacy json format with a valid module") {
auto ms = Everest::ManagerSettings(bin_dir + "valid_module_config_json/",
bin_dir + "valid_module_config_json/config.json");
Expand Down

0 comments on commit 26e31ac

Please sign in to comment.