From 40fe5ed049851e85370cdc94d4bbaa3bda07a3f8 Mon Sep 17 00:00:00 2001 From: Alessandro Bottero Date: Fri, 26 Oct 2018 13:57:40 +0200 Subject: [PATCH 01/12] GH-27 Implement discovery for new topics after startup - this allows new topics to be discovered also after startup. - as a consquence, ros2 bag record won't throw any exception anymore if the specified topics are not available at startup - each time the recorder subscribes to a new topic, this will be logged to console --- .../rosbag2_test_common/publisher_manager.hpp | 3 - .../test_rosbag2_record_end_to_end.cpp | 5 +- .../src/rosbag2_transport/recorder.cpp | 65 ++++++++++++------- .../src/rosbag2_transport/recorder.hpp | 3 + .../src/rosbag2_transport/rosbag2_node.cpp | 6 -- .../test/rosbag2_transport/test_record.cpp | 4 +- .../rosbag2_transport/test_rosbag2_node.cpp | 11 ++++ 7 files changed, 59 insertions(+), 38 deletions(-) diff --git a/rosbag2_test_common/include/rosbag2_test_common/publisher_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/publisher_manager.hpp index 9c0c41aad..a75baf689 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/publisher_manager.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/publisher_manager.hpp @@ -41,9 +41,6 @@ class PublisherManager auto publisher_node = std::make_shared("publisher" + std::to_string(counter++)); auto publisher = publisher_node->create_publisher(topic_name); - // We need to publish one message to set up the topic for discovery - publisher->publish(message); - publishers_.push_back([publisher, topic_name, message, expected_messages]( CountFunction count_stored_messages) { if (expected_messages != 0) { diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp index d5d28293a..408d2773a 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp @@ -25,15 +25,16 @@ TEST_F(RecordFixture, record_end_to_end_test) { auto message = get_messages_primitives()[0]; message->string_value = "test"; size_t expected_test_messages = 3; - pub_man_.add_publisher("/test_topic", message, expected_test_messages); auto wrong_message = get_messages_primitives()[0]; wrong_message->string_value = "wrong_content"; - pub_man_.add_publisher("/wrong_topic", wrong_message); auto process_handle = start_execution("ros2 bag record --output " + bag_path_ + " /test_topic"); wait_for_db(); + pub_man_.add_publisher("/test_topic", message, expected_test_messages); + pub_man_.add_publisher("/wrong_topic", wrong_message); + rosbag2_storage_plugins::SqliteWrapper db(database_path_, rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY); pub_man_.run_publishers([this, &db](const std::string & topic_name) { diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index d747f05b6..4107503f2 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -14,9 +14,12 @@ #include "recorder.hpp" +#include +#include #include #include #include +#include #include "rosbag2/writer.hpp" #include "rosbag2_transport/logging.hpp" @@ -30,32 +33,10 @@ Recorder::Recorder(std::shared_ptr writer, std::shared_ptrget_all_topics_with_types() : - node_->get_topics_with_types(record_options.topics); - - if (topics_and_types.empty()) { - throw std::runtime_error("No topics found. Abort"); - } - - for (const auto & topic_and_type : topics_and_types) { - auto topic_name = topic_and_type.first; - auto topic_type = topic_and_type.second; - - auto subscription = create_subscription(topic_name, topic_type); - if (subscription) { - subscriptions_.push_back(subscription); - writer_->create_topic({topic_name, topic_type, record_options.rmw_serialization_format}); - ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Subscribed to topic '" << topic_name << "'"); - } - } - - if (subscriptions_.empty()) { - throw std::runtime_error("No topics could be subscribed. Abort"); - } - - ROSBAG2_TRANSPORT_LOG_INFO("Subscription setup complete."); + ROSBAG2_TRANSPORT_LOG_INFO("Setup complete. Listening for topics..."); + auto discovery_future = launch_topics_discovery(record_options.topics); rclcpp::spin(node_); + discovery_future.wait(); subscriptions_.clear(); } @@ -83,4 +64,38 @@ Recorder::create_subscription( return subscription; } +std::future Recorder::launch_topics_discovery(std::vector topics) +{ + auto subscribe_to_topics = [this, topics] { + while (rclcpp::ok()) { + auto all_topics_and_types = topics.empty() ? + node_->get_all_topics_with_types() : + node_->get_topics_with_types(topics); + + for (const auto & topic_with_type : all_topics_and_types) { + std::string topic_name = topic_with_type.first; + bool already_subscribed = std::find( + subscribed_topics_.begin(), + subscribed_topics_.end(), + topic_with_type.first) != subscribed_topics_.end(); + + if (!already_subscribed) { + std::string topic_type = topic_with_type.second; + auto subscription = create_subscription(topic_name, topic_type); + if (subscription) { + subscribed_topics_.push_back(topic_name); + subscriptions_.push_back(subscription); + writer_->create_topic({topic_name, topic_type}); + ROSBAG2_TRANSPORT_LOG_INFO_STREAM( + "Subscribed to topic '" << topic_name << "'"); + } + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + }; + + return std::async(std::launch::async, subscribe_to_topics); +} + } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.hpp b/rosbag2_transport/src/rosbag2_transport/recorder.hpp index afba018b3..3cfb8fe5d 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.hpp @@ -15,6 +15,7 @@ #ifndef ROSBAG2_TRANSPORT__RECORDER_HPP_ #define ROSBAG2_TRANSPORT__RECORDER_HPP_ +#include #include #include #include @@ -42,10 +43,12 @@ class Recorder private: std::shared_ptr create_subscription( const std::string & topic_name, const std::string & topic_type); + std::future launch_topics_discovery(std::vector topics = {}); std::shared_ptr writer_; std::shared_ptr node_; std::vector> subscriptions_; + std::vector subscribed_topics_; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/rosbag2_node.cpp b/rosbag2_transport/src/rosbag2_transport/rosbag2_node.cpp index b9ee61bfb..e73add517 100644 --- a/rosbag2_transport/src/rosbag2_transport/rosbag2_node.cpp +++ b/rosbag2_transport/src/rosbag2_transport/rosbag2_node.cpp @@ -129,9 +129,6 @@ std::unordered_map Rosbag2Node::get_topics_with_types( } } - // TODO(Martin-Idel-SI): This is a short sleep to allow the node some time to discover the topic - // This should be replaced by an auto-discovery system in the future - std::this_thread::sleep_for(std::chrono::milliseconds(100)); auto topics_and_types = this->get_topic_names_and_types(); std::map> filtered_topics_and_types; @@ -149,9 +146,6 @@ std::unordered_map Rosbag2Node::get_topics_with_types( std::unordered_map Rosbag2Node::get_all_topics_with_types() { - // TODO(Martin-Idel-SI): This is a short sleep to allow the node some time to discover the topic - // This should be replaced by an auto-discovery system in the future - std::this_thread::sleep_for(std::chrono::milliseconds(100)); return filter_topics_with_more_than_one_type(this->get_topic_names_and_types()); } diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index 73f10893c..515753399 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -37,12 +37,12 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are string_message->string_value = "Hello World"; std::string string_topic = "/string_topic"; + start_recording({false, {string_topic, array_topic}}); + pub_man_.add_publisher( string_topic, string_message, 2); pub_man_.add_publisher( array_topic, array_message, 2); - - start_recording({false, {string_topic, array_topic}, "rmw_format"}); run_publishers(); stop_recording(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp b/rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp index f11620590..b785afe57 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp @@ -79,6 +79,12 @@ class RosBag2NodeFixture : public Test return memory_management_.serialize_message(string_message); } + void sleep_to_allow_topics_discovery() + { + // This is a short sleep to allow the node some time to discover the topic + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + MemoryManagement memory_management_; std::shared_ptr node_; rclcpp::Node::SharedPtr publisher_node_; @@ -113,6 +119,7 @@ TEST_F(RosBag2NodeFixture, publisher_and_subscriber_work) TEST_F(RosBag2NodeFixture, get_topics_with_types_returns_empty_if_topic_does_not_exist) { create_publisher("string_topic"); + sleep_to_allow_topics_discovery(); auto topics_and_types = node_->get_topics_with_types({"/wrong_topic"}); ASSERT_THAT(topics_and_types, IsEmpty()); @@ -123,6 +130,7 @@ TEST_F(RosBag2NodeFixture, { create_publisher("string_topic"); + sleep_to_allow_topics_discovery(); auto topics_and_types = node_->get_topics_with_types({"string_topic"}); ASSERT_THAT(topics_and_types, SizeIs(1)); @@ -134,6 +142,7 @@ TEST_F(RosBag2NodeFixture, { create_publisher("string_topic"); + sleep_to_allow_topics_discovery(); auto topics_and_types = node_->get_topics_with_types({"/string_topic"}); ASSERT_THAT(topics_and_types, SizeIs(1)); @@ -149,6 +158,7 @@ TEST_F(RosBag2NodeFixture, get_topics_with_types_returns_only_specified_topics) create_publisher(second_topic); create_publisher(third_topic); + sleep_to_allow_topics_discovery(); auto topics_and_types = node_->get_topics_with_types({first_topic, second_topic}); ASSERT_THAT(topics_and_types, SizeIs(2)); @@ -166,6 +176,7 @@ TEST_F(RosBag2NodeFixture, get_all_topics_with_types_returns_all_topics) create_publisher(second_topic); create_publisher(third_topic); + sleep_to_allow_topics_discovery(); auto topics_and_types = node_->get_all_topics_with_types(); ASSERT_THAT(topics_and_types, SizeIs(Ge(3u))); From 71b40d298b68d2b60ee30ff5522b669e0981d891 Mon Sep 17 00:00:00 2001 From: Martin Idel Date: Mon, 26 Nov 2018 11:43:30 +0100 Subject: [PATCH 02/12] GH-27 Provide polling frequency as option - make option available in RecordOptions - set to 100ms for now --- .../include/rosbag2_transport/record_options.hpp | 2 ++ rosbag2_transport/src/rosbag2_transport/recorder.cpp | 10 ++++++---- rosbag2_transport/src/rosbag2_transport/recorder.hpp | 3 ++- .../src/rosbag2_transport/rosbag2_transport_python.cpp | 2 ++ .../test/rosbag2_transport/test_record.cpp | 2 +- .../test/rosbag2_transport/test_record_all.cpp | 2 +- 6 files changed, 14 insertions(+), 7 deletions(-) diff --git a/rosbag2_transport/include/rosbag2_transport/record_options.hpp b/rosbag2_transport/include/rosbag2_transport/record_options.hpp index ebb4154bf..9d4a3d742 100644 --- a/rosbag2_transport/include/rosbag2_transport/record_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/record_options.hpp @@ -15,6 +15,7 @@ #ifndef ROSBAG2_TRANSPORT__RECORD_OPTIONS_HPP_ #define ROSBAG2_TRANSPORT__RECORD_OPTIONS_HPP_ +#include #include #include @@ -26,6 +27,7 @@ struct RecordOptions bool all; std::vector topics; std::string rmw_serialization_format; + std::chrono::milliseconds topic_polling_frequency; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 4107503f2..c9928d11e 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -34,7 +34,8 @@ Recorder::Recorder(std::shared_ptr writer, std::shared_ptr Recorder::launch_topics_discovery(std::vector topics) +std::future Recorder::launch_topics_discovery( + std::chrono::milliseconds topic_polling_frequency, std::vector topics) { - auto subscribe_to_topics = [this, topics] { + auto subscribe_to_topics = [this, topics, topic_polling_frequency] { while (rclcpp::ok()) { auto all_topics_and_types = topics.empty() ? node_->get_all_topics_with_types() : @@ -91,7 +93,7 @@ std::future Recorder::launch_topics_discovery(std::vector top } } } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(topic_polling_frequency); } }; diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.hpp b/rosbag2_transport/src/rosbag2_transport/recorder.hpp index 3cfb8fe5d..5c0a1ab3b 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.hpp @@ -43,7 +43,8 @@ class Recorder private: std::shared_ptr create_subscription( const std::string & topic_name, const std::string & topic_type); - std::future launch_topics_discovery(std::vector topics = {}); + std::future launch_topics_discovery( + std::chrono::milliseconds topic_polling_frequency, std::vector topics = {}); std::shared_ptr writer_; std::shared_ptr node_; diff --git a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp index 05c6b1ab2..661113ad4 100644 --- a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp +++ b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include @@ -48,6 +49,7 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject * storage_options.uri = std::string(uri); storage_options.storage_id = std::string(storage_id); record_options.all = all; + record_options.topic_polling_frequency = std::chrono::milliseconds(100); if (topics) { PyObject * topic_iterator = PyObject_GetIter(topics); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index 515753399..cad03c8f3 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -37,7 +37,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are string_message->string_value = "Hello World"; std::string string_topic = "/string_topic"; - start_recording({false, {string_topic, array_topic}}); + start_recording({false, {string_topic, array_topic}, 100ms}); pub_man_.add_publisher( string_topic, string_message, 2); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp index 743d4ef35..b9aaf7fa8 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp @@ -40,7 +40,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are pub_man_.add_publisher(string_topic, string_message, 2); pub_man_.add_publisher(array_topic, array_message, 2); - start_recording({true, {}, ""}); + start_recording({true, {}, "", 100ms}); run_publishers(); stop_recording(); From 05e02eaf7b9da0a878825f5c7d36ec430c13f419 Mon Sep 17 00:00:00 2001 From: Martin Idel Date: Mon, 26 Nov 2018 11:44:06 +0100 Subject: [PATCH 03/12] GH-27 Stop topic polling if subscription setup is complete --- rosbag2_transport/src/rosbag2_transport/recorder.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index c9928d11e..b3231d7bd 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -74,6 +74,10 @@ std::future Recorder::launch_topics_discovery( node_->get_all_topics_with_types() : node_->get_topics_with_types(topics); + if (!topics.empty() && subscriptions_.size() == topics.size()) { + return; + } + for (const auto & topic_with_type : all_topics_and_types) { std::string topic_name = topic_with_type.first; bool already_subscribed = std::find( From 4b9ef84cdec1eae9427a7f4fbcea71e63ef28120 Mon Sep 17 00:00:00 2001 From: Martin Idel Date: Mon, 26 Nov 2018 13:31:42 +0100 Subject: [PATCH 04/12] GH-27 Minor refactoring for readability --- .../rosbag2_transport/record_options.hpp | 2 +- .../src/rosbag2_transport/recorder.cpp | 77 ++++++++++++------- .../src/rosbag2_transport/recorder.hpp | 12 ++- .../rosbag2_transport_python.cpp | 2 +- 4 files changed, 63 insertions(+), 30 deletions(-) diff --git a/rosbag2_transport/include/rosbag2_transport/record_options.hpp b/rosbag2_transport/include/rosbag2_transport/record_options.hpp index 9d4a3d742..aaaad6ce9 100644 --- a/rosbag2_transport/include/rosbag2_transport/record_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/record_options.hpp @@ -27,7 +27,7 @@ struct RecordOptions bool all; std::vector topics; std::string rmw_serialization_format; - std::chrono::milliseconds topic_polling_frequency; + std::chrono::milliseconds topic_polling_interval; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index b3231d7bd..f8791f893 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -35,8 +36,10 @@ void Recorder::record(const RecordOptions & record_options) { ROSBAG2_TRANSPORT_LOG_INFO("Setup complete. Listening for topics..."); auto discovery_future = launch_topics_discovery( - record_options.topic_polling_frequency, record_options.topics); - rclcpp::spin(node_); + record_options.topic_polling_interval, record_options.topics); + + record_messages(); + discovery_future.wait(); subscriptions_.clear(); } @@ -66,42 +69,62 @@ Recorder::create_subscription( } std::future Recorder::launch_topics_discovery( - std::chrono::milliseconds topic_polling_frequency, std::vector topics) + std::chrono::milliseconds topic_polling_interval, + const std::vector & topics_to_record) { - auto subscribe_to_topics = [this, topics, topic_polling_frequency] { + auto subscribe_to_topics = [this, topics_to_record, topic_polling_interval] { while (rclcpp::ok()) { - auto all_topics_and_types = topics.empty() ? + auto all_topics_and_types_to_subscribe = topics_to_record.empty() ? node_->get_all_topics_with_types() : - node_->get_topics_with_types(topics); + node_->get_topics_with_types(topics_to_record); - if (!topics.empty() && subscriptions_.size() == topics.size()) { + if (is_every_topic_subscribed(topics_to_record)) { return; } - for (const auto & topic_with_type : all_topics_and_types) { - std::string topic_name = topic_with_type.first; - bool already_subscribed = std::find( - subscribed_topics_.begin(), - subscribed_topics_.end(), - topic_with_type.first) != subscribed_topics_.end(); - - if (!already_subscribed) { - std::string topic_type = topic_with_type.second; - auto subscription = create_subscription(topic_name, topic_type); - if (subscription) { - subscribed_topics_.push_back(topic_name); - subscriptions_.push_back(subscription); - writer_->create_topic({topic_name, topic_type}); - ROSBAG2_TRANSPORT_LOG_INFO_STREAM( - "Subscribed to topic '" << topic_name << "'"); - } - } - } - std::this_thread::sleep_for(topic_polling_frequency); + subscribe_all_missing_topics(all_topics_and_types_to_subscribe); + std::this_thread::sleep_for(topic_polling_interval); } }; return std::async(std::launch::async, subscribe_to_topics); } +bool Recorder::is_every_topic_subscribed( + const std::vector & topics_to_record) const +{ + return !topics_to_record.empty() && subscriptions_.size() == topics_to_record.size(); +} + +void Recorder::subscribe_all_missing_topics( + const std::map & all_topics_and_types) +{ + for (const auto & topic_with_type : all_topics_and_types) { + bool already_subscribed = find( + subscribed_topics_.begin(), + subscribed_topics_.end(), + topic_with_type.first) != subscribed_topics_.end(); + + if (!already_subscribed) { + subscribe_topic({topic_with_type.first, topic_with_type.second}); + } + } +} + +void Recorder::subscribe_topic(const rosbag2::TopicWithType & topic) +{ + auto subscription = create_subscription(topic.name, topic.type); + if (subscription) { + subscribed_topics_.push_back(topic.name); + subscriptions_.push_back(subscription); + writer_->create_topic(topic); + ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Subscribed to topic '" << topic.name << "'"); + } +} + +void Recorder::record_messages() const +{ + spin(node_); +} + } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.hpp b/rosbag2_transport/src/rosbag2_transport/recorder.hpp index 5c0a1ab3b..d290dffd9 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.hpp @@ -16,10 +16,14 @@ #define ROSBAG2_TRANSPORT__RECORDER_HPP_ #include +#include #include #include #include +#include +#include "rosbag2/types.hpp" +#include "rosbag2/writer.hpp" #include "rosbag2_transport/record_options.hpp" namespace rosbag2 @@ -44,7 +48,13 @@ class Recorder std::shared_ptr create_subscription( const std::string & topic_name, const std::string & topic_type); std::future launch_topics_discovery( - std::chrono::milliseconds topic_polling_frequency, std::vector topics = {}); + std::chrono::milliseconds topic_polling_interval, + const std::vector & topics_to_record = {}); + void subscribe_all_missing_topics( + const std::map & all_topics_and_types); + void subscribe_topic(const rosbag2::TopicWithType & topic_with_type); + bool is_every_topic_subscribed(const std::vector & topics_to_record) const; + void record_messages() const; std::shared_ptr writer_; std::shared_ptr node_; diff --git a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp index 661113ad4..577e4e6d1 100644 --- a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp +++ b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp @@ -49,7 +49,7 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject * storage_options.uri = std::string(uri); storage_options.storage_id = std::string(storage_id); record_options.all = all; - record_options.topic_polling_frequency = std::chrono::milliseconds(100); + record_options.topic_polling_interval = std::chrono::milliseconds(100); if (topics) { PyObject * topic_iterator = PyObject_GetIter(topics); From 99121ee3fc3db4994ddeb714b855041d6a843f14 Mon Sep 17 00:00:00 2001 From: Andreas Holzner Date: Thu, 29 Nov 2018 18:02:43 +0100 Subject: [PATCH 05/12] GH-27 Fix build after rebase --- .../src/rosbag2_transport/recorder.cpp | 13 +++++++++---- .../src/rosbag2_transport/recorder.hpp | 9 +++++---- .../test/rosbag2_transport/test_record.cpp | 2 +- .../test/rosbag2_transport/test_record_all.cpp | 2 +- 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index f8791f893..0c518bc1b 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -16,9 +16,10 @@ #include #include -#include #include +#include #include +#include #include #include @@ -34,7 +35,11 @@ Recorder::Recorder(std::shared_ptr writer, std::shared_ptr & all_topics_and_types) + const std::unordered_map & all_topics_and_types) { for (const auto & topic_with_type : all_topics_and_types) { bool already_subscribed = find( @@ -106,12 +111,12 @@ void Recorder::subscribe_all_missing_topics( topic_with_type.first) != subscribed_topics_.end(); if (!already_subscribed) { - subscribe_topic({topic_with_type.first, topic_with_type.second}); + subscribe_topic({topic_with_type.first, topic_with_type.second, serialization_format_}); } } } -void Recorder::subscribe_topic(const rosbag2::TopicWithType & topic) +void Recorder::subscribe_topic(const rosbag2::TopicMetadata & topic) { auto subscription = create_subscription(topic.name, topic.type); if (subscription) { diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.hpp b/rosbag2_transport/src/rosbag2_transport/recorder.hpp index d290dffd9..a34c390b9 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.hpp @@ -16,11 +16,11 @@ #define ROSBAG2_TRANSPORT__RECORDER_HPP_ #include -#include #include #include -#include +#include #include +#include #include "rosbag2/types.hpp" #include "rosbag2/writer.hpp" @@ -51,8 +51,8 @@ class Recorder std::chrono::milliseconds topic_polling_interval, const std::vector & topics_to_record = {}); void subscribe_all_missing_topics( - const std::map & all_topics_and_types); - void subscribe_topic(const rosbag2::TopicWithType & topic_with_type); + const std::unordered_map & all_topics_and_types); + void subscribe_topic(const rosbag2::TopicMetadata & topic); bool is_every_topic_subscribed(const std::vector & topics_to_record) const; void record_messages() const; @@ -60,6 +60,7 @@ class Recorder std::shared_ptr node_; std::vector> subscriptions_; std::vector subscribed_topics_; + std::string serialization_format_; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index cad03c8f3..4cc728beb 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -37,7 +37,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are string_message->string_value = "Hello World"; std::string string_topic = "/string_topic"; - start_recording({false, {string_topic, array_topic}, 100ms}); + start_recording({false, {string_topic, array_topic}, "rmw_format", 100ms}); pub_man_.add_publisher( string_topic, string_message, 2); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp index b9aaf7fa8..6df2eade5 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp @@ -40,7 +40,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are pub_man_.add_publisher(string_topic, string_message, 2); pub_man_.add_publisher(array_topic, array_message, 2); - start_recording({true, {}, "", 100ms}); + start_recording({true, {}, "rmw_format", 100ms}); run_publishers(); stop_recording(); From 9b2d103ea3fc7646bc7762f088059007bec8d12b Mon Sep 17 00:00:00 2001 From: Andreas Holzner Date: Tue, 4 Dec 2018 13:53:29 +0100 Subject: [PATCH 06/12] GH-145 add no-discovery option to ros2 bag record --- ros2bag/ros2bag/verb/record.py | 7 +++- rosbag2_transport/CMakeLists.txt | 8 ++++ .../rosbag2_transport/record_options.hpp | 1 + .../src/rosbag2_transport/recorder.cpp | 12 ++++-- .../src/rosbag2_transport/recorder.hpp | 1 + .../rosbag2_transport_python.cpp | 7 +++- .../test/rosbag2_transport/test_record.cpp | 2 +- .../rosbag2_transport/test_record_all.cpp | 7 +--- .../test_record_all_no_discovery.cpp | 42 +++++++++++++++++++ 9 files changed, 73 insertions(+), 14 deletions(-) create mode 100644 rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp diff --git a/ros2bag/ros2bag/verb/record.py b/ros2bag/ros2bag/verb/record.py index f83ace48d..290a41e1e 100644 --- a/ros2bag/ros2bag/verb/record.py +++ b/ros2bag/ros2bag/verb/record.py @@ -46,6 +46,9 @@ def add_arguments(self, parser, cli_name): # noqa: D102 '-f', '--serialization-format', default='', help='rmw serialization format in which the messages are saved, defaults to the' ' rmw currently in use') + parser.add_argument( + '--no-discovery', action='store_true', + help='disables topic auto discovery during recording') def create_bag_directory(self, uri): try: @@ -69,12 +72,14 @@ def main(self, *, args): # noqa: D102 uri=uri, storage_id=args.storage, serialization_format=args.serialization_format, - all=True) + all=True, + no_discovery=args.no_discovery) elif args.topics and len(args.topics) > 0: rosbag2_transport_py.record( uri=uri, storage_id=args.storage, serialization_format=args.serialization_format, + no_discovery=args.no_discovery, topics=args.topics) else: self._subparser.print_help() diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index e49c0da20..4c4953940 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -106,6 +106,14 @@ if(BUILD_TESTING) ament_target_dependencies(test_record_all test_msgs rosbag2_test_common) endif() + ament_add_gmock(test_record_all_no_discovery + test/rosbag2_transport/test_record_all_no_discovery.cpp + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) + if(TARGET test_record_all_no_discovery) + target_link_libraries(test_record_all_no_discovery rosbag2_transport) + ament_target_dependencies(test_record_all_no_discovery test_msgs rosbag2_test_common) + endif() + ament_add_gmock(test_play test/rosbag2_transport/test_play.cpp WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/rosbag2_transport/include/rosbag2_transport/record_options.hpp b/rosbag2_transport/include/rosbag2_transport/record_options.hpp index aaaad6ce9..e2cc6b808 100644 --- a/rosbag2_transport/include/rosbag2_transport/record_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/record_options.hpp @@ -25,6 +25,7 @@ struct RecordOptions { public: bool all; + bool is_discovery_disabled; std::vector topics; std::string rmw_serialization_format; std::chrono::milliseconds topic_polling_interval; diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 0c518bc1b..1983ce056 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -41,7 +41,9 @@ void Recorder::record(const RecordOptions & record_options) ROSBAG2_TRANSPORT_LOG_INFO("Setup complete. Listening for topics..."); serialization_format_ = record_options.rmw_serialization_format; auto discovery_future = launch_topics_discovery( - record_options.topic_polling_interval, record_options.topics); + record_options.topic_polling_interval, + record_options.is_discovery_disabled, + record_options.topics); record_messages(); @@ -75,10 +77,12 @@ Recorder::create_subscription( std::future Recorder::launch_topics_discovery( std::chrono::milliseconds topic_polling_interval, + bool is_discovery_disabled, const std::vector & topics_to_record) { - auto subscribe_to_topics = [this, topics_to_record, topic_polling_interval] { - while (rclcpp::ok()) { + auto subscribe_to_topics = + [this, topics_to_record, topic_polling_interval, is_discovery_disabled] { + do { auto all_topics_and_types_to_subscribe = topics_to_record.empty() ? node_->get_all_topics_with_types() : node_->get_topics_with_types(topics_to_record); @@ -89,7 +93,7 @@ std::future Recorder::launch_topics_discovery( subscribe_all_missing_topics(all_topics_and_types_to_subscribe); std::this_thread::sleep_for(topic_polling_interval); - } + } while (rclcpp::ok() && !is_discovery_disabled); }; return std::async(std::launch::async, subscribe_to_topics); diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.hpp b/rosbag2_transport/src/rosbag2_transport/recorder.hpp index a34c390b9..f5c572445 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.hpp @@ -49,6 +49,7 @@ class Recorder const std::string & topic_name, const std::string & topic_type); std::future launch_topics_discovery( std::chrono::milliseconds topic_polling_interval, + bool is_discovery_disabled, const std::vector & topics_to_record = {}); void subscribe_all_missing_topics( const std::unordered_map & all_topics_and_types); diff --git a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp index 577e4e6d1..d8976054c 100644 --- a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp +++ b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp @@ -29,18 +29,20 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject * rosbag2_transport::RecordOptions record_options{}; static const char * kwlist[] = { - "uri", "storage_id", "serialization_format", "all", "topics", nullptr}; + "uri", "storage_id", "serialization_format", "all", "no_discovery", "topics", nullptr}; char * uri = nullptr; char * storage_id = nullptr; char * serilization_format = nullptr; bool all = false; + bool no_discovery = false; PyObject * topics = nullptr; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sss|bO", const_cast(kwlist), + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sss|bbO", const_cast(kwlist), &uri, &storage_id, &serilization_format, &all, + &no_discovery, &topics)) { return nullptr; @@ -49,6 +51,7 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject * storage_options.uri = std::string(uri); storage_options.storage_id = std::string(storage_id); record_options.all = all; + record_options.is_discovery_disabled = no_discovery; record_options.topic_polling_interval = std::chrono::milliseconds(100); if (topics) { diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index 4cc728beb..2bd311f43 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -37,7 +37,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are string_message->string_value = "Hello World"; std::string string_topic = "/string_topic"; - start_recording({false, {string_topic, array_topic}, "rmw_format", 100ms}); + start_recording({false, false, {string_topic, array_topic}, "rmw_format", 100ms}); pub_man_.add_publisher( string_topic, string_message, 2); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp index 6df2eade5..f98527423 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all.cpp @@ -14,14 +14,9 @@ #include -#include #include -#include -#include "rclcpp/rclcpp.hpp" #include "record_integration_fixture.hpp" -#include "rosbag2_transport/rosbag2_transport.hpp" -#include "rosbag2/types.hpp" #include "test_msgs/msg/primitives.hpp" #include "test_msgs/msg/static_array_primitives.hpp" #include "test_msgs/message_fixtures.hpp" @@ -40,7 +35,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are pub_man_.add_publisher(string_topic, string_message, 2); pub_man_.add_publisher(array_topic, array_message, 2); - start_recording({true, {}, "rmw_format", 100ms}); + start_recording({true, false, {}, "rmw_format", 100ms}); run_publishers(); stop_recording(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp new file mode 100644 index 000000000..89fa478a5 --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_no_discovery.cpp @@ -0,0 +1,42 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "record_integration_fixture.hpp" +#include "test_msgs/msg/primitives.hpp" +#include "test_msgs/message_fixtures.hpp" + +TEST_F(RecordIntegrationTestFixture, record_all_without_discovery_ignores_later_announced_topics) +{ + auto string_message = get_messages_primitives()[0]; + string_message->string_value = "Hello World"; + + start_recording({true, true, {}, "rmw_format", 1ms}); + + std::this_thread::sleep_for(100ms); + auto publisher_node = std::make_shared("publisher_for_test"); + auto publisher = publisher_node->create_publisher("/string_topic"); + for (int i = 0; i < 5; ++i) { + std::this_thread::sleep_for(20ms); + publisher->publish(string_message); + } + stop_recording(); + + ASSERT_THAT(writer_->get_messages(), SizeIs(0)); +} From 235604da75c63d395391c8991f56c29ea5a7537a Mon Sep 17 00:00:00 2001 From: Andreas Holzner Date: Thu, 6 Dec 2018 14:40:18 +0100 Subject: [PATCH 07/12] GH-27 Refactor subscribing mechanism - use set for subscribed topics instead of vector - perform the asynchronous call only when discovery is needed --- .../src/rosbag2_transport/recorder.cpp | 53 +++++++++++-------- .../src/rosbag2_transport/recorder.hpp | 5 +- 2 files changed, 34 insertions(+), 24 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 1983ce056..c9653050f 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -38,16 +38,20 @@ void Recorder::record(const RecordOptions & record_options) if (record_options.rmw_serialization_format.empty()) { throw std::runtime_error("No serialization format specified!"); } - ROSBAG2_TRANSPORT_LOG_INFO("Setup complete. Listening for topics..."); serialization_format_ = record_options.rmw_serialization_format; - auto discovery_future = launch_topics_discovery( - record_options.topic_polling_interval, - record_options.is_discovery_disabled, - record_options.topics); + ROSBAG2_TRANSPORT_LOG_INFO("Setup complete. Listening for topics..."); + subscribe_topics(record_options.topics); + std::future discovery_future; + if (!record_options.is_discovery_disabled) { + discovery_future = launch_topics_discovery( + record_options.topic_polling_interval, record_options.topics); + } record_messages(); - discovery_future.wait(); + if (!record_options.is_discovery_disabled) { + discovery_future.wait(); + } subscriptions_.clear(); } @@ -77,28 +81,35 @@ Recorder::create_subscription( std::future Recorder::launch_topics_discovery( std::chrono::milliseconds topic_polling_interval, - bool is_discovery_disabled, const std::vector & topics_to_record) { auto subscribe_to_topics = - [this, topics_to_record, topic_polling_interval, is_discovery_disabled] { - do { - auto all_topics_and_types_to_subscribe = topics_to_record.empty() ? - node_->get_all_topics_with_types() : - node_->get_topics_with_types(topics_to_record); - - if (is_every_topic_subscribed(topics_to_record)) { + [this, topics_to_record, topic_polling_interval] { + while (rclcpp::ok()) { + if (subscribe_topics(topics_to_record)) { return; } - - subscribe_all_missing_topics(all_topics_and_types_to_subscribe); std::this_thread::sleep_for(topic_polling_interval); - } while (rclcpp::ok() && !is_discovery_disabled); + } }; return std::async(std::launch::async, subscribe_to_topics); } +bool Recorder::subscribe_topics(const std::vector & topics_to_record) +{ + auto all_topics_and_types_to_subscribe = topics_to_record.empty() ? + node_->get_all_topics_with_types() : + node_->get_topics_with_types(topics_to_record); + + if (is_every_topic_subscribed(topics_to_record)) { + return true; + } + + subscribe_all_missing_topics(all_topics_and_types_to_subscribe); + return false; +} + bool Recorder::is_every_topic_subscribed( const std::vector & topics_to_record) const { @@ -109,10 +120,8 @@ void Recorder::subscribe_all_missing_topics( const std::unordered_map & all_topics_and_types) { for (const auto & topic_with_type : all_topics_and_types) { - bool already_subscribed = find( - subscribed_topics_.begin(), - subscribed_topics_.end(), - topic_with_type.first) != subscribed_topics_.end(); + bool already_subscribed = + subscribed_topics_.find(topic_with_type.first) != subscribed_topics_.end(); if (!already_subscribed) { subscribe_topic({topic_with_type.first, topic_with_type.second, serialization_format_}); @@ -124,7 +133,7 @@ void Recorder::subscribe_topic(const rosbag2::TopicMetadata & topic) { auto subscription = create_subscription(topic.name, topic.type); if (subscription) { - subscribed_topics_.push_back(topic.name); + subscribed_topics_.insert(topic.name); subscriptions_.push_back(subscription); writer_->create_topic(topic); ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Subscribed to topic '" << topic.name << "'"); diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.hpp b/rosbag2_transport/src/rosbag2_transport/recorder.hpp index f5c572445..9fd737097 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -49,8 +50,8 @@ class Recorder const std::string & topic_name, const std::string & topic_type); std::future launch_topics_discovery( std::chrono::milliseconds topic_polling_interval, - bool is_discovery_disabled, const std::vector & topics_to_record = {}); + bool subscribe_topics(const std::vector & topics_to_record); void subscribe_all_missing_topics( const std::unordered_map & all_topics_and_types); void subscribe_topic(const rosbag2::TopicMetadata & topic); @@ -60,7 +61,7 @@ class Recorder std::shared_ptr writer_; std::shared_ptr node_; std::vector> subscriptions_; - std::vector subscribed_topics_; + std::unordered_set subscribed_topics_; std::string serialization_format_; }; From 6cc6171c3c38369cd3135bdb8e04059cb574a662 Mon Sep 17 00:00:00 2001 From: Andreas Holzner Date: Thu, 6 Dec 2018 15:06:31 +0100 Subject: [PATCH 08/12] GH-27 Expose polling-interval as cli option --- ros2bag/ros2bag/verb/record.py | 8 +++++++- .../rosbag2_transport_python.cpp | 15 ++++++++++++--- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/ros2bag/ros2bag/verb/record.py b/ros2bag/ros2bag/verb/record.py index 290a41e1e..0b5cf7e12 100644 --- a/ros2bag/ros2bag/verb/record.py +++ b/ros2bag/ros2bag/verb/record.py @@ -49,6 +49,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102 parser.add_argument( '--no-discovery', action='store_true', help='disables topic auto discovery during recording') + parser.add_argument( + '-p', '--polling-interval', default=100, + help='time in ms to wait between querying available topics for recording' + ) def create_bag_directory(self, uri): try: @@ -73,13 +77,15 @@ def main(self, *, args): # noqa: D102 storage_id=args.storage, serialization_format=args.serialization_format, all=True, - no_discovery=args.no_discovery) + no_discovery=args.no_discovery, + polling_interval=args.polling_interval) elif args.topics and len(args.topics) > 0: rosbag2_transport_py.record( uri=uri, storage_id=args.storage, serialization_format=args.serialization_format, no_discovery=args.no_discovery, + polling_interval=args.polling_interval, topics=args.topics) else: self._subparser.print_help() diff --git a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp index d8976054c..b02f7f3a8 100644 --- a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp +++ b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport_python.cpp @@ -29,20 +29,29 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject * rosbag2_transport::RecordOptions record_options{}; static const char * kwlist[] = { - "uri", "storage_id", "serialization_format", "all", "no_discovery", "topics", nullptr}; + "uri", + "storage_id", + "serialization_format", + "all", + "no_discovery", + "polling_interval", + "topics", + nullptr}; char * uri = nullptr; char * storage_id = nullptr; char * serilization_format = nullptr; bool all = false; bool no_discovery = false; + uint64_t polling_interval_ms = 100; PyObject * topics = nullptr; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sss|bbO", const_cast(kwlist), + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sss|bbKO", const_cast(kwlist), &uri, &storage_id, &serilization_format, &all, &no_discovery, + &polling_interval_ms, &topics)) { return nullptr; @@ -52,7 +61,7 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject * storage_options.storage_id = std::string(storage_id); record_options.all = all; record_options.is_discovery_disabled = no_discovery; - record_options.topic_polling_interval = std::chrono::milliseconds(100); + record_options.topic_polling_interval = std::chrono::milliseconds(polling_interval_ms); if (topics) { PyObject * topic_iterator = PyObject_GetIter(topics); From 4def41e95042439fba62a22afd69d3524eb9b98c Mon Sep 17 00:00:00 2001 From: Alessandro Bottero Date: Thu, 6 Dec 2018 16:15:49 +0100 Subject: [PATCH 09/12] GH-27 Minor refactoring --- ros2bag/ros2bag/verb/record.py | 6 ++- .../src/rosbag2_transport/recorder.cpp | 49 ++++++++++--------- .../src/rosbag2_transport/recorder.hpp | 4 +- 3 files changed, 31 insertions(+), 28 deletions(-) diff --git a/ros2bag/ros2bag/verb/record.py b/ros2bag/ros2bag/verb/record.py index 0b5cf7e12..f6a400206 100644 --- a/ros2bag/ros2bag/verb/record.py +++ b/ros2bag/ros2bag/verb/record.py @@ -48,10 +48,12 @@ def add_arguments(self, parser, cli_name): # noqa: D102 ' rmw currently in use') parser.add_argument( '--no-discovery', action='store_true', - help='disables topic auto discovery during recording') + help='disables topic auto discovery during recording: only topics present at ' + 'startup will be recorded') parser.add_argument( '-p', '--polling-interval', default=100, - help='time in ms to wait between querying available topics for recording' + help='time in ms to wait between querying available topics for recording. It has no ' + 'effect if --no-discovery is enabled.' ) def create_bag_directory(self, uri): diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index c9653050f..d58f9e2f9 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -39,7 +39,7 @@ void Recorder::record(const RecordOptions & record_options) throw std::runtime_error("No serialization format specified!"); } serialization_format_ = record_options.rmw_serialization_format; - ROSBAG2_TRANSPORT_LOG_INFO("Setup complete. Listening for topics..."); + ROSBAG2_TRANSPORT_LOG_INFO("Listening for topics..."); subscribe_topics(record_options.topics); std::future discovery_future; if (!record_options.is_discovery_disabled) { @@ -55,29 +55,6 @@ void Recorder::record(const RecordOptions & record_options) subscriptions_.clear(); } -std::shared_ptr -Recorder::create_subscription( - const std::string & topic_name, const std::string & topic_type) -{ - auto subscription = node_->create_generic_subscription( - topic_name, - topic_type, - [this, topic_name](std::shared_ptr message) { - auto bag_message = std::make_shared(); - bag_message->serialized_data = message; - bag_message->topic_name = topic_name; - rcutils_time_point_value_t time_stamp; - int error = rcutils_system_time_now(&time_stamp); - if (error != RCUTILS_RET_OK) { - ROSBAG2_TRANSPORT_LOG_ERROR_STREAM( - "Error getting current time. Error:" << rcutils_get_error_string().str); - } - bag_message->time_stamp = time_stamp; - - writer_->write(bag_message); - }); - return subscription; -} std::future Recorder::launch_topics_discovery( std::chrono::milliseconds topic_polling_interval, @@ -140,6 +117,30 @@ void Recorder::subscribe_topic(const rosbag2::TopicMetadata & topic) } } +std::shared_ptr +Recorder::create_subscription( + const std::string & topic_name, const std::string & topic_type) +{ + auto subscription = node_->create_generic_subscription( + topic_name, + topic_type, + [this, topic_name](std::shared_ptr message) { + auto bag_message = std::make_shared(); + bag_message->serialized_data = message; + bag_message->topic_name = topic_name; + rcutils_time_point_value_t time_stamp; + int error = rcutils_system_time_now(&time_stamp); + if (error != RCUTILS_RET_OK) { + ROSBAG2_TRANSPORT_LOG_ERROR_STREAM( + "Error getting current time. Error:" << rcutils_get_error_string().str); + } + bag_message->time_stamp = time_stamp; + + writer_->write(bag_message); + }); + return subscription; +} + void Recorder::record_messages() const { spin(node_); diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.hpp b/rosbag2_transport/src/rosbag2_transport/recorder.hpp index 9fd737097..938e858ab 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.hpp @@ -46,12 +46,12 @@ class Recorder void record(const RecordOptions & record_options); private: - std::shared_ptr create_subscription( - const std::string & topic_name, const std::string & topic_type); std::future launch_topics_discovery( std::chrono::milliseconds topic_polling_interval, const std::vector & topics_to_record = {}); bool subscribe_topics(const std::vector & topics_to_record); + std::shared_ptr create_subscription( + const std::string & topic_name, const std::string & topic_type); void subscribe_all_missing_topics( const std::unordered_map & all_topics_and_types); void subscribe_topic(const rosbag2::TopicMetadata & topic); From ee1125708680fbe0be43efb82776c3029ccaab9d Mon Sep 17 00:00:00 2001 From: Andreas Holzner Date: Thu, 6 Dec 2018 19:26:07 +0100 Subject: [PATCH 10/12] GH-27 Refactor recorder - naming of methods - emphasize similarity between first subscription and discovery loop --- .../src/rosbag2_transport/recorder.cpp | 60 +++++++++---------- .../src/rosbag2_transport/recorder.hpp | 14 +++-- 2 files changed, 38 insertions(+), 36 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index d58f9e2f9..3c8ecdf46 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -40,7 +40,8 @@ void Recorder::record(const RecordOptions & record_options) } serialization_format_ = record_options.rmw_serialization_format; ROSBAG2_TRANSPORT_LOG_INFO("Listening for topics..."); - subscribe_topics(record_options.topics); + subscribe_topics(get_requested_or_available_topics(record_options.topics)); + std::future discovery_future; if (!record_options.is_discovery_disabled) { discovery_future = launch_topics_discovery( @@ -55,54 +56,53 @@ void Recorder::record(const RecordOptions & record_options) subscriptions_.clear(); } - std::future Recorder::launch_topics_discovery( std::chrono::milliseconds topic_polling_interval, - const std::vector & topics_to_record) + const std::vector & requested_topics) { - auto subscribe_to_topics = - [this, topics_to_record, topic_polling_interval] { + auto discover_and_subscribe_topics = + [this, requested_topics, topic_polling_interval] { while (rclcpp::ok()) { - if (subscribe_topics(topics_to_record)) { + auto topics_to_subscribe = get_requested_or_available_topics(requested_topics); + auto missing_topics = get_missing_topics(topics_to_subscribe); + subscribe_topics(missing_topics); + + if (!requested_topics.empty() && subscribed_topics_.size() == requested_topics.size()) { + ROSBAG2_TRANSPORT_LOG_INFO("All requested topics are subscribed. Stopping discovery..."); return; } std::this_thread::sleep_for(topic_polling_interval); } }; - return std::async(std::launch::async, subscribe_to_topics); + return std::async(std::launch::async, discover_and_subscribe_topics); } -bool Recorder::subscribe_topics(const std::vector & topics_to_record) +std::unordered_map +Recorder::get_requested_or_available_topics(const std::vector & requested_topics) { - auto all_topics_and_types_to_subscribe = topics_to_record.empty() ? - node_->get_all_topics_with_types() : - node_->get_topics_with_types(topics_to_record); - - if (is_every_topic_subscribed(topics_to_record)) { - return true; - } - - subscribe_all_missing_topics(all_topics_and_types_to_subscribe); - return false; + return requested_topics.empty() ? + node_->get_all_topics_with_types() : + node_->get_topics_with_types(requested_topics); } -bool Recorder::is_every_topic_subscribed( - const std::vector & topics_to_record) const +std::unordered_map +Recorder::get_missing_topics(const std::unordered_map & topics) { - return !topics_to_record.empty() && subscriptions_.size() == topics_to_record.size(); + std::unordered_map missing_topics; + for (const auto & i : topics) { + if (subscribed_topics_.find(i.first) == subscribed_topics_.end()) { + missing_topics.emplace(i.first, i.second); + } + } + return missing_topics; } -void Recorder::subscribe_all_missing_topics( - const std::unordered_map & all_topics_and_types) +void Recorder::subscribe_topics( + const std::unordered_map & topics_and_types) { - for (const auto & topic_with_type : all_topics_and_types) { - bool already_subscribed = - subscribed_topics_.find(topic_with_type.first) != subscribed_topics_.end(); - - if (!already_subscribed) { - subscribe_topic({topic_with_type.first, topic_with_type.second, serialization_format_}); - } + for (const auto & topic_with_type : topics_and_types) { + subscribe_topic({topic_with_type.first, topic_with_type.second, serialization_format_}); } } diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.hpp b/rosbag2_transport/src/rosbag2_transport/recorder.hpp index 938e858ab..3455eb9b5 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.hpp @@ -48,14 +48,16 @@ class Recorder private: std::future launch_topics_discovery( std::chrono::milliseconds topic_polling_interval, - const std::vector & topics_to_record = {}); - bool subscribe_topics(const std::vector & topics_to_record); + const std::vector & requested_topics = {}); + std::unordered_map + get_requested_or_available_topics(const std::vector & requested_topics); + std::unordered_map + get_missing_topics(const std::unordered_map & topics); + void subscribe_topics( + const std::unordered_map & topics_and_types); + void subscribe_topic(const rosbag2::TopicMetadata & topic); std::shared_ptr create_subscription( const std::string & topic_name, const std::string & topic_type); - void subscribe_all_missing_topics( - const std::unordered_map & all_topics_and_types); - void subscribe_topic(const rosbag2::TopicMetadata & topic); - bool is_every_topic_subscribed(const std::vector & topics_to_record) const; void record_messages() const; std::shared_ptr writer_; From aa3976dc6ef091884001093cae04c7e7445b5f56 Mon Sep 17 00:00:00 2001 From: Karsten Knese Date: Thu, 6 Dec 2018 12:54:30 -0800 Subject: [PATCH 11/12] small touch ups --- .../src/rosbag2_transport/recorder.cpp | 36 +++++++++---------- .../src/rosbag2_transport/recorder.hpp | 8 ++++- 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 3c8ecdf46..ab07eacda 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -44,38 +44,36 @@ void Recorder::record(const RecordOptions & record_options) std::future discovery_future; if (!record_options.is_discovery_disabled) { - discovery_future = launch_topics_discovery( + auto launch_discovery = std::bind( + &Recorder::launch_topics_discovery, this, record_options.topic_polling_interval, record_options.topics); + discovery_future = std::async(std::launch::async, launch_discovery); } record_messages(); - if (!record_options.is_discovery_disabled) { + if (discovery_future.valid()) { discovery_future.wait(); } + subscriptions_.clear(); } -std::future Recorder::launch_topics_discovery( +void Recorder::launch_topics_discovery( std::chrono::milliseconds topic_polling_interval, const std::vector & requested_topics) { - auto discover_and_subscribe_topics = - [this, requested_topics, topic_polling_interval] { - while (rclcpp::ok()) { - auto topics_to_subscribe = get_requested_or_available_topics(requested_topics); - auto missing_topics = get_missing_topics(topics_to_subscribe); - subscribe_topics(missing_topics); - - if (!requested_topics.empty() && subscribed_topics_.size() == requested_topics.size()) { - ROSBAG2_TRANSPORT_LOG_INFO("All requested topics are subscribed. Stopping discovery..."); - return; - } - std::this_thread::sleep_for(topic_polling_interval); - } - }; - - return std::async(std::launch::async, discover_and_subscribe_topics); + while (rclcpp::ok()) { + auto topics_to_subscribe = get_requested_or_available_topics(requested_topics); + auto missing_topics = get_missing_topics(topics_to_subscribe); + subscribe_topics(missing_topics); + + if (!requested_topics.empty() && subscribed_topics_.size() == requested_topics.size()) { + ROSBAG2_TRANSPORT_LOG_INFO("All requested topics are subscribed. Stopping discovery..."); + return; + } + std::this_thread::sleep_for(topic_polling_interval); + } } std::unordered_map diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.hpp b/rosbag2_transport/src/rosbag2_transport/recorder.hpp index 3455eb9b5..cb12a7596 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.hpp @@ -46,18 +46,24 @@ class Recorder void record(const RecordOptions & record_options); private: - std::future launch_topics_discovery( + void launch_topics_discovery( std::chrono::milliseconds topic_polling_interval, const std::vector & requested_topics = {}); + std::unordered_map get_requested_or_available_topics(const std::vector & requested_topics); + std::unordered_map get_missing_topics(const std::unordered_map & topics); + void subscribe_topics( const std::unordered_map & topics_and_types); + void subscribe_topic(const rosbag2::TopicMetadata & topic); + std::shared_ptr create_subscription( const std::string & topic_name, const std::string & topic_type); + void record_messages() const; std::shared_ptr writer_; From 482ca0e5ad79be4e2aeb1b581fdfcaaccbb67da0 Mon Sep 17 00:00:00 2001 From: Karsten Knese Date: Thu, 6 Dec 2018 13:36:05 -0800 Subject: [PATCH 12/12] remove launch from function name --- rosbag2_transport/src/rosbag2_transport/recorder.cpp | 8 ++++---- rosbag2_transport/src/rosbag2_transport/recorder.hpp | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index ab07eacda..b4a012159 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -44,10 +44,10 @@ void Recorder::record(const RecordOptions & record_options) std::future discovery_future; if (!record_options.is_discovery_disabled) { - auto launch_discovery = std::bind( - &Recorder::launch_topics_discovery, this, + auto discovery = std::bind( + &Recorder::topics_discovery, this, record_options.topic_polling_interval, record_options.topics); - discovery_future = std::async(std::launch::async, launch_discovery); + discovery_future = std::async(std::launch::async, discovery); } record_messages(); @@ -59,7 +59,7 @@ void Recorder::record(const RecordOptions & record_options) subscriptions_.clear(); } -void Recorder::launch_topics_discovery( +void Recorder::topics_discovery( std::chrono::milliseconds topic_polling_interval, const std::vector & requested_topics) { diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.hpp b/rosbag2_transport/src/rosbag2_transport/recorder.hpp index cb12a7596..d6419ed02 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.hpp @@ -46,7 +46,7 @@ class Recorder void record(const RecordOptions & record_options); private: - void launch_topics_discovery( + void topics_discovery( std::chrono::milliseconds topic_polling_interval, const std::vector & requested_topics = {});