From c3d6984cb8f1388de1cf3ff1e921032db9113e11 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Wed, 30 Dec 2020 07:54:25 +0800 Subject: [PATCH] Fixed playing if unknown message types exist (#592) 1. play a specific known message type even if some unknown types exist. 2. add a warning message while a message type library not exist. Signed-off-by: Chen Lihui Signed-off-by: Emerson Knapp --- .../src/rosbag2_transport/player.cpp | 30 ++- .../mock_sequential_reader.hpp | 2 +- .../test/rosbag2_transport/test_play.cpp | 175 ++++++++++++++++++ 3 files changed, 201 insertions(+), 6 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 4309e46053..8cf8c962c4 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -14,6 +14,7 @@ #include "player.hpp" +#include #include #include #include @@ -183,7 +184,10 @@ void Player::play_messages_until_queue_empty(const PlayOptions & options) start_time_ + std::chrono::duration_cast( 1.0 / rate * message.time_since_start)); if (rclcpp::ok()) { - publishers_[message.message->topic_name]->publish(message.message->serialized_data); + auto publisher_iter = publishers_.find(message.message->topic_name); + if (publisher_iter != publishers_.end()) { + publisher_iter->second->publish(message.message->serialized_data); + } } } } @@ -196,11 +200,27 @@ void Player::prepare_publishers(const PlayOptions & options) auto topics = reader_->get_all_topics_and_types(); for (const auto & topic : topics) { + // filter topics to add publishers if necessary + auto & filter_topics = storage_filter.topics; + if (!filter_topics.empty()) { + auto iter = std::find(filter_topics.begin(), filter_topics.end(), topic.name); + if (iter == filter_topics.end()) { + continue; + } + } + auto topic_qos = publisher_qos_for_topic(topic, topic_qos_profile_overrides_); - publishers_.insert( - std::make_pair( - topic.name, rosbag2_transport_->create_generic_publisher( - topic.name, topic.type, topic_qos))); + try { + publishers_.insert( + std::make_pair( + topic.name, rosbag2_transport_->create_generic_publisher( + topic.name, topic.type, topic_qos))); + } catch (const std::runtime_error & e) { + // using a warning log seems better than adding a new option + // to ignore some unknown message type library + ROSBAG2_TRANSPORT_LOG_WARN( + "Ignoring a topic '%s', reason: %s.", topic.name.c_str(), e.what()); + } } } diff --git a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp index 9b1d81a9c6..bf738fa46c 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp @@ -44,7 +44,7 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn while (num_read_ < messages_.size()) { for (const auto & filter_topic : filter_.topics) { - if (!messages_[num_read_ + 1]->topic_name.compare(filter_topic)) { + if (!messages_[num_read_]->topic_name.compare(filter_topic)) { return true; } } diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index 58e47a4e36..6b1652e8dc 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -106,6 +106,75 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) ElementsAre(40.0f, 2.0f, 0.0f))))); } +TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_with_unknown_type) +{ + auto primitive_message1 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 42; + + auto complex_message1 = get_messages_arrays()[0]; + complex_message1->float32_values = {{40.0f, 2.0f, 0.0f}}; + complex_message1->bool_values = {{true, false, true}}; + + auto unknown_message1 = get_messages_basic_types()[0]; + unknown_message1->int32_value = 42; + + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}, + {"topic2", "test_msgs/Arrays", "", ""}, + {"topic3", "unknown_msgs/UnknownType", "", ""}, + }; + + std::vector> messages = + {serialize_test_message("topic1", 500, primitive_message1), + serialize_test_message("topic1", 700, primitive_message1), + serialize_test_message("topic1", 900, primitive_message1), + serialize_test_message("topic2", 550, complex_message1), + serialize_test_message("topic2", 750, complex_message1), + serialize_test_message("topic2", 950, complex_message1), + serialize_test_message("topic3", 900, unknown_message1)}; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + reader_ = std::make_unique(std::move(prepared_mock_reader)); + + // Due to a problem related to the subscriber, we play many (3) messages but make the subscriber + // node spin only until 2 have arrived. Hence the 2 as `launch_subscriber()` argument. + sub_->add_subscription("/topic1", 2); + sub_->add_subscription("/topic2", 2); + + auto await_received_messages = sub_->spin_subscriptions(); + + Rosbag2Transport rosbag2_transport(reader_, writer_, info_); + rosbag2_transport.play(storage_options_, play_options_); + + await_received_messages.get(); + + auto replayed_test_primitives = sub_->get_received_messages( + "/topic1"); + EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(2u))); + EXPECT_THAT( + replayed_test_primitives, + Each(Pointee(Field(&test_msgs::msg::BasicTypes::int32_value, 42)))); + + auto replayed_test_arrays = sub_->get_received_messages( + "/topic2"); + EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(2u))); + EXPECT_THAT( + replayed_test_arrays, + Each( + Pointee( + Field( + &test_msgs::msg::Arrays::bool_values, + ElementsAre(true, false, true))))); + EXPECT_THAT( + replayed_test_arrays, + Each( + Pointee( + Field( + &test_msgs::msg::Arrays::float32_values, + ElementsAre(40.0f, 2.0f, 0.0f))))); +} + TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics) { auto primitive_message1 = get_messages_basic_types()[0]; @@ -202,6 +271,112 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics) EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(2u))); } +TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_with_unknown_type) +{ + auto primitive_message1 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 42; + + auto complex_message1 = get_messages_arrays()[0]; + complex_message1->float32_values = {{40.0f, 2.0f, 0.0f}}; + complex_message1->bool_values = {{true, false, true}}; + + auto unknown_message1 = get_messages_basic_types()[0]; + unknown_message1->int32_value = 42; + + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}, + {"topic2", "test_msgs/Arrays", "", ""}, + {"topic3", "unknown_msgs/UnknownType", "", ""}, + }; + + std::vector> messages = + {serialize_test_message("topic1", 500, primitive_message1), + serialize_test_message("topic1", 700, primitive_message1), + serialize_test_message("topic1", 900, primitive_message1), + serialize_test_message("topic2", 550, complex_message1), + serialize_test_message("topic2", 750, complex_message1), + serialize_test_message("topic2", 950, complex_message1), + serialize_test_message("topic3", 900, unknown_message1)}; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + reader_ = std::make_unique(std::move(prepared_mock_reader)); + + // Due to a problem related to the subscriber, we play many (3) messages but make the subscriber + // node spin only until 2 have arrived. Hence the 2 as `launch_subscriber()` argument. + + sub_->add_subscription("/topic2", 2); + + auto await_received_messages = sub_->spin_subscriptions(); + + Rosbag2Transport rosbag2_transport(reader_, writer_, info_); + play_options_.topics_to_filter = {"topic2"}; + rosbag2_transport.play(storage_options_, play_options_); + + await_received_messages.get(); + + auto replayed_test_primitives = sub_->get_received_messages( + "/topic1"); + EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(0u))); + + auto replayed_test_arrays = sub_->get_received_messages( + "/topic2"); + EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(2u))); + + // Set new filter + auto prepared_mock_reader2 = std::make_unique(); + prepared_mock_reader2->prepare(messages, topic_types); + reader_.reset(); + reader_ = std::make_unique(std::move(prepared_mock_reader2)); + + sub_.reset(); + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", 2); + + await_received_messages = sub_->spin_subscriptions(); + + rosbag2_transport = Rosbag2Transport(reader_, writer_, info_); + play_options_.topics_to_filter = {"topic1"}; + rosbag2_transport.play(storage_options_, play_options_); + + await_received_messages.get(); + + replayed_test_primitives = sub_->get_received_messages( + "/topic1"); + EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(2u))); + + replayed_test_arrays = sub_->get_received_messages( + "/topic2"); + EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(0u))); + + // Reset filter + auto prepared_mock_reader3 = std::make_unique(); + prepared_mock_reader3->prepare(messages, topic_types); + reader_.reset(); + reader_ = std::make_unique(std::move(prepared_mock_reader3)); + + sub_.reset(); + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", 2); + sub_->add_subscription("/topic2", 2); + + await_received_messages = sub_->spin_subscriptions(); + + rosbag2_transport = Rosbag2Transport(reader_, writer_, info_); + play_options_.topics_to_filter = {"topic1", "topic2"}; + rosbag2_transport.play(storage_options_, play_options_); + + await_received_messages.get(); + + replayed_test_primitives = sub_->get_received_messages( + "/topic1"); + EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(2u))); + + replayed_test_arrays = sub_->get_received_messages( + "/topic2"); + EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(2u))); +} + class RosBag2PlayQosOverrideTestFixture : public RosBag2PlayTestFixture { public: