Skip to content

Commit

Permalink
plugins: add log streaming (#2280)
Browse files Browse the repository at this point in the history
* plugins: add log streaming
* core: wait long enough after IN_PROGRESS
* log_streaming: additional length checks
* log_streaming: use active flag to track state
* log_streaming: add missing include
* log_streaming: prevent double stopping
* log_streaming: write what we have anyway
  • Loading branch information
julianoes authored Apr 25, 2024
1 parent 7b9655e commit e61c341
Show file tree
Hide file tree
Showing 24 changed files with 5,499 additions and 6 deletions.
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ add_subdirectory(geofence)
add_subdirectory(gimbal)
add_subdirectory(gimbal_device_tester)
add_subdirectory(logfile_download)
add_subdirectory(log_streaming)
add_subdirectory(manual_control)
add_subdirectory(mavshell)
add_subdirectory(multiple_drones)
Expand Down
22 changes: 22 additions & 0 deletions examples/log_streaming/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
cmake_minimum_required(VERSION 3.10.2)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

project(log_streaming)

add_executable(log_streaming
log_streaming.cpp
)

find_package(MAVSDK REQUIRED)

target_link_libraries(log_streaming
MAVSDK::mavsdk
)

if(NOT MSVC)
add_compile_options(log_streaming PRIVATE -Wall -Wextra)
else()
add_compile_options(log_streaming PRIVATE -WX -W2)
endif()
130 changes: 130 additions & 0 deletions examples/log_streaming/log_streaming.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
//
// Example how to stream ulog from PX4 with MAVSDK.
//

#include <mavsdk/mavsdk.h>
#include <mavsdk/base64.h>
#include <mavsdk/plugins/log_streaming/log_streaming.h>

#include <chrono>
#include <iostream>
#include <thread>
#include <fstream>
#include <iomanip>
#include <sstream>

using namespace mavsdk;
using std::chrono::seconds;
using std::this_thread::sleep_for;

void usage(const std::string& bin_name)
{
std::cerr << "Usage : " << bin_name << " <connection_url> [--rm]\n"
<< '\n'
<< "Connection URL format should be :\n"
<< " For TCP : tcp://[server_host][:server_port]\n"
<< " For UDP : udp://[bind_host][:bind_port]\n"
<< " For Serial : serial:///path/to/serial/dev[:baudrate]\n"
<< "For example, to connect to the simulator use URL: udp://:14540" << std::endl;
}

int main(int argc, char** argv)
{
if (argc > 2) {
usage(argv[0]);
return 1;
}

Mavsdk mavsdk{Mavsdk::Configuration{Mavsdk::ComponentType::GroundStation}};
ConnectionResult connection_result = mavsdk.add_any_connection(argv[1]);

if (connection_result != ConnectionResult::Success) {
std::cerr << "Connection failed: " << connection_result << std::endl;
return 1;
}

auto system = mavsdk.first_autopilot(3.0);
if (!system) {
std::cerr << "Timed out waiting for system\n";
return 1;
}

// Create file to log to.
// Get current time
auto now = std::chrono::system_clock::now();
auto in_time_t = std::chrono::system_clock::to_time_t(now);

// Convert time_t to tm struct
struct tm buf;
localtime_r(&in_time_t, &buf);

// Create a stringstream to hold the filename
std::stringstream ss;

// Format the time into the stringstream
ss << std::put_time(&buf, "%Y-%m-%d_%H-%M-%S") << ".ulg";

// Convert stringstream to string to use as filename
std::string filename = ss.str();

// Open the file
std::ofstream file(filename, std::ios::binary);

if (!file.is_open()) {
std::cerr << "Could not open file with name '" << ss.str() << "'";
return 1;
}

// Instantiate plugins.
auto log_streaming = LogStreaming{system.value()};

size_t bytes_written = 0;
size_t bytes_written_since_last = 0;
auto last_time = std::chrono::steady_clock::now();

log_streaming.subscribe_log_streaming_raw([&](LogStreaming::LogStreamingRaw raw) {
const auto bytes = mavsdk::base64_decode(raw.data);
file.write(reinterpret_cast<const char*>(bytes.data()), bytes.size());
bytes_written += bytes.size();
bytes_written_since_last += bytes.size();

auto now = std::chrono::steady_clock::now();
std::chrono::duration<double> elapsed = now - last_time;

if (elapsed.count() >= 1.0) {
// More than or equal to one second has passed
double seconds = elapsed.count();
double rate_kib_per_second = (bytes_written_since_last / 1024.0) / seconds;

std::cout << "Wrote " << std::setprecision(3) << bytes_written / 1024.0 / 1024.0
<< " MiB (" << rate_kib_per_second << " KiB/s)" << std::endl;

// Reset timer and counter
last_time = now;
bytes_written_since_last = 0;
}
});

auto result = log_streaming.start_log_streaming();
if (result != LogStreaming::Result::Success) {
std::cerr << "Log streaming start failed." << std::endl;
return 1;
}

for (unsigned i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}

result = log_streaming.stop_log_streaming();
if (result != LogStreaming::Result::Success) {
std::cerr << "Log streaming stop failed." << std::endl;
return 1;
}

// Give it time to wrap up all logging data
std::this_thread::sleep_for(std::chrono::seconds(3));

file.close();

return 0;
}
2 changes: 1 addition & 1 deletion proto
2 changes: 2 additions & 0 deletions src/mavsdk/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ configure_file(include/mavsdk/mavlink_include.h.in include/mavsdk/mavlink_includ

target_sources(mavsdk
PRIVATE
base64.cpp
call_every_handler.cpp
connection.cpp
connection_result.cpp
Expand Down Expand Up @@ -150,6 +151,7 @@ install(TARGETS mavsdk

install(FILES
include/mavsdk/autopilot.h
include/mavsdk/base64.h
include/mavsdk/connection_result.h
include/mavsdk/deprecated.h
include/mavsdk/handle.h
Expand Down
102 changes: 102 additions & 0 deletions src/mavsdk/core/base64.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#include "base64.h"

namespace mavsdk {

static const std::string base64_chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"0123456789+/";

static inline bool is_base64(uint8_t c)
{
return (isalnum(c) || (c == '+') || (c == '/'));
}

std::string base64_encode(std::vector<uint8_t>& raw)
{
uint8_t* buf = raw.data();
size_t bufLen = raw.size();

std::string ret;
int i = 0;
int j = 0;
uint8_t char_array_3[3];
uint8_t char_array_4[4];

while (bufLen--) {
char_array_3[i++] = *(buf++);
if (i == 3) {
char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
char_array_4[3] = char_array_3[2] & 0x3f;

for (i = 0; (i < 4); i++)
ret += base64_chars[char_array_4[i]];
i = 0;
}
}

if (i) {
for (j = i; j < 3; j++)
char_array_3[j] = '\0';

char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
char_array_4[3] = char_array_3[2] & 0x3f;

for (j = 0; (j < i + 1); j++)
ret += base64_chars[char_array_4[j]];

while ((i++ < 3))
ret += '=';
}

return ret;
}

std::vector<uint8_t> base64_decode(std::string const& encoded_string)
{
int in_len = encoded_string.size();
int i = 0;
int j = 0;
int in_ = 0;
uint8_t char_array_4[4], char_array_3[3];
std::vector<uint8_t> ret;

while (in_len-- && (encoded_string[in_] != '=') && is_base64(encoded_string[in_])) {
char_array_4[i++] = encoded_string[in_];
in_++;
if (i == 4) {
for (i = 0; i < 4; i++)
char_array_4[i] = base64_chars.find(char_array_4[i]);

char_array_3[0] = (char_array_4[0] << 2) + ((char_array_4[1] & 0x30) >> 4);
char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2);
char_array_3[2] = ((char_array_4[2] & 0x3) << 6) + char_array_4[3];

for (i = 0; (i < 3); i++)
ret.push_back(char_array_3[i]);
i = 0;
}
}

if (i) {
for (j = i; j < 4; j++)
char_array_4[j] = 0;

for (j = 0; j < 4; j++)
char_array_4[j] = base64_chars.find(char_array_4[j]);

char_array_3[0] = (char_array_4[0] << 2) + ((char_array_4[1] & 0x30) >> 4);
char_array_3[1] = ((char_array_4[1] & 0xf) << 4) + ((char_array_4[2] & 0x3c) >> 2);
char_array_3[2] = ((char_array_4[2] & 0x3) << 6) + char_array_4[3];

for (j = 0; (j < i - 1); j++)
ret.push_back(char_array_3[j]);
}

return ret;
}

} // namespace mavsdk
2 changes: 1 addition & 1 deletion src/mavsdk/core/callback_list_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ template<typename... Args> class CallbackListImpl {
{
std::lock_guard<std::mutex> remove_later_lock(_remove_later_mutex);

// We could probably just grab the lock here but it's safer not to
// We could probably just grab the lock here, but it's safer not to
// acquire both locks to avoid deadlocks.
if (_mutex.try_lock()) {
if (_remove_all_later) {
Expand Down
12 changes: 12 additions & 0 deletions src/mavsdk/core/include/mavsdk/base64.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#include <cstdint>
#include <vector>
#include <string>

// Taken from https://stackoverflow.com/a/13935718/8548472

namespace mavsdk {

std::string base64_encode(std::vector<uint8_t>& raw);
std::vector<uint8_t> base64_decode(const std::string& str);

} // namespace mavsdk
5 changes: 5 additions & 0 deletions src/mavsdk/core/log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ namespace mavsdk {
static std::mutex callback_mutex_{};
static log::Callback callback_{nullptr};

std::ostream& operator<<(std::ostream& os, std::byte b)
{
return os << std::bitset<8>(std::to_integer<int>(b));
}

log::Callback& log::get_callback()
{
std::lock_guard<std::mutex> lock(callback_mutex_);
Expand Down
4 changes: 4 additions & 0 deletions src/mavsdk/core/log.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <bitset>
#include <cstddef>
#include <mutex>
#include <sstream>
#include "log_callback.h"
Expand Down Expand Up @@ -30,6 +32,8 @@ namespace mavsdk {

static std::mutex log_mutex_{};

std::ostream& operator<<(std::ostream& os, std::byte b);

enum class Color { Red, Green, Yellow, Blue, Gray, Reset };

void set_color(Color color);
Expand Down
6 changes: 2 additions & 4 deletions src/mavsdk/core/mavlink_command_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,13 @@ void MavlinkCommandSender::receive_command_ack(mavlink_message_t message)
}
// If we get a progress update, we can raise the timeout
// to something higher because we know the initial command
// has arrived. A possible timeout for this case is the initial
// timeout * the possible retries because this should match the
// case where there is no progress update, and we keep trying.
// has arrived.
_system_impl.unregister_timeout_handler(work->timeout_cookie);
_system_impl.register_timeout_handler(
[this, identification = work->identification] {
receive_timeout(identification);
},
work->retries_to_do * work->timeout_s,
3.0,
&work->timeout_cookie);

temp_result = {
Expand Down
15 changes: 15 additions & 0 deletions src/mavsdk/plugins/log_streaming/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
target_sources(mavsdk
PRIVATE
log_streaming.cpp
log_streaming_impl.cpp
)

target_include_directories(mavsdk PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include/mavsdk>
)

install(FILES
include/plugins/log_streaming/log_streaming.h
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/mavsdk/plugins/log_streaming
)
Loading

0 comments on commit e61c341

Please sign in to comment.