Skip to content

Commit

Permalink
Play next N messages (ros2#7)
Browse files Browse the repository at this point in the history
* Updates the play_next API to play N messages.

- Makes tests pass with the previous functionality (just one message).
- No change to the ROS interface.

* Changes PlayNext.srv interface definition and modifies the player accordingly.

* Adds tests for play_next(N).

* Update rosbag2_transport/src/rosbag2_transport/player.cpp

* Apply suggestions from code review

Co-authored-by: Geoffrey Biggs <[email protected]>

* Adds clarification string

Co-authored-by: Geoffrey Biggs <[email protected]>
  • Loading branch information
agalbachicar and gbiggs authored Dec 21, 2021
1 parent 8fd9d6f commit 5a41c51
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 16 deletions.
1 change: 1 addition & 0 deletions rosbag2_interfaces/srv/PlayNext.srv
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
uint64 num_messages 1 # backwards compatible, default value is 1
---
bool success # can only play-next while playback is paused
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,17 @@ class Player : public rclcpp::Node
bool set_rate(double);

/// \brief Playing next message from queue when in pause.
/// \param num_messages The number of messages to play from the queue. It is
/// nullopt by default which makes it just take one. When zero, it'll just
/// make the method return true.
/// \details This is blocking call and it will wait until next available message will be
/// published or rclcpp context shut down.
/// \note If internal player queue is starving and storage has not been completely loaded,
/// this method will wait until new element will be pushed to the queue.
/// \return true if Player::play() has been started, player in pause mode and successfully
/// played next message, otherwise false.
ROSBAG2_TRANSPORT_PUBLIC
bool play_next();
bool play_next(const std::optional<uint64_t> num_messages = std::nullopt);

protected:
std::atomic<bool> playing_messages_from_queue_{false};
Expand Down
51 changes: 36 additions & 15 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,10 @@ Player::Player(
"~/play_next",
[this](
const std::shared_ptr<rmw_request_id_t>/* request_header */,
const std::shared_ptr<rosbag2_interfaces_backport::srv::PlayNext::Request>/* request */,
const std::shared_ptr<rosbag2_interfaces_backport::srv::PlayNext::Request> request,
const std::shared_ptr<rosbag2_interfaces_backport::srv::PlayNext::Response> response)
{
response->success = play_next();
response->success = play_next({request->num_messages});
});
srv_play_for_ = create_service<rosbag2_interfaces_backport::srv::PlayFor>(
"~/play_for",
Expand Down Expand Up @@ -257,7 +257,9 @@ void Player::do_play(
// This is a faulty condition. This method must be called exclusively with
// one or none of the attributes set, but not both.
if (duration.has_value() && timestamp.has_value()) {
RCLCPP_ERROR(this->get_logger(), "Failed to play. Both duration and 'until' timestamp are set.");
RCLCPP_ERROR(
this->get_logger(),
"Failed to play. Both duration and 'until' timestamp are set.");
return;
}

Expand Down Expand Up @@ -357,27 +359,46 @@ rosbag2_storage::SerializedBagMessageSharedPtr * Player::peek_next_message_from_
return message_ptr;
}

bool Player::play_next()
bool Player::play_next(const std::optional<uint64_t> num_messages)
{
// Evaluates the escape condition in which no message is required to be published.
if (num_messages.has_value() && *num_messages == 0) {
return true;
}

// Temporary take over playback from play_messages_from_queue()
std::lock_guard<std::mutex> lk(skip_message_in_main_play_loop_mutex_);

if (!clock_->is_paused() || !is_in_play_) {
return false;
}

skip_message_in_main_play_loop_ = true;
rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue();

bool next_message_published = false;
while (message_ptr != nullptr && !next_message_published) {
{
rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr;
next_message_published = publish_message(message);
clock_->jump(message->time_stamp);
bool next_message_published{false};
uint64_t message_countdown = num_messages.has_value() ? *num_messages : 1u;

while (message_countdown > 0) {
// Resets state for the new iteration.
next_message_published = false;
skip_message_in_main_play_loop_ = true;
// Grabs a message and tries to publish it. When there is no publisher with
// the message->topic_name, it just peeks the next one.
rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue();
while (message_ptr != nullptr && !next_message_published) {
{
rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr;
next_message_published = publish_message(message);
clock_->jump(message->time_stamp);
}
message_queue_.pop();
message_ptr = peek_next_message_from_queue();
}
// There are no more messages of the built publishers to play.
if (!next_message_published) {
break;
}
message_queue_.pop();
message_ptr = peek_next_message_from_queue();
// next_message_published becomes true when the message is effectively
// published (i.e. there is a matching publisher for that message topic).
message_countdown--;
}
return next_message_published;
}
Expand Down
107 changes: 107 additions & 0 deletions rosbag2_transport/test/rosbag2_transport/test_play_next.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,69 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_one_by_one_messages_with_the_sa
EXPECT_THAT(replayed_topic1, SizeIs(messages.size()));
}

TEST_F(RosBag2PlayTestFixture, play_next_n_messages_with_the_same_timestamp) {
auto primitive_message = get_messages_basic_types()[0];
primitive_message->int32_value = 42;

auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
{"topic1", "test_msgs/BasicTypes", "", ""}};

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages =
{
serialize_test_message("topic1", 1000, primitive_message),
serialize_test_message("topic1", 1000, primitive_message),
serialize_test_message("topic1", 1000, primitive_message),
serialize_test_message("topic1", 1000, primitive_message)
};

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
auto reader = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));
auto player = std::make_shared<MockPlayer>(std::move(reader), storage_options_, play_options_);

sub_ = std::make_shared<SubscriptionManager>();
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic1", messages.size());

// Wait for discovery to match publishers with subscribers
ASSERT_TRUE(
sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30)));

auto await_received_messages = sub_->spin_subscriptions();

player->pause();
ASSERT_TRUE(player->is_paused());

auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();});
player->wait_for_playback_to_start();

ASSERT_TRUE(player->is_paused());
ASSERT_TRUE(player->play_next({1u}));
// Yield CPU resources so messages are actually sent through.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(1u));

ASSERT_TRUE(player->is_paused());
ASSERT_TRUE(player->play_next({2u}));
// Yield CPU resources so messages are actually sent through.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(3u));

ASSERT_TRUE(player->is_paused());
ASSERT_TRUE(player->play_next({1u}));
// Yield CPU resources so messages are actually sent through.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(4u));

ASSERT_TRUE(player->is_paused());
ASSERT_FALSE(player->play_next({1u}));
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(4u));

ASSERT_TRUE(player->is_paused());
player->resume();
player_future.get();
await_received_messages.get();
}

TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_play_next) {
auto primitive_message = get_messages_basic_types()[0];
primitive_message->int32_value = 42;
Expand Down Expand Up @@ -357,3 +420,47 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_only_filtered_topics) {
// All we care is that any messages arrived
EXPECT_THAT(replayed_topic2, SizeIs(Eq(3u)));
}

TEST_F(RosBag2PlayTestFixture, play_next_no_message_must_succeed) {
auto primitive_message = get_messages_basic_types()[0];
primitive_message->int32_value = 42;

auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
{"topic1", "test_msgs/BasicTypes", "", ""}};

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages =
{
serialize_test_message("topic1", 1000, primitive_message)
};

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
auto reader = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));
auto player = std::make_shared<MockPlayer>(std::move(reader), storage_options_, play_options_);

sub_ = std::make_shared<SubscriptionManager>();
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic1", messages.size());

// Wait for discovery to match publishers with subscribers
ASSERT_TRUE(
sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30)));

auto await_received_messages = sub_->spin_subscriptions();

player->pause();
ASSERT_TRUE(player->is_paused());

auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();});
player->wait_for_playback_to_start();

ASSERT_TRUE(player->is_paused());
ASSERT_TRUE(player->play_next({0u}));
// Yield CPU resources so messages are actually sent through.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(0u));

ASSERT_TRUE(player->is_paused());
player->resume();
player_future.get();
await_received_messages.get();
}

0 comments on commit 5a41c51

Please sign in to comment.