From e5d92c5195b32485146cee1da334e1685617f2fb Mon Sep 17 00:00:00 2001 From: Julian Oes Date: Fri, 6 Oct 2023 08:12:26 +1300 Subject: [PATCH] mavlink: fix MAVLink message forwarding This switches from the horribly intertwined ringbuffer implementation to the new VariableLengthRingbuffer implementation. By ditching the previous implementation, we fix MAVLink message forwarding, which didn't work reliably. The reason it didn't work is that multiple mavlink messages could be added but only one of them was sent out because the buffer didn't keep track of the messages properly and only read the first one. Signed-off-by: Julian Oes --- src/modules/mavlink/CMakeLists.txt | 3 +- src/modules/mavlink/mavlink_main.cpp | 184 ++++----------------------- src/modules/mavlink/mavlink_main.h | 32 +---- 3 files changed, 32 insertions(+), 187 deletions(-) diff --git a/src/modules/mavlink/CMakeLists.txt b/src/modules/mavlink/CMakeLists.txt index 45f7737e6e03..8f5f6231af30 100644 --- a/src/modules/mavlink/CMakeLists.txt +++ b/src/modules/mavlink/CMakeLists.txt @@ -1,6 +1,6 @@ ############################################################################ # -# Copyright (c) 2015-2021 PX4 Development Team. All rights reserved. +# Copyright (c) 2015-2023 PX4 Development Team. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -140,6 +140,7 @@ px4_add_module( mavlink_c timesync tunes + variable_length_ringbuffer version UNITY_BUILD ) diff --git a/src/modules/mavlink/mavlink_main.cpp b/src/modules/mavlink/mavlink_main.cpp index 40f6cbf8191b..36aec9ec1dbd 100644 --- a/src/modules/mavlink/mavlink_main.cpp +++ b/src/modules/mavlink/mavlink_main.cpp @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (c) 2012-2021 PX4 Development Team. All rights reserved. + * Copyright (c) 2012-2023 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -178,6 +178,7 @@ Mavlink::~Mavlink() perf_free(_loop_perf); perf_free(_loop_interval_perf); perf_free(_send_byte_error_perf); + perf_free(_forwarding_error_perf); } void @@ -1218,117 +1219,16 @@ Mavlink::configure_stream_threadsafe(const char *stream_name, const float rate) } } -int -Mavlink::message_buffer_init(int size) -{ - _message_buffer.size = size; - _message_buffer.write_ptr = 0; - _message_buffer.read_ptr = 0; - _message_buffer.data = (char *)malloc(_message_buffer.size); - - int ret; - - if (_message_buffer.data == nullptr) { - ret = PX4_ERROR; - _message_buffer.size = 0; - - } else { - ret = OK; - } - - return ret; -} - -void -Mavlink::message_buffer_destroy() -{ - _message_buffer.size = 0; - _message_buffer.write_ptr = 0; - _message_buffer.read_ptr = 0; - free(_message_buffer.data); -} - -int -Mavlink::message_buffer_count() -{ - int n = _message_buffer.write_ptr - _message_buffer.read_ptr; - - if (n < 0) { - n += _message_buffer.size; - } - - return n; -} - -bool -Mavlink::message_buffer_write(const void *ptr, int size) -{ - // bytes available to write - int available = _message_buffer.read_ptr - _message_buffer.write_ptr - 1; - - if (available < 0) { - available += _message_buffer.size; - } - - if (size > available) { - // buffer overflow - return false; - } - - char *c = (char *) ptr; - int n = _message_buffer.size - _message_buffer.write_ptr; // bytes to end of the buffer - - if (n < size) { - // message goes over end of the buffer - memcpy(&(_message_buffer.data[_message_buffer.write_ptr]), c, n); - _message_buffer.write_ptr = 0; - - } else { - n = 0; - } - - // now: n = bytes already written - int p = size - n; // number of bytes to write - memcpy(&(_message_buffer.data[_message_buffer.write_ptr]), &(c[n]), p); - _message_buffer.write_ptr = (_message_buffer.write_ptr + p) % _message_buffer.size; - return true; -} - -int -Mavlink::message_buffer_get_ptr(void **ptr, bool *is_part) -{ - // bytes available to read - int available = _message_buffer.write_ptr - _message_buffer.read_ptr; - - if (available == 0) { - return 0; // buffer is empty - } - - int n = 0; - - if (available > 0) { - // read pointer is before write pointer, all available bytes can be read - n = available; - *is_part = false; - - } else { - // read pointer is after write pointer, read bytes from read_ptr to end of the buffer - n = _message_buffer.size - _message_buffer.read_ptr; - *is_part = _message_buffer.write_ptr > 0; - } - - *ptr = &(_message_buffer.data[_message_buffer.read_ptr]); - return n; -} - void Mavlink::pass_message(const mavlink_message_t *msg) { - /* size is 8 bytes plus variable payload */ + /* size is 12 bytes plus variable payload */ int size = MAVLINK_NUM_NON_PAYLOAD_BYTES + msg->len; - pthread_mutex_lock(&_message_buffer_mutex); - message_buffer_write(msg, size); - pthread_mutex_unlock(&_message_buffer_mutex); + LockGuard lg{_message_buffer_mutex}; + + if (!_message_buffer.push_back(reinterpret_cast(msg), size)) { + perf_count(_forwarding_error_perf); + } } MavlinkShell * @@ -2211,7 +2111,7 @@ Mavlink::task_main(int argc, char *argv[]) return PX4_ERROR; } - /* initialize send mutex */ + pthread_mutex_init(&_message_buffer_mutex, nullptr); pthread_mutex_init(&_send_mutex, nullptr); pthread_mutex_init(&_radio_status_mutex, nullptr); @@ -2221,13 +2121,12 @@ Mavlink::task_main(int argc, char *argv[]) * make space for two messages plus off-by-one space as we use the empty element * marker ring buffer approach. */ - if (OK != message_buffer_init(2 * sizeof(mavlink_message_t) + 1)) { + LockGuard lg{_message_buffer_mutex}; + + if (!_message_buffer.allocate(2 * sizeof(mavlink_message_t) + 1)) { PX4_ERR("msg buf alloc fail"); return PX4_ERROR; } - - /* initialize message buffer mutex */ - pthread_mutex_init(&_message_buffer_mutex, nullptr); } /* Activate sending the data by default (for the IRIDIUM mode it will be disabled after the first round of packages is sent)*/ @@ -2569,50 +2468,21 @@ Mavlink::task_main(int argc, char *argv[]) _events.update(t); - /* pass messages from other UARTs */ + /* pass messages from other instances */ if (get_forwarding_on()) { - bool is_part; - uint8_t *read_ptr; - uint8_t *write_ptr; - - pthread_mutex_lock(&_message_buffer_mutex); - int available = message_buffer_get_ptr((void **)&read_ptr, &is_part); - pthread_mutex_unlock(&_message_buffer_mutex); - - if (available > 0) { - // Reconstruct message from buffer - - mavlink_message_t msg; - write_ptr = (uint8_t *)&msg; - - // Pull a single message from the buffer - size_t read_count = available; - - if (read_count > sizeof(mavlink_message_t)) { - read_count = sizeof(mavlink_message_t); - } - - memcpy(write_ptr, read_ptr, read_count); - - // We hold the mutex until after we complete the second part of the buffer. If we don't - // we may end up breaking the empty slot overflow detection semantics when we mark the - // possibly partial read below. - pthread_mutex_lock(&_message_buffer_mutex); - - message_buffer_mark_read(read_count); - - /* write second part of buffer if there is some */ - if (is_part && read_count < sizeof(mavlink_message_t)) { - write_ptr += read_count; - available = message_buffer_get_ptr((void **)&read_ptr, &is_part); - read_count = sizeof(mavlink_message_t) - read_count; - memcpy(write_ptr, read_ptr, read_count); - message_buffer_mark_read(available); - } - - pthread_mutex_unlock(&_message_buffer_mutex); + mavlink_message_t msg; + size_t available_bytes; + { + // We only send one message at a time, not to put too much strain on a + // link from forwarded messages. + LockGuard lg{_message_buffer_mutex}; + available_bytes = _message_buffer.pop_front(reinterpret_cast(&msg), sizeof(msg)); + // We need to make sure to release the lock here before sending the + // bytes out via IP or UART which could potentially take longer. + } + if (available_bytes > 0) { resend_message(&msg); } } @@ -2662,11 +2532,6 @@ Mavlink::task_main(int argc, char *argv[]) _socket_fd = -1; } - if (get_forwarding_on()) { - message_buffer_destroy(); - pthread_mutex_destroy(&_message_buffer_mutex); - } - if (_mavlink_ulog) { _mavlink_ulog->stop(); _mavlink_ulog = nullptr; @@ -2674,6 +2539,7 @@ Mavlink::task_main(int argc, char *argv[]) pthread_mutex_destroy(&_send_mutex); pthread_mutex_destroy(&_radio_status_mutex); + pthread_mutex_destroy(&_message_buffer_mutex); PX4_INFO("exiting channel %i", (int)_channel); diff --git a/src/modules/mavlink/mavlink_main.h b/src/modules/mavlink/mavlink_main.h index 85c0361de3ad..1ba71c6a2ba1 100644 --- a/src/modules/mavlink/mavlink_main.h +++ b/src/modules/mavlink/mavlink_main.h @@ -1,6 +1,6 @@ /**************************************************************************** * - * Copyright (c) 2012-2018 PX4 Development Team. All rights reserved. + * Copyright (c) 2012-2023 PX4 Development Team. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -60,6 +60,7 @@ #include #include +#include #include #include #include @@ -419,11 +420,6 @@ class Mavlink final : public ModuleParams bool get_wait_to_transmit() { return _wait_to_transmit; } bool should_transmit() { return (_transmitting_enabled && (!_wait_to_transmit || (_wait_to_transmit && _received_messages))); } - bool message_buffer_write(const void *ptr, int size); - - void lockMessageBufferMutex(void) { pthread_mutex_lock(&_message_buffer_mutex); } - void unlockMessageBufferMutex(void) { pthread_mutex_unlock(&_message_buffer_mutex); } - /** * Count transmitted bytes */ @@ -651,16 +647,9 @@ class Mavlink final : public ModuleParams ping_statistics_s _ping_stats {}; - struct mavlink_message_buffer { - int write_ptr; - int read_ptr; - int size; - char *data; - }; + pthread_mutex_t _message_buffer_mutex{}; + VariableLengthRingbuffer _message_buffer{}; - mavlink_message_buffer _message_buffer {}; - - pthread_mutex_t _message_buffer_mutex {}; pthread_mutex_t _send_mutex {}; pthread_mutex_t _radio_status_mutex {}; @@ -682,6 +671,7 @@ class Mavlink final : public ModuleParams perf_counter_t _loop_perf{perf_alloc(PC_ELAPSED, MODULE_NAME": tx run elapsed")}; /**< loop performance counter */ perf_counter_t _loop_interval_perf{perf_alloc(PC_INTERVAL, MODULE_NAME": tx run interval")}; /**< loop interval performance counter */ perf_counter_t _send_byte_error_perf{perf_alloc(PC_COUNT, MODULE_NAME": send_bytes error")}; /**< send bytes error count */ + perf_counter_t _forwarding_error_perf{perf_alloc(PC_COUNT, MODULE_NAME": forwarding error")}; /**< forwarding messages error count */ void mavlink_update_parameters(); @@ -711,18 +701,6 @@ class Mavlink final : public ModuleParams */ int configure_streams_to_default(const char *configure_single_stream = nullptr); - int message_buffer_init(int size); - - void message_buffer_destroy(); - - int message_buffer_count(); - - int message_buffer_is_empty() const { return (_message_buffer.read_ptr == _message_buffer.write_ptr); } - - int message_buffer_get_ptr(void **ptr, bool *is_part); - - void message_buffer_mark_read(int n) { _message_buffer.read_ptr = (_message_buffer.read_ptr + n) % _message_buffer.size; } - void pass_message(const mavlink_message_t *msg); void publish_telemetry_status();