Skip to content

Commit

Permalink
* Added support for Unix Domain Sockers for MQTT communication
Browse files Browse the repository at this point in the history
   - Created a new config option 'mqtt_borker_socket_path' where one can provide the socket path
   - The config option mutual exclusive to the normal TCP socket and will raise an exception if both are provided in the config file (or as env variable)
   - There is no default value for the UDS path, it must be provided in the config file
   - We keep the backward compatibility, thus if nothing is provided in the config file, the default MQTT values are used (localhost:1883)
   - Beside functionality added, this provides an optimization over the TCP connection (boserved on slow systems around 5%)
   - Updated the config schema to for the new option
* Optimization made in the MQTT event loop (we moved from executing the event loop from 100ms to 300s)
   - Normally the keep alive message is send by the MQTT lib, in certain cases however this might not work unless you call mqtt_sync often and regular
   - We decreased the event loop execution frequency drastically (on slow systems we can see a decrease of CPU usage around 10%)
  • Loading branch information
Florin Mihut committed Feb 21, 2024
1 parent 7f532fa commit cb607f7
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 29 deletions.
5 changes: 3 additions & 2 deletions include/framework/everest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ using UnsubscribeToken = std::function<void()>;
class Everest {
public:
Everest(std::string module_id, const Config& config, bool validate_data_with_schema,
const std::string& mqtt_server_address, int mqtt_server_port, const std::string& mqtt_everest_prefix,
const std::string& mqtt_external_prefix, const std::string& telemetry_prefix, bool telemetry_enabled);
const std::string& mqtt_server_socket_path, const std::string& mqtt_server_address, int mqtt_server_port,
const std::string& mqtt_everest_prefix, const std::string& mqtt_external_prefix,
const std::string& telemetry_prefix, bool telemetry_enabled);

// forbid copy assignment and copy construction
// NOTE (aw): move assignment and construction are also not supported because we're creating explicit references to
Expand Down
2 changes: 2 additions & 0 deletions include/framework/runtime.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ inline constexpr auto WWW_DIR = "www";

inline constexpr auto CONTROLLER_PORT = 8849;
inline constexpr auto CONTROLLER_RPC_TIMEOUT_MS = 2000;
inline constexpr auto MQTT_BROKER_SOCKET_PATH = "/tmp/mqtt_brocker.sock";
inline constexpr auto MQTT_BROKER_HOST = "localhost";
inline constexpr auto MQTT_BROKER_PORT = 1883;
inline constexpr auto MQTT_EVEREST_PREFIX = "everest";
Expand Down Expand Up @@ -109,6 +110,7 @@ struct RuntimeSettings {
fs::path www_dir;
int controller_port;
int controller_rpc_timeout_ms;
std::string mqtt_broker_socket_path;

Check notice on line 113 in include/framework/runtime.hpp

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

include/framework/runtime.hpp#L113

struct member 'RuntimeSettings::mqtt_broker_socket_path' is never used.
std::string mqtt_broker_host;
int mqtt_broker_port;
std::string mqtt_everest_prefix;
Expand Down
5 changes: 3 additions & 2 deletions include/utils/mqtt_abstraction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ class MQTTAbstractionImpl;
///
class MQTTAbstraction {
public:
MQTTAbstraction(const std::string& mqtt_server_address, const std::string& mqtt_server_port,
const std::string& mqtt_everest_prefix, const std::string& mqtt_external_prefix);
MQTTAbstraction(const std::string& mqtt_server_socket_path, const std::string& mqtt_server_address,
const std::string& mqtt_server_port, const std::string& mqtt_everest_prefix,
const std::string& mqtt_external_prefix);

// forbid copy assignment and copy construction
MQTTAbstraction(MQTTAbstraction const&) = delete;
Expand Down
7 changes: 6 additions & 1 deletion include/utils/mqtt_abstraction_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class MQTTAbstractionImpl {
public:
MQTTAbstractionImpl(const std::string& mqtt_server_address, const std::string& mqtt_server_port,
const std::string& mqtt_everest_prefix, const std::string& mqtt_external_prefix);
MQTTAbstractionImpl(const std::string& mqtt_server_socket_path, const std::string& mqtt_everest_prefix,
const std::string& mqtt_external_prefix);

~MQTTAbstractionImpl();

MQTTAbstractionImpl(MQTTAbstractionImpl const&) = delete;
Expand Down Expand Up @@ -104,7 +107,7 @@ class MQTTAbstractionImpl {
static void publish_callback(void** unused, struct mqtt_response_publish* published);

private:
static constexpr int mqtt_poll_timeout_ms{100};
static constexpr int mqtt_poll_timeout_ms{300000};

Check notice on line 110 in include/utils/mqtt_abstraction_impl.hpp

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

include/utils/mqtt_abstraction_impl.hpp#L110

class member 'MQTTAbstractionImpl::mqtt_poll_timeout_ms' is never used.
bool mqtt_is_connected;
std::map<std::string, MessageHandler> message_handlers;
std::mutex handlers_mutex;
Expand All @@ -114,6 +117,7 @@ class MQTTAbstractionImpl {

Thread mqtt_mainloop_thread;

std::string mqtt_server_socket_path;

Check notice on line 120 in include/utils/mqtt_abstraction_impl.hpp

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

include/utils/mqtt_abstraction_impl.hpp#L120

class member 'MQTTAbstractionImpl::mqtt_server_socket_path' is never used.
std::string mqtt_server_address;
std::string mqtt_server_port;
std::string mqtt_everest_prefix;
Expand All @@ -123,6 +127,7 @@ class MQTTAbstractionImpl {
uint8_t recvbuf[MQTT_BUF_SIZE];

static int open_nb_socket(const char* addr, const char* port);
bool connectBroker(const char* socket_path);
bool connectBroker(const char* host, const char* port);
void on_mqtt_message(std::shared_ptr<Message> message);
void on_mqtt_connect();
Expand Down
8 changes: 5 additions & 3 deletions lib/everest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ const auto remote_cmd_res_timeout_seconds = 300;
const std::array<std::string, 3> TELEMETRY_RESERVED_KEYS = {{"connector_id"}};

Everest::Everest(std::string module_id_, const Config& config_, bool validate_data_with_schema,
const std::string& mqtt_server_address, int mqtt_server_port, const std::string& mqtt_everest_prefix,
const std::string& mqtt_external_prefix, const std::string& telemetry_prefix, bool telemetry_enabled) :
mqtt_abstraction(mqtt_server_address, std::to_string(mqtt_server_port), mqtt_everest_prefix, mqtt_external_prefix),
const std::string& mqtt_server_socket_path, const std::string& mqtt_server_address,
int mqtt_server_port, const std::string& mqtt_everest_prefix, const std::string& mqtt_external_prefix,
const std::string& telemetry_prefix, bool telemetry_enabled) :
mqtt_abstraction(mqtt_server_socket_path, mqtt_server_address, std::to_string(mqtt_server_port),
mqtt_everest_prefix, mqtt_external_prefix),
config(std::move(config_)),
module_id(std::move(module_id_)),
remote_cmd_res_timeout(remote_cmd_res_timeout_seconds),
Expand Down
14 changes: 10 additions & 4 deletions lib/mqtt_abstraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@
#include <utils/mqtt_abstraction_impl.hpp>

namespace Everest {
MQTTAbstraction::MQTTAbstraction(const std::string& mqtt_server_address, const std::string& mqtt_server_port,
const std::string& mqtt_everest_prefix, const std::string& mqtt_external_prefix) :
mqtt_abstraction(std::make_unique<MQTTAbstractionImpl>(mqtt_server_address, mqtt_server_port, mqtt_everest_prefix,
mqtt_external_prefix)) {
MQTTAbstraction::MQTTAbstraction(const std::string& mqtt_server_socket_path, const std::string& mqtt_server_address,
const std::string& mqtt_server_port, const std::string& mqtt_everest_prefix,
const std::string& mqtt_external_prefix) {
EVLOG_debug << "initialized mqtt_abstraction";
if (mqtt_server_socket_path.empty()) {
mqtt_abstraction = std::make_unique<MQTTAbstractionImpl>(mqtt_server_address, mqtt_server_port,
mqtt_everest_prefix, mqtt_external_prefix);
} else {
mqtt_abstraction =
std::make_unique<MQTTAbstractionImpl>(mqtt_server_socket_path, mqtt_everest_prefix, mqtt_external_prefix);
}
}

MQTTAbstraction::~MQTTAbstraction() = default;
Expand Down
107 changes: 99 additions & 8 deletions lib/mqtt_abstraction_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <poll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/un.h>

#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/split.hpp>
Expand All @@ -23,7 +24,7 @@
#include <utils/mqtt_abstraction_impl.hpp>

namespace Everest {
const auto mqtt_keep_alive = 400;
const auto mqtt_keep_alive = 600;

MessageWithQOS::MessageWithQOS(const std::string& topic, const std::string& payload, QOS qos) :
Message(topic, payload), qos(qos) {
Expand All @@ -49,6 +50,25 @@ MQTTAbstractionImpl::MQTTAbstractionImpl(const std::string& mqtt_server_address,
this->mqtt_client.publish_response_callback_state = &this->message_queue;
}

MQTTAbstractionImpl::MQTTAbstractionImpl(const std::string& mqtt_server_socket_path,
const std::string& mqtt_everest_prefix,
const std::string& mqtt_external_prefix) :
message_queue(([this](std::shared_ptr<Message> message) { this->on_mqtt_message(message); })),
mqtt_server_socket_path(mqtt_server_socket_path),
mqtt_everest_prefix(mqtt_everest_prefix),
mqtt_external_prefix(mqtt_external_prefix),
mqtt_client{},
sendbuf{},
recvbuf{} {
BOOST_LOG_FUNCTION();

EVLOG_debug << "Initializing MQTT abstraction layer...";

this->mqtt_is_connected = false;

this->mqtt_client.publish_response_callback_state = &this->message_queue;
}

MQTTAbstractionImpl::~MQTTAbstractionImpl() {
// FIXME (aw): verify that disconnecting is thread-safe!
if (this->mqtt_is_connected) {
Expand All @@ -60,9 +80,14 @@ MQTTAbstractionImpl::~MQTTAbstractionImpl() {
bool MQTTAbstractionImpl::connect() {
BOOST_LOG_FUNCTION();

EVLOG_debug << fmt::format("Connecting to MQTT broker: {}:{}", this->mqtt_server_address, this->mqtt_server_port);

return connectBroker(this->mqtt_server_address.c_str(), this->mqtt_server_port.c_str());
if (!this->mqtt_server_socket_path.empty()) {
EVLOG_debug << fmt::format("Connecting to MQTT broker: {}", this->mqtt_server_socket_path);
return connectBroker(this->mqtt_server_socket_path.c_str());
} else {
EVLOG_debug << fmt::format("Connecting to MQTT broker: {}:{}", this->mqtt_server_address,
this->mqtt_server_port);
return connectBroker(this->mqtt_server_address.c_str(), this->mqtt_server_port.c_str());
}
}

void MQTTAbstractionImpl::disconnect() {
Expand Down Expand Up @@ -178,12 +203,23 @@ std::future<void> MQTTAbstractionImpl::spawn_main_loop_thread() {
auto retval = ::poll(pollfds, 2, mqtt_poll_timeout_ms);

if (retval >= 0) {
// check for write notification and reset it
if (pollfds[1].revents & POLLIN) {
// FIXME (aw): check for failure
eventfd_read(this->event_fd, &eventfd_buffer);
// data available to send (the notifier writes, we should be ready to read)
if (retval > 0) {
// check for write notification and reset it
if (pollfds[1].revents & POLLIN) {
// FIXME (aw): check for failure
eventfd_read(this->event_fd, &eventfd_buffer);
}
}

if (retval == 0) {
// nothing to send or receive however, need to send the keep alive message
// otherwise the brocker will disconnect the client
// mqtt_sync might send this automatically but ocasionally might miss it
mqtt_ping(&this->mqtt_client);
}

// send and receive messages
MQTTErrors error = mqtt_sync(&this->mqtt_client);
if (error != MQTT_OK) {
EVLOG_error << fmt::format("Error during MQTT sync: {}", mqtt_error_str(error));
Expand All @@ -192,6 +228,8 @@ std::future<void> MQTTAbstractionImpl::spawn_main_loop_thread() {

return;
}
} else {
// probably we got hit by a signal, nothing to do, just reloop
}
}
} catch (boost::exception& e) {
Expand Down Expand Up @@ -377,6 +415,59 @@ void MQTTAbstractionImpl::unregister_handler(const std::string& topic, const Tok
EVLOG_debug << fmt::format("#handler[{}] = {}", topic, handler_count);
}

bool MQTTAbstractionImpl::connectBroker(const char* socket_path) {
BOOST_LOG_FUNCTION();

/* open the non-blocking TCP socket (connecting to the broker) */
mqtt_socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (mqtt_socket_fd == -1) {
EVLOG_error << fmt::format("Failed to open socket: {}", strerror(errno));
return false;
}

// Initialize the address structure
struct sockaddr_un addr;
memset(&addr, 0, sizeof(struct sockaddr_un));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1);

Check notice on line 432 in lib/mqtt_abstraction_impl.cpp

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

lib/mqtt_abstraction_impl.cpp#L432

Easily used incorrectly; doesn't always \0-terminate or check for invalid pointers [MS-banned] (CWE-120).

// make non-blocking
auto retval = fcntl(mqtt_socket_fd, F_SETFL,
fcntl(mqtt_socket_fd, F_GETFL) | O_NONBLOCK); // NOLINT: We have no good alternative to fcntl
if (retval != 0) {
EVLOG_error << fmt::format("Failed to set nonblock for unix domain socket: {}", socket_path);
close(mqtt_socket_fd);
return false;
}

// conect the socket
if (::connect(mqtt_socket_fd, (struct sockaddr*)&addr, sizeof(struct sockaddr_un)) == -1) {

Check notice on line 444 in lib/mqtt_abstraction_impl.cpp

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

lib/mqtt_abstraction_impl.cpp#L444

C-style pointer casting
EVLOG_error << fmt::format("Failed to connect to unix domain socket: {}", socket_path);
close(mqtt_socket_fd);
return false;
}

this->event_fd = eventfd(0, 0);
if (this->event_fd == -1) {
close(this->mqtt_socket_fd);
EVLOG_error << "Could not setup eventfd for mqttc io";
return false;
}

mqtt_init(&this->mqtt_client, mqtt_socket_fd, static_cast<uint8_t*>(this->sendbuf), sizeof(this->sendbuf),
static_cast<uint8_t*>(this->recvbuf), sizeof(this->recvbuf), MQTTAbstractionImpl::publish_callback);
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
/* Send connection request to the broker. */
if (mqtt_connect(&this->mqtt_client, nullptr, nullptr, nullptr, 0, nullptr, nullptr, connect_flags,
mqtt_keep_alive) == MQTT_OK) {
// TODO(kai): async?
on_mqtt_connect();
return true;
}

return false;
}

bool MQTTAbstractionImpl::connectBroker(const char* host, const char* port) {
BOOST_LOG_FUNCTION();

Expand Down
36 changes: 31 additions & 5 deletions lib/runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,24 @@ RuntimeSettings::RuntimeSettings(const std::string& prefix_, const std::string&
controller_rpc_timeout_ms = defaults::CONTROLLER_RPC_TIMEOUT_MS;
}

// Unix Domain Socket configuration MUST be set in the configuration,
// doesn't have a default value if not provided thus it takes precedence
// over default values - this is to have backward compatiblity in term of configuration
// in case both UDS (Unix Domain Sockets) and IDS (Internet Domain Sockets) are set in config, raise exception
const auto settings_mqtt_broker_socket_path = settings.find("mqtt_borker_socket_path");
if (settings_mqtt_broker_socket_path != settings.end()) {
mqtt_broker_socket_path = settings_mqtt_broker_socket_path->get<std::string>();
}

const auto settings_mqtt_broker_host_it = settings.find("mqtt_broker_host");
if (settings_mqtt_broker_host_it != settings.end()) {
mqtt_broker_host = settings_mqtt_broker_host_it->get<std::string>();
if (!mqtt_broker_socket_path.empty()) {
// invalid configuration, can't have both UDS and IDS
throw BootException(
fmt::format("Setting both the Unix Domain Socket {} and Internet Domain Socket {} in config is invalid",
mqtt_broker_socket_path, mqtt_broker_host));
}
} else {
mqtt_broker_host = defaults::MQTT_BROKER_HOST;
}
Expand All @@ -277,6 +292,13 @@ RuntimeSettings::RuntimeSettings(const std::string& prefix_, const std::string&
const char* mqtt_server_address = std::getenv("MQTT_SERVER_ADDRESS");
if (mqtt_server_address != nullptr) {
mqtt_broker_host = mqtt_server_address;
if (!mqtt_broker_socket_path.empty()) {
// invalid configuration, can't have both UDS and IDS
throw BootException(
fmt::format("Setting both the Unix Domain Socket {} and Internet Domain Socket {} in "
"config and as environment variable respectivelly (as MQTT_SERVER_ADDRESS) is not allowed",
mqtt_broker_socket_path, mqtt_broker_host));
}
}

const auto settings_mqtt_broker_port_it = settings.find("mqtt_broker_port");
Expand Down Expand Up @@ -385,16 +407,20 @@ int ModuleLoader::initialize() {
}
Logging::update_process_name(module_identifier);

auto everest =
Everest(this->module_id, config, rs->validate_schema, rs->mqtt_broker_host, rs->mqtt_broker_port,
rs->mqtt_everest_prefix, rs->mqtt_external_prefix, rs->telemetry_prefix, rs->telemetry_enabled);
auto everest = Everest(this->module_id, config, rs->validate_schema, rs->mqtt_broker_socket_path,
rs->mqtt_broker_host, rs->mqtt_broker_port, rs->mqtt_everest_prefix,
rs->mqtt_external_prefix, rs->telemetry_prefix, rs->telemetry_enabled);

// module import
EVLOG_debug << fmt::format("Initializing module {}...", module_identifier);

if (!everest.connect()) {
EVLOG_error << fmt::format("Cannot connect to MQTT broker at {}:{}", rs->mqtt_broker_host,
rs->mqtt_broker_port);
if (rs->mqtt_broker_socket_path.empty()) {
EVLOG_error << fmt::format("Cannot connect to MQTT broker at {}:{}", rs->mqtt_broker_host,
rs->mqtt_broker_port);
} else {
EVLOG_error << fmt::format("Cannot connect to MQTT broker socket at {}", rs->mqtt_broker_socket_path);
}
return 1;
}

Expand Down
2 changes: 2 additions & 0 deletions schemas/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ properties:
type: integer
controller_rpc_timeout_ms:
type: integer
mqtt_borker_socket_path:
type: string
mqtt_broker_host:
type: string
mqtt_broker_port:
Expand Down
13 changes: 9 additions & 4 deletions src/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -588,12 +588,17 @@ int boot(const po::variables_map& vm) {
// create StatusFifo object
auto status_fifo = StatusFifo::create_from_path(vm["status-fifo"].as<std::string>());

auto mqtt_abstraction = MQTTAbstraction(rs->mqtt_broker_host, std::to_string(rs->mqtt_broker_port),
rs->mqtt_everest_prefix, rs->mqtt_external_prefix);
auto mqtt_abstraction =
MQTTAbstraction(rs->mqtt_broker_socket_path, rs->mqtt_broker_host, std::to_string(rs->mqtt_broker_port),
rs->mqtt_everest_prefix, rs->mqtt_external_prefix);

if (!mqtt_abstraction.connect()) {
EVLOG_error << fmt::format("Cannot connect to MQTT broker at {}:{}", rs->mqtt_broker_host,
rs->mqtt_broker_port);
if (rs->mqtt_broker_socket_path.empty()) {
EVLOG_error << fmt::format("Cannot connect to MQTT broker at {}:{}", rs->mqtt_broker_host,
rs->mqtt_broker_port);
} else {
EVLOG_error << fmt::format("Cannot connect to MQTT broker socket at {}", rs->mqtt_broker_socket_path);
}
return EXIT_FAILURE;
}

Expand Down

0 comments on commit cb607f7

Please sign in to comment.