Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upstream quality changes from Apex.AI part 1 #1903

Merged
merged 11 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ class Arguments
std::for_each(
arguments_.begin(), arguments_.end(),
[this](const std::string & arg) {
pointers_.push_back(const_cast<char *>(arg.c_str()));
pointers_.push_back(arg.c_str());
}
);
pointers_.push_back(nullptr);
}

char ** argv()
const char ** argv()
{
return arguments_.empty() ? nullptr : pointers_.data();
}
Expand All @@ -65,7 +65,7 @@ class Arguments

private:
std::vector<std::string> arguments_;
std::vector<char *> pointers_;
std::vector<const char *> pointers_;
};

rclcpp::QoS qos_from_handle(const py::handle source)
Expand Down Expand Up @@ -283,7 +283,7 @@ class Player
player->play();

auto wait_for_exit_thread = std::thread(
[&]() {
[this, player]() {
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Player::exit_.load();});
player->stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@ class McapStorageTestFixture : public rosbag2_test_common::TemporaryDirectoryFix
auto serialized_msg = std::make_shared<rclcpp::SerializedMessage>();
serialization.serialize_message(&std_string_msg, serialized_msg.get());

auto ret = std::make_shared<rcutils_uint8_array_t>();
*ret = serialized_msg->release_rcl_serialized_message();
return ret;
auto msg = new rcutils_uint8_array_t;
*msg = serialized_msg->release_rcl_serialized_message();
return std::shared_ptr<rcutils_uint8_array_t>(msg, [](rmw_serialized_message_t * msg) {
EXPECT_EQ(rmw_serialized_message_fini(msg), RMW_RET_OK);
delete msg;
});
}

std::shared_ptr<rosbag2_storage::storage_interfaces::ReadWriteInterface> write_messages_to_mcap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef ROSBAG2_TEST_COMMON__MEMORY_MANAGEMENT_HPP_
#define ROSBAG2_TEST_COMMON__MEMORY_MANAGEMENT_HPP_

#include <gtest/gtest.h>

#include <memory>
#include <string>

Expand All @@ -32,16 +34,18 @@ class MemoryManagement

template<typename T>
inline
std::shared_ptr<rmw_serialized_message_t>
serialize_message(std::shared_ptr<T> message)
std::shared_ptr<rcutils_uint8_array_t> serialize_message(std::shared_ptr<T> message)
{
rclcpp::Serialization<T> ser;
rclcpp::SerializedMessage serialized_message;
ser.serialize_message(message.get(), &serialized_message);
auto serialized_msg = std::make_shared<rclcpp::SerializedMessage>();
ser.serialize_message(message.get(), serialized_msg.get());

auto ret = std::make_shared<rmw_serialized_message_t>();
*ret = serialized_message.release_rcl_serialized_message();
return ret;
auto msg = new rcutils_uint8_array_t;
*msg = serialized_msg->release_rcl_serialized_message();
return std::shared_ptr<rcutils_uint8_array_t>(msg, [](rmw_serialized_message_t * msg) {
EXPECT_EQ(rmw_serialized_message_fini(msg), RMW_RET_OK);
delete msg;
});
}

template<typename T>
Expand All @@ -58,10 +62,13 @@ class MemoryManagement

std::shared_ptr<rmw_serialized_message_t> make_initialized_message()
{
rclcpp::SerializedMessage serialized_message(0u);
auto ret = std::make_shared<rmw_serialized_message_t>();
*ret = serialized_message.release_rcl_serialized_message();
return ret;
rclcpp::SerializedMessage serialized_msg(0u);
auto msg = new rcutils_uint8_array_t;
*msg = serialized_msg.release_rcl_serialized_message();
return std::shared_ptr<rcutils_uint8_array_t>(msg, [](rmw_serialized_message_t * msg) {
EXPECT_EQ(rmw_serialized_message_fini(msg), RMW_RET_OK);
delete msg;
});
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class InfoEndToEndTestFixture : public Test, public WithParamInterface<std::stri
InfoEndToEndTestFixture()
{
// _SRC_RESOURCES_DIR_PATH defined in CMakeLists.txt
bags_path_ = (fs::path(_SRC_RESOURCES_DIR_PATH) / GetParam()).generic_string();
bags_path_ = fs::absolute(fs::path(_SRC_RESOURCES_DIR_PATH) / GetParam()).generic_string();
}

std::string bags_path_;
Expand Down
13 changes: 8 additions & 5 deletions rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ class Player : public rclcpp::Node
ROSBAG2_TRANSPORT_PUBLIC
bool play();

/// \brief Waits on the condition variable until the play thread starts and the message's queue
/// will be filled.
/// @param timeout Maximum time in the fraction of seconds to wait for player to start.
/// If timeout is negative, the wait_for_playback_to_start will be a blocking call.
/// @return true if playback successfully started during timeout, otherwise false.
ROSBAG2_TRANSPORT_PUBLIC
bool wait_for_playback_to_start(std::chrono::duration<double> timeout = std::chrono::seconds(-1));

/// \brief Waits on the condition variable until the play thread finishes.
/// @param timeout Maximum time in the fraction of seconds to wait for player to finish.
/// If timeout is negative, the wait_for_playback_to_finish will be a blocking call.
Expand Down Expand Up @@ -336,11 +344,6 @@ class Player : public rclcpp::Node
ROSBAG2_TRANSPORT_PUBLIC
rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr get_clock_publisher();

/// \brief Blocks and wait on condition variable until first message will be taken from read
/// queue
ROSBAG2_TRANSPORT_PUBLIC
void wait_for_playback_to_start();

/// \brief Getter for the number of registered on_play_msg_pre_callbacks
/// \return Number of registered on_play_msg_pre_callbacks
ROSBAG2_TRANSPORT_PUBLIC
Expand Down
77 changes: 50 additions & 27 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,11 @@ class PlayerImpl
/// \return Shared pointer to the inner clock_publisher
rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr get_clock_publisher();

/// \brief Blocks and wait on condition variable until first message will be taken from read
/// queue
void wait_for_playback_to_start();
/// \brief Waits on the condition variable until first message will be taken from read queue
/// @param timeout Maximum time in the fraction of seconds to wait for player to start.
christophebedard marked this conversation as resolved.
Show resolved Hide resolved
/// If timeout is negative, the wait_for_playback_to_start will be a blocking call.
/// @return true if playback successfully started during timeout, otherwise false.
bool wait_for_playback_to_start(std::chrono::duration<double> timeout = std::chrono::seconds(-1));

/// \brief Waits on the condition variable until the play thread finishes.
/// @param timeout Maximum time in the fraction of seconds to wait for player to finish.
Expand Down Expand Up @@ -374,10 +376,10 @@ class PlayerImpl

std::shared_ptr<PlayerServiceClientManager> player_service_client_manager_;

BagMessageComparator get_bag_message_comparator(const MessageOrder & order);
static BagMessageComparator get_bag_message_comparator(const MessageOrder & order);

/// Comparator for SerializedBagMessageSharedPtr to order chronologically by recv_timestamp.
struct
static inline const struct
{
bool operator()(
const rosbag2_storage::SerializedBagMessageSharedPtr & l,
Expand All @@ -388,7 +390,7 @@ class PlayerImpl
} bag_message_chronological_recv_timestamp_comparator;

/// Comparator for SerializedBagMessageSharedPtr to order chronologically by send_timestamp.
struct
static inline const struct
{
bool operator()(
const rosbag2_storage::SerializedBagMessageSharedPtr & l,
Expand Down Expand Up @@ -586,7 +588,7 @@ bool PlayerImpl::play()
ready_to_play_from_queue_cv_.notify_all();
}
} while (rclcpp::ok() && !stop_playback_ && play_options_.loop);
} catch (std::runtime_error & e) {
} catch (const std::exception & e) {
RCLCPP_ERROR(owner_->get_logger(), "Failed to play: %s", e.what());
load_storage_content_ = false;
if (storage_loading_future_.valid()) {storage_loading_future_.get();}
Expand Down Expand Up @@ -917,10 +919,17 @@ rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr PlayerImpl::get_clock_pu
return clock_publisher_;
}

void PlayerImpl::wait_for_playback_to_start()
bool PlayerImpl::wait_for_playback_to_start(std::chrono::duration<double> timeout)
{
std::unique_lock<std::mutex> lk(ready_to_play_from_queue_mutex_);
ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;});
if (timeout.count() < 0) {
ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;});
return true;
} else {
return ready_to_play_from_queue_cv_.wait_for(
lk, timeout, [this] {return is_ready_to_play_from_queue_;}
);
}
}

size_t PlayerImpl::get_number_of_registered_on_play_msg_pre_callbacks()
Expand Down Expand Up @@ -1045,14 +1054,11 @@ void PlayerImpl::play_messages_from_queue()
// If we tried to publish because of play_next(), jump the clock
if (play_next_.load()) {
clock_->jump(get_message_order_timestamp(message_ptr));
// If we successfully played next, notify that we're done, otherwise keep trying
if (message_published) {
play_next_ = false;
std::lock_guard<std::mutex> lk(finished_play_next_mutex_);
finished_play_next_ = true;
play_next_result_ = true;
finished_play_next_cv_.notify_all();
}
play_next_ = false;
std::lock_guard<std::mutex> lk(finished_play_next_mutex_);
finished_play_next_ = true;
play_next_result_ = message_published;
finished_play_next_cv_.notify_all();
}
}
message_ptr = take_next_message_from_queue();
Expand Down Expand Up @@ -1357,20 +1363,37 @@ bool PlayerImpl::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr
{
auto pub_iter = publishers_.find(message->topic_name);
if (pub_iter != publishers_.end()) {
// Calling on play message pre-callbacks
run_play_msg_pre_callbacks(message);
bool message_published = false;
bool pre_callbacks_failed = true;
try {
pub_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data));
message_published = true;
// Calling on play message pre-callbacks
run_play_msg_pre_callbacks(message);
christophebedard marked this conversation as resolved.
Show resolved Hide resolved
pre_callbacks_failed = false;
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(
owner_->get_logger(), "Failed to publish message on '" << message->topic_name <<
RCLCPP_ERROR_STREAM(owner_->get_logger(),
"Failed to call on play message pre-callback on '" << message->topic_name <<
"' topic. \nError: " << e.what());
}

if (!pre_callbacks_failed) {
try {
pub_iter->second->publish(rclcpp::SerializedMessage(*message->serialized_data));
message_published = true;
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(owner_->get_logger(),
"Failed to publish message on '" << message->topic_name <<
"' topic. \nError: " << e.what());
}
}

// Calling on play message post-callbacks
run_play_msg_post_callbacks(message);
try {
// Calling on play message post-callbacks
run_play_msg_post_callbacks(message);
} catch (const std::exception & e) {
RCLCPP_ERROR_STREAM(owner_->get_logger(),
"Failed to call on play message post-callback on '" << message->topic_name <<
"' topic. \nError: " << e.what());
}
return message_published;
}

Expand Down Expand Up @@ -1897,9 +1920,9 @@ rclcpp::Publisher<rosgraph_msgs::msg::Clock>::SharedPtr Player::get_clock_publis
return pimpl_->get_clock_publisher();
}

void Player::wait_for_playback_to_start()
bool Player::wait_for_playback_to_start(std::chrono::duration<double> timeout)
{
pimpl_->wait_for_playback_to_start();
return pimpl_->wait_for_playback_to_start(timeout);
}

size_t Player::get_number_of_registered_on_play_msg_pre_callbacks()
Expand Down
4 changes: 4 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -267,6 +268,7 @@ void RecorderImpl::record()
throw std::runtime_error("No serialization format specified!");
}

subscriptions_.clear();
christophebedard marked this conversation as resolved.
Show resolved Hide resolved
writer_->open(
storage_options_,
{rmw_get_serialization_format(), record_options_.rmw_serialization_format});
Expand Down Expand Up @@ -460,6 +462,8 @@ void RecorderImpl::stop_discovery()
"discovery_future_.wait_for(" << record_options_.topic_polling_interval.count() <<
") return status: " <<
(status == std::future_status::timeout ? "timeout" : "deferred"));
} else {
discovery_future_.get();
}
}
} else {
Expand Down
78 changes: 78 additions & 0 deletions rosbag2_transport/test/rosbag2_transport/test_play_next.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,81 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_only_filtered_topics) {
// All we care is that any messages arrived
EXPECT_THAT(replayed_topic2, SizeIs(Eq(3u)));
}

TEST_F(RosBag2PlayTestFixture, play_next_calls_pre_callback_only_once_if_fail) {
auto primitive_message = get_messages_basic_types()[0];
primitive_message->int32_value = 42;

auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
{1u, "topic1", "test_msgs/msg/BasicTypes", "", {}, ""}};

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages =
{
serialize_test_message("topic1", 2100, primitive_message),
serialize_test_message("topic1", 3300, primitive_message),
serialize_test_message("topic1", 4600, primitive_message),
serialize_test_message("topic1", 5900, primitive_message)
};

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
auto reader = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));
size_t callbacks_counter = 0;
{
auto player = std::make_shared<MockPlayer>(std::move(reader), storage_options_, play_options_);
player->pause();
ASSERT_TRUE(player->is_paused());

player->play();
player->wait_for_playback_to_start();
ASSERT_TRUE(player->is_paused());
auto callback = [&callbacks_counter](std::shared_ptr<rosbag2_storage::SerializedBagMessage>) {
callbacks_counter++;
throw std::runtime_error("Throw exception from callback");
};
player->add_on_play_message_pre_callback(callback);
ASSERT_FALSE(player->play_next());
}
ASSERT_EQ(callbacks_counter, 1);
}

TEST_F(RosBag2PlayTestFixture, play_next_returns_false_if_pre_callback_throw_exception) {
auto primitive_message = get_messages_basic_types()[0];
primitive_message->int32_value = 42;

auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
{1u, "topic1", "test_msgs/msg/BasicTypes", "", {}, ""}};

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages =
{
serialize_test_message("topic1", 2100, primitive_message),
serialize_test_message("topic1", 3300, primitive_message),
serialize_test_message("topic1", 4600, primitive_message),
serialize_test_message("topic1", 5900, primitive_message)
};

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
auto reader = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));
auto player = std::make_shared<MockPlayer>(std::move(reader), storage_options_, play_options_);
player->pause();

ASSERT_TRUE(player->is_paused());
ASSERT_FALSE(player->play_next());

player->play();
player->wait_for_playback_to_start();
ASSERT_TRUE(player->is_paused());
ASSERT_TRUE(player->play_next());
auto callback_with_exception = [](std::shared_ptr<rosbag2_storage::SerializedBagMessage>) {
throw std::runtime_error("Throw exception from callback");
};
auto pre_callback_handle = player->add_on_play_message_pre_callback(callback_with_exception);
ASSERT_FALSE(player->play_next());
player->delete_on_play_message_callback(pre_callback_handle);

auto post_callback_handle = player->add_on_play_message_post_callback(callback_with_exception);
// play_next shall return true if post_callback fails, because message was published.
ASSERT_TRUE(player->play_next());
player->delete_on_play_message_callback(post_callback_handle);
}
Loading