Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feature/restructure-m…
Browse files Browse the repository at this point in the history
…qtt-config

Signed-off-by: Kai-Uwe Hermann <[email protected]>
  • Loading branch information
hikinggrass committed Jan 31, 2025
2 parents 26f70b0 + de31429 commit 0d4eb7b
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 37 deletions.
9 changes: 8 additions & 1 deletion everestrs/everestrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,10 +585,17 @@ impl Runtime {

// Subscribe to all variables that might be of interest.
for (implementation_id, requires) in manifest.requires {
let connection = connections.get(&implementation_id).cloned().unwrap_or(0);
let interface_s = self.cpp_module.get_interface(&requires.interface);
// EVerest framework may return null if an interface is not used in
// the config (the connection is then 0).
if interface_s.as_bytes() == b"null" && connection == 0 {
debug!("Skipping the interface {implementation_id}");
continue;
}
let interface: schema::InterfaceFromEverest = interface_s.deserialize();

for i in 0usize..connections.get(&implementation_id).cloned().unwrap_or(0) {
for i in 0usize..connection {
for (name, _) in interface.vars.iter() {
self.cpp_module.as_ref().unwrap().subscribe_variable(
self,
Expand Down
4 changes: 4 additions & 0 deletions include/utils/mqtt_abstraction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class MQTTAbstraction {
/// \copydoc MQTTAbstractionImpl::unsubscribe(const std::string&)
void unsubscribe(const std::string& topic);

///
/// \copydoc MQTTAbstractionImpl::clear_retained_topics()
void clear_retained_topics();

///
/// \copydoc MQTTAbstractionImpl::get(const std::string&, QOS)
nlohmann::json get(const std::string& topic, QOS qos);
Expand Down
6 changes: 6 additions & 0 deletions include/utils/mqtt_abstraction_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ class MQTTAbstractionImpl {
/// \brief unsubscribes from the given \p topic
void unsubscribe(const std::string& topic);

///
/// \brief clears any previously published topics that had the retain flag set
void clear_retained_topics();

///
/// \brief subscribe and wait for value on the subscribed topic
nlohmann::json get(const std::string& topic, QOS qos);
Expand Down Expand Up @@ -121,6 +125,8 @@ class MQTTAbstractionImpl {
MessageQueue message_queue;
std::vector<std::shared_ptr<MessageWithQOS>> messages_before_connected;
std::mutex messages_before_connected_mutex;
std::mutex retained_topics_mutex;
std::vector<std::string> retained_topics;

Thread mqtt_mainloop_thread;
std::shared_future<void> main_loop_future;
Expand Down
5 changes: 5 additions & 0 deletions lib/mqtt_abstraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ void MQTTAbstraction::unsubscribe(const std::string& topic) {
mqtt_abstraction->unsubscribe(topic);
}

void MQTTAbstraction::clear_retained_topics() {
BOOST_LOG_FUNCTION();
mqtt_abstraction->clear_retained_topics();
}

json MQTTAbstraction::get(const std::string& topic, QOS qos) {
BOOST_LOG_FUNCTION();
return mqtt_abstraction->get(topic, qos);
Expand Down
19 changes: 19 additions & 0 deletions lib/mqtt_abstraction_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ void MQTTAbstractionImpl::publish(const std::string& topic, const std::string& d

if (retain) {
publish_flags |= MQTT_PUBLISH_RETAIN;
if (not(data.empty() and qos == QOS::QOS0)) {
// topic should be retained, so save the topic in retained_topics
// do not save the topic when the payload is empty and QOS is set to 0 which means a retained topic is to be
// cleared
const std::lock_guard<std::mutex> lock(retained_topics_mutex);
this->retained_topics.push_back(topic);
}
}

if (!this->mqtt_is_connected) {
Expand Down Expand Up @@ -207,6 +214,18 @@ void MQTTAbstractionImpl::unsubscribe(const std::string& topic) {
notify_write_data();
}

void MQTTAbstractionImpl::clear_retained_topics() {
BOOST_LOG_FUNCTION();
const std::lock_guard<std::mutex> lock(retained_topics_mutex);

for (const auto& retained_topic : retained_topics) {
this->publish(retained_topic, std::string(), QOS::QOS0, true);
EVLOG_verbose << "Cleared retained topic: " << retained_topic;
}

retained_topics.clear();
}

json MQTTAbstractionImpl::get(const std::string& topic, QOS qos) {
BOOST_LOG_FUNCTION();
std::promise<json> res_promise;
Expand Down
55 changes: 19 additions & 36 deletions src/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,39 +265,11 @@ struct ModuleReadyInfo {
std::map<std::string, ModuleReadyInfo> modules_ready;
std::mutex modules_ready_mutex;

void cleanup_retained_topics(ManagerConfig& config, MQTTAbstraction& mqtt_abstraction,
const std::string& mqtt_everest_prefix) {
const auto& interface_definitions = config.get_interface_definitions();

mqtt_abstraction.publish(fmt::format("{}interfaces", mqtt_everest_prefix), std::string(), QOS::QOS2, true);

for (const auto& interface_definition : interface_definitions.items()) {
mqtt_abstraction.publish(
fmt::format("{}interface_definitions/{}", mqtt_everest_prefix, interface_definition.key()), std::string(),
QOS::QOS2, true);
}

mqtt_abstraction.publish(fmt::format("{}types", mqtt_everest_prefix), std::string(), QOS::QOS2, true);

mqtt_abstraction.publish(fmt::format("{}module_provides", mqtt_everest_prefix), std::string(), QOS::QOS2, true);

mqtt_abstraction.publish(fmt::format("{}settings", mqtt_everest_prefix), std::string(), QOS::QOS2, true);

mqtt_abstraction.publish(fmt::format("{}schemas", mqtt_everest_prefix), std::string(), QOS::QOS2, true);

mqtt_abstraction.publish(fmt::format("{}manifests", mqtt_everest_prefix), std::string(), QOS::QOS2, true);

mqtt_abstraction.publish(fmt::format("{}error_types_map", mqtt_everest_prefix), std::string(), QOS::QOS2, true);

mqtt_abstraction.publish(fmt::format("{}module_config_cache", mqtt_everest_prefix), std::string(), QOS::QOS2, true);

mqtt_abstraction.publish(fmt::format("{}module_names", mqtt_everest_prefix), std::string(), QOS::QOS2, true);
}

static std::map<pid_t, std::string> start_modules(ManagerConfig& config, MQTTAbstraction& mqtt_abstraction,
const std::vector<std::string>& ignored_modules,
const std::vector<std::string>& standalone_modules,
const ManagerSettings& ms, StatusFifo& status_fifo) {
const ManagerSettings& ms, StatusFifo& status_fifo,
bool retain_topics) {
BOOST_LOG_FUNCTION();

std::vector<ModuleStartInfo> modules_to_spawn;
Expand Down Expand Up @@ -426,8 +398,8 @@ static std::map<pid_t, std::string> start_modules(ManagerConfig& config, MQTTAbs
}

const Handler module_ready_handler = [module_name, &mqtt_abstraction, &config, standalone_modules,
mqtt_everest_prefix = ms.mqtt_settings.everest_prefix,
&status_fifo](const std::string&, const nlohmann::json& json) {
mqtt_everest_prefix = ms.mqtt_settings.everest_prefix, &status_fifo,
retain_topics](const std::string&, const nlohmann::json& json) {
EVLOG_debug << fmt::format("received module ready signal for module: {}({})", module_name, json.dump());
const std::unique_lock<std::mutex> lock(modules_ready_mutex);
// FIXME (aw): here are race conditions, if the ready handler gets called while modules are shut down!
Expand All @@ -449,11 +421,16 @@ static std::map<pid_t, std::string> start_modules(ManagerConfig& config, MQTTAbs
[](const auto& element) { return element.second.ready; })) {
const auto complete_end_time = std::chrono::system_clock::now();
status_fifo.update(StatusFifo::ALL_MODULES_STARTED);
if (not retain_topics) {
EVLOG_info << "Clearing retained topics published by manager during startup";
mqtt_abstraction.clear_retained_topics();
} else {
EVLOG_info << "Keeping retained topics published by manager during startup for inspection";
}
EVLOG_info << fmt::format(
TERMINAL_STYLE_OK, "🚙🚙🚙 All modules are initialized. EVerest up and running [{}ms] 🚙🚙🚙",
std::chrono::duration_cast<std::chrono::milliseconds>(complete_end_time - complete_start_time)
.count());
// cleanup_retained_topics(config, mqtt_abstraction, mqtt_everest_prefix);
mqtt_abstraction.publish(fmt::format("{}ready", mqtt_everest_prefix), nlohmann::json(true));
} else if (!standalone_modules.empty()) {
if (modules_spawned == modules_ready.size() - standalone_modules.size()) {
Expand Down Expand Up @@ -676,6 +653,8 @@ int boot(const po::variables_map& vm) {
return EXIT_SUCCESS;
}

const bool retain_topics = (vm.count("retain-topics") != 0);

const auto start_time = std::chrono::system_clock::now();
std::unique_ptr<ManagerConfig> config;
try {
Expand Down Expand Up @@ -769,7 +748,7 @@ int boot(const po::variables_map& vm) {
mqtt_abstraction.spawn_main_loop_thread();

auto module_handles =
start_modules(*config, mqtt_abstraction, ignored_modules, standalone_modules, ms, status_fifo);
start_modules(*config, mqtt_abstraction, ignored_modules, standalone_modules, ms, status_fifo, retain_topics);
bool modules_started = true;
bool restart_modules = false;

Expand Down Expand Up @@ -824,6 +803,8 @@ int boot(const po::variables_map& vm) {
shutdown_modules(module_handles, *config, mqtt_abstraction);
modules_started = false;

mqtt_abstraction.clear_retained_topics();

// Exit if a module died, this gives systemd a change to restart manager
EVLOG_critical << "Exiting manager.";
return EXIT_FAILURE;
Expand All @@ -834,8 +815,8 @@ int boot(const po::variables_map& vm) {

#ifdef ENABLE_ADMIN_PANEL
if (module_handles.size() == 0 && restart_modules) {
module_handles =
start_modules(*config, mqtt_abstraction, ignored_modules, standalone_modules, ms, status_fifo);
module_handles = start_modules(*config, mqtt_abstraction, ignored_modules, standalone_modules, ms,
status_fifo, retain_topics);
restart_modules = false;
modules_started = true;
}
Expand Down Expand Up @@ -899,6 +880,8 @@ int main(int argc, char* argv[]) {
"looked up in the default config directory");
desc.add_options()("status-fifo", po::value<std::string>()->default_value(""),
"Path to a named pipe, that shall be used for status updates from the manager");
desc.add_options()("retain-topics", "Retain configuration MQTT topics setup by manager for inspection, by default "
"these will be cleared after startup");

po::variables_map vm;

Expand Down

0 comments on commit 0d4eb7b

Please sign in to comment.