Skip to content

Commit

Permalink
Fixed playing if unknown message types exist (#592)
Browse files Browse the repository at this point in the history
1. play a specific known message type even if some unknown types exist.
2. add a warning message while a message type library not exist.

Signed-off-by: Chen Lihui <[email protected]>
  • Loading branch information
Chen Lihui authored and Emerson Knapp committed Feb 17, 2021
1 parent 4c2e74c commit 1549af4
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 6 deletions.
30 changes: 25 additions & 5 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "player.hpp"

#include <algorithm>
#include <chrono>
#include <memory>
#include <queue>
Expand Down Expand Up @@ -183,7 +184,10 @@ void Player::play_messages_until_queue_empty(const PlayOptions & options)
start_time_ + std::chrono::duration_cast<std::chrono::nanoseconds>(
1.0 / rate * message.time_since_start));
if (rclcpp::ok()) {
publishers_[message.message->topic_name]->publish(message.message->serialized_data);
auto publisher_iter = publishers_.find(message.message->topic_name);
if (publisher_iter != publishers_.end()) {
publisher_iter->second->publish(message.message->serialized_data);
}
}
}
}
Expand All @@ -196,11 +200,27 @@ void Player::prepare_publishers(const PlayOptions & options)

auto topics = reader_->get_all_topics_and_types();
for (const auto & topic : topics) {
// filter topics to add publishers if necessary
auto & filter_topics = storage_filter.topics;
if (!filter_topics.empty()) {
auto iter = std::find(filter_topics.begin(), filter_topics.end(), topic.name);
if (iter == filter_topics.end()) {
continue;
}
}

auto topic_qos = publisher_qos_for_topic(topic, topic_qos_profile_overrides_);
publishers_.insert(
std::make_pair(
topic.name, rosbag2_transport_->create_generic_publisher(
topic.name, topic.type, topic_qos)));
try {
publishers_.insert(
std::make_pair(
topic.name, rosbag2_transport_->create_generic_publisher(
topic.name, topic.type, topic_qos)));
} catch (const std::runtime_error & e) {
// using a warning log seems better than adding a new option
// to ignore some unknown message type library
ROSBAG2_TRANSPORT_LOG_WARN(
"Ignoring a topic '%s', reason: %s.", topic.name.c_str(), e.what());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn

while (num_read_ < messages_.size()) {
for (const auto & filter_topic : filter_.topics) {
if (!messages_[num_read_ + 1]->topic_name.compare(filter_topic)) {
if (!messages_[num_read_]->topic_name.compare(filter_topic)) {
return true;
}
}
Expand Down
175 changes: 175 additions & 0 deletions rosbag2_transport/test/rosbag2_transport/test_play.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,75 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics)
ElementsAre(40.0f, 2.0f, 0.0f)))));
}

TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_with_unknown_type)
{
auto primitive_message1 = get_messages_basic_types()[0];
primitive_message1->int32_value = 42;

auto complex_message1 = get_messages_arrays()[0];
complex_message1->float32_values = {{40.0f, 2.0f, 0.0f}};
complex_message1->bool_values = {{true, false, true}};

auto unknown_message1 = get_messages_basic_types()[0];
unknown_message1->int32_value = 42;

auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
{"topic1", "test_msgs/BasicTypes", "", ""},
{"topic2", "test_msgs/Arrays", "", ""},
{"topic3", "unknown_msgs/UnknownType", "", ""},
};

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages =
{serialize_test_message("topic1", 500, primitive_message1),
serialize_test_message("topic1", 700, primitive_message1),
serialize_test_message("topic1", 900, primitive_message1),
serialize_test_message("topic2", 550, complex_message1),
serialize_test_message("topic2", 750, complex_message1),
serialize_test_message("topic2", 950, complex_message1),
serialize_test_message("topic3", 900, unknown_message1)};

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));

// Due to a problem related to the subscriber, we play many (3) messages but make the subscriber
// node spin only until 2 have arrived. Hence the 2 as `launch_subscriber()` argument.
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic1", 2);
sub_->add_subscription<test_msgs::msg::Arrays>("/topic2", 2);

auto await_received_messages = sub_->spin_subscriptions();

Rosbag2Transport rosbag2_transport(reader_, writer_, info_);
rosbag2_transport.play(storage_options_, play_options_);

await_received_messages.get();

auto replayed_test_primitives = sub_->get_received_messages<test_msgs::msg::BasicTypes>(
"/topic1");
EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(2u)));
EXPECT_THAT(
replayed_test_primitives,
Each(Pointee(Field(&test_msgs::msg::BasicTypes::int32_value, 42))));

auto replayed_test_arrays = sub_->get_received_messages<test_msgs::msg::Arrays>(
"/topic2");
EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(2u)));
EXPECT_THAT(
replayed_test_arrays,
Each(
Pointee(
Field(
&test_msgs::msg::Arrays::bool_values,
ElementsAre(true, false, true)))));
EXPECT_THAT(
replayed_test_arrays,
Each(
Pointee(
Field(
&test_msgs::msg::Arrays::float32_values,
ElementsAre(40.0f, 2.0f, 0.0f)))));
}

TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics)
{
auto primitive_message1 = get_messages_basic_types()[0];
Expand Down Expand Up @@ -202,6 +271,112 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics)
EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(2u)));
}

TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_with_unknown_type)
{
auto primitive_message1 = get_messages_basic_types()[0];
primitive_message1->int32_value = 42;

auto complex_message1 = get_messages_arrays()[0];
complex_message1->float32_values = {{40.0f, 2.0f, 0.0f}};
complex_message1->bool_values = {{true, false, true}};

auto unknown_message1 = get_messages_basic_types()[0];
unknown_message1->int32_value = 42;

auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
{"topic1", "test_msgs/BasicTypes", "", ""},
{"topic2", "test_msgs/Arrays", "", ""},
{"topic3", "unknown_msgs/UnknownType", "", ""},
};

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages =
{serialize_test_message("topic1", 500, primitive_message1),
serialize_test_message("topic1", 700, primitive_message1),
serialize_test_message("topic1", 900, primitive_message1),
serialize_test_message("topic2", 550, complex_message1),
serialize_test_message("topic2", 750, complex_message1),
serialize_test_message("topic2", 950, complex_message1),
serialize_test_message("topic3", 900, unknown_message1)};

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));

// Due to a problem related to the subscriber, we play many (3) messages but make the subscriber
// node spin only until 2 have arrived. Hence the 2 as `launch_subscriber()` argument.

sub_->add_subscription<test_msgs::msg::Arrays>("/topic2", 2);

auto await_received_messages = sub_->spin_subscriptions();

Rosbag2Transport rosbag2_transport(reader_, writer_, info_);
play_options_.topics_to_filter = {"topic2"};
rosbag2_transport.play(storage_options_, play_options_);

await_received_messages.get();

auto replayed_test_primitives = sub_->get_received_messages<test_msgs::msg::BasicTypes>(
"/topic1");
EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(0u)));

auto replayed_test_arrays = sub_->get_received_messages<test_msgs::msg::Arrays>(
"/topic2");
EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(2u)));

// Set new filter
auto prepared_mock_reader2 = std::make_unique<MockSequentialReader>();
prepared_mock_reader2->prepare(messages, topic_types);
reader_.reset();
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader2));

sub_.reset();
sub_ = std::make_shared<SubscriptionManager>();
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic1", 2);

await_received_messages = sub_->spin_subscriptions();

rosbag2_transport = Rosbag2Transport(reader_, writer_, info_);
play_options_.topics_to_filter = {"topic1"};
rosbag2_transport.play(storage_options_, play_options_);

await_received_messages.get();

replayed_test_primitives = sub_->get_received_messages<test_msgs::msg::BasicTypes>(
"/topic1");
EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(2u)));

replayed_test_arrays = sub_->get_received_messages<test_msgs::msg::Arrays>(
"/topic2");
EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(0u)));

// Reset filter
auto prepared_mock_reader3 = std::make_unique<MockSequentialReader>();
prepared_mock_reader3->prepare(messages, topic_types);
reader_.reset();
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader3));

sub_.reset();
sub_ = std::make_shared<SubscriptionManager>();
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic1", 2);
sub_->add_subscription<test_msgs::msg::Arrays>("/topic2", 2);

await_received_messages = sub_->spin_subscriptions();

rosbag2_transport = Rosbag2Transport(reader_, writer_, info_);
play_options_.topics_to_filter = {"topic1", "topic2"};
rosbag2_transport.play(storage_options_, play_options_);

await_received_messages.get();

replayed_test_primitives = sub_->get_received_messages<test_msgs::msg::BasicTypes>(
"/topic1");
EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(2u)));

replayed_test_arrays = sub_->get_received_messages<test_msgs::msg::Arrays>(
"/topic2");
EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(2u)));
}

class RosBag2PlayQosOverrideTestFixture : public RosBag2PlayTestFixture
{
public:
Expand Down

0 comments on commit 1549af4

Please sign in to comment.