Skip to content

Commit

Permalink
Merge pull request #2461 from mavlink/pr-connection-subscription
Browse files Browse the repository at this point in the history
Add connection error subscription
  • Loading branch information
julianoes authored Dec 9, 2024
2 parents 62fa71a + 52a74fc commit bc0da1e
Show file tree
Hide file tree
Showing 16 changed files with 300 additions and 36 deletions.
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ add_subdirectory(camera_settings)
add_subdirectory(camera_zoom)
add_subdirectory(component_metadata)
add_subdirectory(disconnect)
add_subdirectory(reconnect)
add_subdirectory(fly_mission)
add_subdirectory(fly_multiple_drones)
add_subdirectory(fly_qgc_mission)
Expand Down
22 changes: 22 additions & 0 deletions examples/reconnect/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
cmake_minimum_required(VERSION 3.10.2)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

project(reconnect)

add_executable(reconnect
reconnect.cpp
)

find_package(MAVSDK REQUIRED)

target_link_libraries(reconnect
MAVSDK::mavsdk
)

if(NOT MSVC)
add_compile_options(reconnect PRIVATE -Wall -Wextra)
else()
add_compile_options(reconnect PRIVATE -W2)
endif()
97 changes: 97 additions & 0 deletions examples/reconnect/reconnect.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//
// This is an example how to reconnect a connection on error.
//

#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include <mavsdk/mavsdk.h>

using namespace mavsdk;

void usage(const std::string& bin_name)
{
std::cerr << "Usage : " << bin_name << " <one or more connection urls>\n"
<< "Connection URL format should be :\n"
<< " For TCP : tcp://[server_host][:server_port]\n"
<< " For UDP : udp://[bind_host][:bind_port]\n"
<< " For Serial : serial:///path/to/serial/dev[:baudrate]\n"
<< "For example, to connect to the simulator use URL: udpin://0.0.0.0:14540\n";
}

int main(int argc, char** argv)
{
if (argc < 2) {
usage(argv[0]);
return 1;
}

// Keep track of our connections
struct ConnectionEntry {
std::string url;
Mavsdk::ConnectionHandle handle;
bool connected;
};
std::mutex connections_mutex;
std::vector<ConnectionEntry> connections;

Mavsdk mavsdk{Mavsdk::Configuration{ComponentType::GroundStation}};

// Add all connections.
for (int i = 1; i < argc; ++i) {
ConnectionEntry entry{};
entry.url = argv[i];
entry.connected = false;
connections.push_back(entry);
}

// Subscribe to connection errors.
// When an error happens, remove the connection and mark it as disconnected.
mavsdk.subscribe_connection_errors([&](Mavsdk::ConnectionError connection_error) {
std::cout << "Connection error: " << connection_error.error_description << std::endl;

mavsdk.remove_connection(connection_error.connection_handle);

std::lock_guard lock(connections_mutex);
auto it = std::find_if(connections.begin(), connections.end(), [&](ConnectionEntry& entry) {
return entry.handle == connection_error.connection_handle;
});

assert(it != connections.end());

std::cout << "Removed connection: '" << it->url << "'" << std::endl;
it->connected = false;
});

while (true) {
{
// Got through connections and try to add them.
std::lock_guard lock(connections_mutex);
for (auto& entry : connections) {
if (!entry.connected) {
std::cout << "Try adding connection '" << entry.url << "'" << std::endl;
auto result = mavsdk.add_any_connection_with_handle(entry.url);

if (result.first != ConnectionResult::Success) {
std::cout << "Adding connection '" << entry.url
<< "'failed: " << result.first << std::endl;
} else {
entry.handle = result.second;
entry.connected = true;
}
}
}
}

std::this_thread::sleep_for(std::chrono::seconds(1));
}

return 0;
}
4 changes: 3 additions & 1 deletion src/mavsdk/core/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
#include "mavlink_receiver.h"
#include <atomic>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>

namespace mavsdk {

Expand All @@ -21,7 +23,7 @@ class Connection {
virtual ConnectionResult start() = 0;
virtual ConnectionResult stop() = 0;

virtual bool send_message(const mavlink_message_t& message) = 0;
virtual std::pair<bool, std::string> send_message(const mavlink_message_t& message) = 0;

bool has_system_id(uint8_t system_id);
bool should_forward_messages() const;
Expand Down
38 changes: 38 additions & 0 deletions src/mavsdk/core/include/mavsdk/mavsdk.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,44 @@ class Mavsdk {
*/
void remove_connection(ConnectionHandle handle);

/**
* ConnectionError type
*/
struct ConnectionError {
std::string error_description;
ConnectionHandle connection_handle;
};

/**
* Connection Error callback type
*/
using ConnectionErrorCallback = std::function<void(ConnectionError)>;

/**
* @brief Handle type to remove a connection error subscription.
*/
using ConnectionErrorHandle = Handle<ConnectionError>;

/**
* Subscribe to connection errors.
*
* This will trigger when messages fail to be sent which can help
* diagnosing network interfaces or serial devices disappearing.
*
* Usually, an error will require to remove a connection and add it fresh.
*
* @param callback Callback to subscribe.
* @return Handle to unsubscribe again.
*/
ConnectionErrorHandle subscribe_connection_errors(ConnectionErrorCallback callback);

/**
* Unsubscribe from connection errors.
*
* @param handle Handle to unsubscribe.
*/
void unsubscribe_connection_errors(ConnectionErrorHandle handle);

/**
* @brief Get a vector of systems which have been discovered or set-up.
*
Expand Down
11 changes: 11 additions & 0 deletions src/mavsdk/core/mavsdk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,15 @@ void Mavsdk::intercept_outgoing_messages_async(std::function<bool(mavlink_messag
_impl->intercept_outgoing_messages_async(callback);
}

Mavsdk::ConnectionErrorHandle
Mavsdk::subscribe_connection_errors(Mavsdk::ConnectionErrorCallback callback)
{
return _impl->subscribe_connection_errors(callback);
}

void Mavsdk::unsubscribe_connection_errors(ConnectionErrorHandle handle)
{
return _impl->unsubscribe_connection_errors(handle);
}

} // namespace mavsdk
32 changes: 29 additions & 3 deletions src/mavsdk/core/mavsdk_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <tcp_server_connection.h>

#include "connection.h"
#include "log.h"
#include "tcp_client_connection.h"
#include "tcp_server_connection.h"
#include "udp_connection.h"
Expand Down Expand Up @@ -305,8 +306,13 @@ void MavsdkImpl::forward_message(mavlink_message_t& message, Connection* connect
!entry.connection->should_forward_messages()) {
continue;
}
if ((*entry.connection).send_message(message)) {
auto result = (*entry.connection).send_message(message);
if (result.first) {
successful_emissions++;
} else {
_connections_errors_subscriptions.queue(
Mavsdk::ConnectionError{result.second, entry.handle},
[this](const auto& func) { call_user_callback(func); });
}
}
if (successful_emissions == 0) {
Expand Down Expand Up @@ -473,9 +479,13 @@ bool MavsdkImpl::send_message(mavlink_message_t& message)
if (target_system_id != 0 && !(*_connection.connection).has_system_id(target_system_id)) {
continue;
}

if ((*_connection.connection).send_message(message)) {
const auto result = (*_connection.connection).send_message(message);
if (result.first) {
successful_emissions++;
} else {
_connections_errors_subscriptions.queue(
Mavsdk::ConnectionError{result.second, _connection.handle},
[this](const auto& func) { call_user_callback(func); });
}
}

Expand Down Expand Up @@ -915,6 +925,22 @@ void MavsdkImpl::intercept_outgoing_messages_async(std::function<bool(mavlink_me
_intercept_outgoing_messages_callback = callback;
}

Mavsdk::ConnectionErrorHandle
MavsdkImpl::subscribe_connection_errors(Mavsdk::ConnectionErrorCallback callback)
{
std::lock_guard lock(_connections_mutex);

const auto handle = _connections_errors_subscriptions.subscribe(callback);

return handle;
}

void MavsdkImpl::unsubscribe_connection_errors(Mavsdk::ConnectionErrorHandle handle)
{
std::lock_guard lock(_connections_mutex);
_connections_errors_subscriptions.unsubscribe(handle);
}

uint8_t MavsdkImpl::get_target_system_id(const mavlink_message_t& message)
{
// Checks whether connection knows target system ID by extracting target system if set.
Expand Down
5 changes: 5 additions & 0 deletions src/mavsdk/core/mavsdk_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ class MavsdkImpl {
void intercept_incoming_messages_async(std::function<bool(mavlink_message_t&)> callback);
void intercept_outgoing_messages_async(std::function<bool(mavlink_message_t&)> callback);

Mavsdk::ConnectionErrorHandle
subscribe_connection_errors(Mavsdk::ConnectionErrorCallback callback);
void unsubscribe_connection_errors(Mavsdk::ConnectionErrorHandle handle);

std::shared_ptr<ServerComponent> server_component(unsigned instance = 0);

std::shared_ptr<ServerComponent>
Expand Down Expand Up @@ -128,6 +132,7 @@ class MavsdkImpl {
Handle<> handle;
};
std::vector<ConnectionEntry> _connections{};
CallbackList<Mavsdk::ConnectionError> _connections_errors_subscriptions{};

mutable std::recursive_mutex _systems_mutex{};
std::vector<std::pair<uint8_t, std::shared_ptr<System>>> _systems{};
Expand Down
29 changes: 22 additions & 7 deletions src/mavsdk/core/serial_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <utility>
#endif

#include <sstream>

namespace mavsdk {

#ifndef WINDOWS
Expand Down Expand Up @@ -249,16 +251,21 @@ ConnectionResult SerialConnection::stop()
return ConnectionResult::Success;
}

bool SerialConnection::send_message(const mavlink_message_t& message)
std::pair<bool, std::string> SerialConnection::send_message(const mavlink_message_t& message)
{
std::pair<bool, std::string> result;

if (_serial_node.empty()) {
LogErr() << "Dev Path unknown";
return false;
result.first = false;
result.second = "Dev Path unknown";
return result;
}

if (_baudrate == 0) {
LogErr() << "Baudrate unknown";
return false;
result.first = false;
result.second = "Baudrate unknown";
return result;
}

uint8_t buffer[MAVLINK_MAX_PACKET_LEN];
Expand All @@ -275,11 +282,19 @@ bool SerialConnection::send_message(const mavlink_message_t& message)
#endif

if (send_len != buffer_len) {
LogErr() << "write failure: " << GET_ERROR();
return false;
result.first = false;
result.second = "Baudrate unknown";

std::stringstream ss;
ss << "write failure: " << GET_ERROR();
LogErr() << ss.str();
result.first = false;
result.second = ss.str();
return result;
}

return true;
result.first = true;
return result;
}

void SerialConnection::receive()
Expand Down
2 changes: 1 addition & 1 deletion src/mavsdk/core/serial_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SerialConnection : public Connection {
ConnectionResult stop() override;
~SerialConnection() override;

bool send_message(const mavlink_message_t& message) override;
std::pair<bool, std::string> send_message(const mavlink_message_t& message) override;

// Non-copyable
SerialConnection(const SerialConnection&) = delete;
Expand Down
Loading

0 comments on commit bc0da1e

Please sign in to comment.