diff --git a/ros2bag/ros2bag/verb/record.py b/ros2bag/ros2bag/verb/record.py index f83ace48d..f6a400206 100644 --- a/ros2bag/ros2bag/verb/record.py +++ b/ros2bag/ros2bag/verb/record.py @@ -46,6 +46,15 @@ def add_arguments(self, parser, cli_name): # noqa: D102 '-f', '--serialization-format', default='', help='rmw serialization format in which the messages are saved, defaults to the' ' rmw currently in use') + parser.add_argument( + '--no-discovery', action='store_true', + help='disables topic auto discovery during recording: only topics present at ' + 'startup will be recorded') + parser.add_argument( + '-p', '--polling-interval', default=100, + help='time in ms to wait between querying available topics for recording. It has no ' + 'effect if --no-discovery is enabled.' + ) def create_bag_directory(self, uri): try: @@ -69,12 +78,16 @@ def main(self, *, args): # noqa: D102 uri=uri, storage_id=args.storage, serialization_format=args.serialization_format, - all=True) + all=True, + no_discovery=args.no_discovery, + polling_interval=args.polling_interval) elif args.topics and len(args.topics) > 0: rosbag2_transport_py.record( uri=uri, storage_id=args.storage, serialization_format=args.serialization_format, + no_discovery=args.no_discovery, + polling_interval=args.polling_interval, topics=args.topics) else: self._subparser.print_help() diff --git a/rosbag2_test_common/include/rosbag2_test_common/publisher_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/publisher_manager.hpp index 9c0c41aad..a75baf689 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/publisher_manager.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/publisher_manager.hpp @@ -41,9 +41,6 @@ class PublisherManager auto publisher_node = std::make_shared("publisher" + std::to_string(counter++)); auto publisher = publisher_node->create_publisher(topic_name); - // We need to publish one message to set up the topic for discovery - publisher->publish(message); - publishers_.push_back([publisher, topic_name, message, expected_messages]( CountFunction count_stored_messages) { if (expected_messages != 0) { diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp index d5d28293a..408d2773a 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp @@ -25,15 +25,16 @@ TEST_F(RecordFixture, record_end_to_end_test) { auto message = get_messages_primitives()[0]; message->string_value = "test"; size_t expected_test_messages = 3; - pub_man_.add_publisher("/test_topic", message, expected_test_messages); auto wrong_message = get_messages_primitives()[0]; wrong_message->string_value = "wrong_content"; - pub_man_.add_publisher("/wrong_topic", wrong_message); auto process_handle = start_execution("ros2 bag record --output " + bag_path_ + " /test_topic"); wait_for_db(); + pub_man_.add_publisher("/test_topic", message, expected_test_messages); + pub_man_.add_publisher("/wrong_topic", wrong_message); + rosbag2_storage_plugins::SqliteWrapper db(database_path_, rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY); pub_man_.run_publishers([this, &db](const std::string & topic_name) { diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index e49c0da20..4c4953940 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -106,6 +106,14 @@ if(BUILD_TESTING) ament_target_dependencies(test_record_all test_msgs rosbag2_test_common) endif() + ament_add_gmock(test_record_all_no_discovery + test/rosbag2_transport/test_record_all_no_discovery.cpp + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) + if(TARGET test_record_all_no_discovery) + target_link_libraries(test_record_all_no_discovery rosbag2_transport) + ament_target_dependencies(test_record_all_no_discovery test_msgs rosbag2_test_common) + endif() + ament_add_gmock(test_play test/rosbag2_transport/test_play.cpp WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/rosbag2_transport/include/rosbag2_transport/record_options.hpp b/rosbag2_transport/include/rosbag2_transport/record_options.hpp index ebb4154bf..e2cc6b808 100644 --- a/rosbag2_transport/include/rosbag2_transport/record_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/record_options.hpp @@ -15,6 +15,7 @@ #ifndef ROSBAG2_TRANSPORT__RECORD_OPTIONS_HPP_ #define ROSBAG2_TRANSPORT__RECORD_OPTIONS_HPP_ +#include #include #include @@ -24,8 +25,10 @@ struct RecordOptions { public: bool all; + bool is_discovery_disabled; std::vector topics; std::string rmw_serialization_format; + std::chrono::milliseconds topic_polling_interval; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index d747f05b6..b4a012159 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -14,9 +14,14 @@ #include "recorder.hpp" +#include +#include #include +#include #include +#include #include +#include #include "rosbag2/writer.hpp" #include "rosbag2_transport/logging.hpp" @@ -30,33 +35,84 @@ Recorder::Recorder(std::shared_ptr writer, std::shared_ptrget_all_topics_with_types() : - node_->get_topics_with_types(record_options.topics); + if (record_options.rmw_serialization_format.empty()) { + throw std::runtime_error("No serialization format specified!"); + } + serialization_format_ = record_options.rmw_serialization_format; + ROSBAG2_TRANSPORT_LOG_INFO("Listening for topics..."); + subscribe_topics(get_requested_or_available_topics(record_options.topics)); - if (topics_and_types.empty()) { - throw std::runtime_error("No topics found. Abort"); + std::future discovery_future; + if (!record_options.is_discovery_disabled) { + auto discovery = std::bind( + &Recorder::topics_discovery, this, + record_options.topic_polling_interval, record_options.topics); + discovery_future = std::async(std::launch::async, discovery); } - for (const auto & topic_and_type : topics_and_types) { - auto topic_name = topic_and_type.first; - auto topic_type = topic_and_type.second; + record_messages(); + + if (discovery_future.valid()) { + discovery_future.wait(); + } - auto subscription = create_subscription(topic_name, topic_type); - if (subscription) { - subscriptions_.push_back(subscription); - writer_->create_topic({topic_name, topic_type, record_options.rmw_serialization_format}); - ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Subscribed to topic '" << topic_name << "'"); + subscriptions_.clear(); +} + +void Recorder::topics_discovery( + std::chrono::milliseconds topic_polling_interval, + const std::vector & requested_topics) +{ + while (rclcpp::ok()) { + auto topics_to_subscribe = get_requested_or_available_topics(requested_topics); + auto missing_topics = get_missing_topics(topics_to_subscribe); + subscribe_topics(missing_topics); + + if (!requested_topics.empty() && subscribed_topics_.size() == requested_topics.size()) { + ROSBAG2_TRANSPORT_LOG_INFO("All requested topics are subscribed. Stopping discovery..."); + return; } + std::this_thread::sleep_for(topic_polling_interval); } +} + +std::unordered_map +Recorder::get_requested_or_available_topics(const std::vector & requested_topics) +{ + return requested_topics.empty() ? + node_->get_all_topics_with_types() : + node_->get_topics_with_types(requested_topics); +} - if (subscriptions_.empty()) { - throw std::runtime_error("No topics could be subscribed. Abort"); +std::unordered_map +Recorder::get_missing_topics(const std::unordered_map & topics) +{ + std::unordered_map missing_topics; + for (const auto & i : topics) { + if (subscribed_topics_.find(i.first) == subscribed_topics_.end()) { + missing_topics.emplace(i.first, i.second); + } } + return missing_topics; +} - ROSBAG2_TRANSPORT_LOG_INFO("Subscription setup complete."); - rclcpp::spin(node_); - subscriptions_.clear(); +void Recorder::subscribe_topics( + const std::unordered_map & topics_and_types) +{ + for (const auto & topic_with_type : topics_and_types) { + subscribe_topic({topic_with_type.first, topic_with_type.second, serialization_format_}); + } +} + +void Recorder::subscribe_topic(const rosbag2::TopicMetadata & topic) +{ + auto subscription = create_subscription(topic.name, topic.type); + if (subscription) { + subscribed_topics_.insert(topic.name); + subscriptions_.push_back(subscription); + writer_->create_topic(topic); + ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Subscribed to topic '" << topic.name << "'"); + } } std::shared_ptr @@ -83,4 +139,9 @@ Recorder::create_subscription( return subscription; } +void Recorder::record_messages() const +{ + spin(node_); +} + } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.hpp b/rosbag2_transport/src/rosbag2_transport/recorder.hpp index afba018b3..d6419ed02 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.hpp @@ -15,10 +15,16 @@ #ifndef ROSBAG2_TRANSPORT__RECORDER_HPP_ #define ROSBAG2_TRANSPORT__RECORDER_HPP_ +#include #include #include +#include +#include +#include #include +#include "rosbag2/types.hpp" +#include "rosbag2/writer.hpp" #include "rosbag2_transport/record_options.hpp" namespace rosbag2 @@ -40,12 +46,31 @@ class Recorder void record(const RecordOptions & record_options); private: + void topics_discovery( + std::chrono::milliseconds topic_polling_interval, + const std::vector & requested_topics = {}); + + std::unordered_map + get_requested_or_available_topics(const std::vector & requested_topics); + + std::unordered_map + get_missing_topics(const std::unordered_map & topics); + + void subscribe_topics( + const std::unordered_map & topics_and_types); + + void subscribe_topic(const rosbag2::TopicMetadata & topic); + std::shared_ptr create_subscription( const std::string & topic_name, const std::string & topic_type); + void record_messages() const; + std::shared_ptr writer_; std::shared_ptr node_; std::vector> subscriptions_; + std::unordered_set subscribed_topics_; + std::string serialization_format_; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/rosbag2_node.cpp b/rosbag2_transport/src/rosbag2_transport/rosbag2_node.cpp index b9ee61bfb..e73add517 100644 --- a/rosbag2_transport/src/rosbag2_transport/rosbag2_node.cpp +++ b/rosbag2_transport/src/rosbag2_transport/rosbag2_node.cpp @@ -129,9 +129,6 @@ std::unordered_map Rosbag2Node::get_topics_with_types( } } - // TODO(Martin-Idel-SI): This is a short sleep to allow the node some time to discover the topic - // This should be replaced by an auto-discovery system in the future - std::this_thread::sleep_for(std::chrono::milliseconds(100)); auto topics_and_types = this->get_topic_names_and_types(); std::map> filtered_topics_and_types; @@ -149,9 +146,6 @@ std::unordered_map Rosbag2Node::get_topics_with_types( std::unordered_map Rosbag2Node::get_all_topics_with_types() { - // TODO(Martin-Idel-SI): This is a short sleep to allow the node some time to discover the topic - // This should be replaced by an auto-discovery system in the future - std::this_thread::sleep_for(std::chrono::milliseconds(100)); return filter_topics_with_more_than_one_type(this->get_topic_names_and_types()); } diff --git a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp index 05c6b1ab2..b02f7f3a8 100644 --- a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp +++ b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include @@ -28,18 +29,29 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject * rosbag2_transport::RecordOptions record_options{}; static const char * kwlist[] = { - "uri", "storage_id", "serialization_format", "all", "topics", nullptr}; + "uri", + "storage_id", + "serialization_format", + "all", + "no_discovery", + "polling_interval", + "topics", + nullptr}; char * uri = nullptr; char * storage_id = nullptr; char * serilization_format = nullptr; bool all = false; + bool no_discovery = false; + uint64_t polling_interval_ms = 100; PyObject * topics = nullptr; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sss|bO", const_cast(kwlist), + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sss|bbKO", const_cast(kwlist), &uri, &storage_id, &serilization_format, &all, + &no_discovery, + &polling_interval_ms, &topics)) { return nullptr; @@ -48,6 +60,8 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject * storage_options.uri = std::string(uri); storage_options.storage_id = std::string(storage_id); record_options.all = all; + record_options.is_discovery_disabled = no_discovery; + record_options.topic_polling_interval = std::chrono::milliseconds(polling_interval_ms); if (topics) { PyObject * topic_iterator = PyObject_GetIter(topics); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index 73f10893c..2bd311f43 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -37,12 +37,12 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are string_message->string_value = "Hello World"; std::string string_topic = "/string_topic"; + start_recording({false, false, {string_topic, array_topic}, "rmw_format", 100ms}); + pub_man_.add_publisher( string_topic, string_message, 2); pub_man_.add_publisher( array_topic, array_message, 2); - - start_recording({false, {string_topic, array_topic}, "rmw_format"}); run_publishers(); stop_recording(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp index 743d4ef35..f98527423 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp @@ -14,14 +14,9 @@ #include -#include #include -#include -#include "rclcpp/rclcpp.hpp" #include "record_integration_fixture.hpp" -#include "rosbag2_transport/rosbag2_transport.hpp" -#include "rosbag2/types.hpp" #include "test_msgs/msg/primitives.hpp" #include "test_msgs/msg/static_array_primitives.hpp" #include "test_msgs/message_fixtures.hpp" @@ -40,7 +35,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are pub_man_.add_publisher(string_topic, string_message, 2); pub_man_.add_publisher(array_topic, array_message, 2); - start_recording({true, {}, ""}); + start_recording({true, false, {}, "rmw_format", 100ms}); run_publishers(); stop_recording(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp new file mode 100644 index 000000000..89fa478a5 --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp @@ -0,0 +1,42 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "record_integration_fixture.hpp" +#include "test_msgs/msg/primitives.hpp" +#include "test_msgs/message_fixtures.hpp" + +TEST_F(RecordIntegrationTestFixture, record_all_without_discovery_ignores_later_announced_topics) +{ + auto string_message = get_messages_primitives()[0]; + string_message->string_value = "Hello World"; + + start_recording({true, true, {}, "rmw_format", 1ms}); + + std::this_thread::sleep_for(100ms); + auto publisher_node = std::make_shared("publisher_for_test"); + auto publisher = publisher_node->create_publisher("/string_topic"); + for (int i = 0; i < 5; ++i) { + std::this_thread::sleep_for(20ms); + publisher->publish(string_message); + } + stop_recording(); + + ASSERT_THAT(writer_->get_messages(), SizeIs(0)); +} diff --git a/rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp b/rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp index f11620590..b785afe57 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp @@ -79,6 +79,12 @@ class RosBag2NodeFixture : public Test return memory_management_.serialize_message(string_message); } + void sleep_to_allow_topics_discovery() + { + // This is a short sleep to allow the node some time to discover the topic + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + MemoryManagement memory_management_; std::shared_ptr node_; rclcpp::Node::SharedPtr publisher_node_; @@ -113,6 +119,7 @@ TEST_F(RosBag2NodeFixture, publisher_and_subscriber_work) TEST_F(RosBag2NodeFixture, get_topics_with_types_returns_empty_if_topic_does_not_exist) { create_publisher("string_topic"); + sleep_to_allow_topics_discovery(); auto topics_and_types = node_->get_topics_with_types({"/wrong_topic"}); ASSERT_THAT(topics_and_types, IsEmpty()); @@ -123,6 +130,7 @@ TEST_F(RosBag2NodeFixture, { create_publisher("string_topic"); + sleep_to_allow_topics_discovery(); auto topics_and_types = node_->get_topics_with_types({"string_topic"}); ASSERT_THAT(topics_and_types, SizeIs(1)); @@ -134,6 +142,7 @@ TEST_F(RosBag2NodeFixture, { create_publisher("string_topic"); + sleep_to_allow_topics_discovery(); auto topics_and_types = node_->get_topics_with_types({"/string_topic"}); ASSERT_THAT(topics_and_types, SizeIs(1)); @@ -149,6 +158,7 @@ TEST_F(RosBag2NodeFixture, get_topics_with_types_returns_only_specified_topics) create_publisher(second_topic); create_publisher(third_topic); + sleep_to_allow_topics_discovery(); auto topics_and_types = node_->get_topics_with_types({first_topic, second_topic}); ASSERT_THAT(topics_and_types, SizeIs(2)); @@ -166,6 +176,7 @@ TEST_F(RosBag2NodeFixture, get_all_topics_with_types_returns_all_topics) create_publisher(second_topic); create_publisher(third_topic); + sleep_to_allow_topics_discovery(); auto topics_and_types = node_->get_all_topics_with_types(); ASSERT_THAT(topics_and_types, SizeIs(Ge(3u)));