Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto discovery of new topics #63

Merged
15 changes: 14 additions & 1 deletion ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'-f', '--serialization-format', default='',
help='rmw serialization format in which the messages are saved, defaults to the'
' rmw currently in use')
parser.add_argument(
'--no-discovery', action='store_true',
help='disables topic auto discovery during recording: only topics present at '
'startup will be recorded')
parser.add_argument(
'-p', '--polling-interval', default=100,
help='time in ms to wait between querying available topics for recording. It has no '
'effect if --no-discovery is enabled.'
)

def create_bag_directory(self, uri):
try:
Expand All @@ -69,12 +78,16 @@ def main(self, *, args): # noqa: D102
uri=uri,
storage_id=args.storage,
serialization_format=args.serialization_format,
all=True)
all=True,
no_discovery=args.no_discovery,
polling_interval=args.polling_interval)
elif args.topics and len(args.topics) > 0:
rosbag2_transport_py.record(
uri=uri,
storage_id=args.storage,
serialization_format=args.serialization_format,
no_discovery=args.no_discovery,
polling_interval=args.polling_interval,
topics=args.topics)
else:
self._subparser.print_help()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ class PublisherManager
auto publisher_node = std::make_shared<rclcpp::Node>("publisher" + std::to_string(counter++));
auto publisher = publisher_node->create_publisher<T>(topic_name);

// We need to publish one message to set up the topic for discovery
publisher->publish(message);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this just legacy or why is this code removed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With auto discovery we no longer need to advertise the topic before starting rosbag in the test.


publishers_.push_back([publisher, topic_name, message, expected_messages](
CountFunction count_stored_messages) {
if (expected_messages != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ TEST_F(RecordFixture, record_end_to_end_test) {
auto message = get_messages_primitives()[0];
message->string_value = "test";
size_t expected_test_messages = 3;
pub_man_.add_publisher("/test_topic", message, expected_test_messages);

auto wrong_message = get_messages_primitives()[0];
wrong_message->string_value = "wrong_content";
pub_man_.add_publisher("/wrong_topic", wrong_message);

auto process_handle = start_execution("ros2 bag record --output " + bag_path_ + " /test_topic");
wait_for_db();

pub_man_.add_publisher("/test_topic", message, expected_test_messages);
pub_man_.add_publisher("/wrong_topic", wrong_message);

rosbag2_storage_plugins::SqliteWrapper
db(database_path_, rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY);
pub_man_.run_publishers([this, &db](const std::string & topic_name) {
Expand Down
8 changes: 8 additions & 0 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ if(BUILD_TESTING)
ament_target_dependencies(test_record_all test_msgs rosbag2_test_common)
endif()

ament_add_gmock(test_record_all_no_discovery
test/rosbag2_transport/test_record_all_no_discovery.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
if(TARGET test_record_all_no_discovery)
target_link_libraries(test_record_all_no_discovery rosbag2_transport)
ament_target_dependencies(test_record_all_no_discovery test_msgs rosbag2_test_common)
endif()

ament_add_gmock(test_play
test/rosbag2_transport/test_play.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef ROSBAG2_TRANSPORT__RECORD_OPTIONS_HPP_
#define ROSBAG2_TRANSPORT__RECORD_OPTIONS_HPP_

#include <chrono>
#include <string>
#include <vector>

Expand All @@ -24,8 +25,10 @@ struct RecordOptions
{
public:
bool all;
bool is_discovery_disabled;
std::vector<std::string> topics;
std::string rmw_serialization_format;
std::chrono::milliseconds topic_polling_interval;
};

} // namespace rosbag2_transport
Expand Down
97 changes: 79 additions & 18 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@

#include "recorder.hpp"

#include <algorithm>
#include <future>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "rosbag2/writer.hpp"
#include "rosbag2_transport/logging.hpp"
Expand All @@ -30,33 +35,84 @@ Recorder::Recorder(std::shared_ptr<rosbag2::Writer> writer, std::shared_ptr<Rosb

void Recorder::record(const RecordOptions & record_options)
{
auto topics_and_types = record_options.all ?
node_->get_all_topics_with_types() :
node_->get_topics_with_types(record_options.topics);
if (record_options.rmw_serialization_format.empty()) {
throw std::runtime_error("No serialization format specified!");
}
serialization_format_ = record_options.rmw_serialization_format;
ROSBAG2_TRANSPORT_LOG_INFO("Listening for topics...");
subscribe_topics(get_requested_or_available_topics(record_options.topics));

if (topics_and_types.empty()) {
throw std::runtime_error("No topics found. Abort");
std::future<void> discovery_future;
if (!record_options.is_discovery_disabled) {
auto discovery = std::bind(
&Recorder::topics_discovery, this,
record_options.topic_polling_interval, record_options.topics);
discovery_future = std::async(std::launch::async, discovery);
}

for (const auto & topic_and_type : topics_and_types) {
auto topic_name = topic_and_type.first;
auto topic_type = topic_and_type.second;
record_messages();

if (discovery_future.valid()) {
discovery_future.wait();
}

auto subscription = create_subscription(topic_name, topic_type);
if (subscription) {
subscriptions_.push_back(subscription);
writer_->create_topic({topic_name, topic_type, record_options.rmw_serialization_format});
ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Subscribed to topic '" << topic_name << "'");
subscriptions_.clear();
}

void Recorder::topics_discovery(
std::chrono::milliseconds topic_polling_interval,
const std::vector<std::string> & requested_topics)
{
while (rclcpp::ok()) {
auto topics_to_subscribe = get_requested_or_available_topics(requested_topics);
auto missing_topics = get_missing_topics(topics_to_subscribe);
subscribe_topics(missing_topics);

if (!requested_topics.empty() && subscribed_topics_.size() == requested_topics.size()) {
ROSBAG2_TRANSPORT_LOG_INFO("All requested topics are subscribed. Stopping discovery...");
return;
}
std::this_thread::sleep_for(topic_polling_interval);
}
}

std::unordered_map<std::string, std::string>
Recorder::get_requested_or_available_topics(const std::vector<std::string> & requested_topics)
{
return requested_topics.empty() ?
node_->get_all_topics_with_types() :
node_->get_topics_with_types(requested_topics);
}

if (subscriptions_.empty()) {
throw std::runtime_error("No topics could be subscribed. Abort");
std::unordered_map<std::string, std::string>
Recorder::get_missing_topics(const std::unordered_map<std::string, std::string> & topics)
{
std::unordered_map<std::string, std::string> missing_topics;
for (const auto & i : topics) {
if (subscribed_topics_.find(i.first) == subscribed_topics_.end()) {
missing_topics.emplace(i.first, i.second);
}
}
return missing_topics;
}

ROSBAG2_TRANSPORT_LOG_INFO("Subscription setup complete.");
rclcpp::spin(node_);
subscriptions_.clear();
void Recorder::subscribe_topics(
const std::unordered_map<std::string, std::string> & topics_and_types)
{
for (const auto & topic_with_type : topics_and_types) {
subscribe_topic({topic_with_type.first, topic_with_type.second, serialization_format_});
}
}

void Recorder::subscribe_topic(const rosbag2::TopicMetadata & topic)
{
auto subscription = create_subscription(topic.name, topic.type);
if (subscription) {
subscribed_topics_.insert(topic.name);
subscriptions_.push_back(subscription);
writer_->create_topic(topic);
ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Subscribed to topic '" << topic.name << "'");
}
}

std::shared_ptr<GenericSubscription>
Expand All @@ -83,4 +139,9 @@ Recorder::create_subscription(
return subscription;
}

void Recorder::record_messages() const
{
spin(node_);
}

} // namespace rosbag2_transport
25 changes: 25 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
#ifndef ROSBAG2_TRANSPORT__RECORDER_HPP_
#define ROSBAG2_TRANSPORT__RECORDER_HPP_

#include <future>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "rosbag2/types.hpp"
#include "rosbag2/writer.hpp"
#include "rosbag2_transport/record_options.hpp"

namespace rosbag2
Expand All @@ -40,12 +46,31 @@ class Recorder
void record(const RecordOptions & record_options);

private:
void topics_discovery(
std::chrono::milliseconds topic_polling_interval,
const std::vector<std::string> & requested_topics = {});

std::unordered_map<std::string, std::string>
get_requested_or_available_topics(const std::vector<std::string> & requested_topics);

std::unordered_map<std::string, std::string>
get_missing_topics(const std::unordered_map<std::string, std::string> & topics);

void subscribe_topics(
const std::unordered_map<std::string, std::string> & topics_and_types);

void subscribe_topic(const rosbag2::TopicMetadata & topic);

std::shared_ptr<GenericSubscription> create_subscription(
const std::string & topic_name, const std::string & topic_type);

void record_messages() const;

std::shared_ptr<rosbag2::Writer> writer_;
std::shared_ptr<Rosbag2Node> node_;
std::vector<std::shared_ptr<GenericSubscription>> subscriptions_;
std::unordered_set<std::string> subscribed_topics_;
std::string serialization_format_;
};

} // namespace rosbag2_transport
Expand Down
6 changes: 0 additions & 6 deletions rosbag2_transport/src/rosbag2_transport/rosbag2_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,6 @@ std::unordered_map<std::string, std::string> Rosbag2Node::get_topics_with_types(
}
}

// TODO(Martin-Idel-SI): This is a short sleep to allow the node some time to discover the topic
// This should be replaced by an auto-discovery system in the future
std::this_thread::sleep_for(std::chrono::milliseconds(100));
auto topics_and_types = this->get_topic_names_and_types();

std::map<std::string, std::vector<std::string>> filtered_topics_and_types;
Expand All @@ -149,9 +146,6 @@ std::unordered_map<std::string, std::string> Rosbag2Node::get_topics_with_types(
std::unordered_map<std::string, std::string>
Rosbag2Node::get_all_topics_with_types()
{
// TODO(Martin-Idel-SI): This is a short sleep to allow the node some time to discover the topic
// This should be replaced by an auto-discovery system in the future
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return filter_topics_with_more_than_one_type(this->get_topic_names_and_types());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Python.h>
#include <chrono>
#include <string>
#include <vector>

Expand All @@ -28,18 +29,29 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject *
rosbag2_transport::RecordOptions record_options{};

static const char * kwlist[] = {
"uri", "storage_id", "serialization_format", "all", "topics", nullptr};
"uri",
"storage_id",
"serialization_format",
"all",
"no_discovery",
"polling_interval",
"topics",
nullptr};

char * uri = nullptr;
char * storage_id = nullptr;
char * serilization_format = nullptr;
bool all = false;
bool no_discovery = false;
uint64_t polling_interval_ms = 100;
PyObject * topics = nullptr;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sss|bO", const_cast<char **>(kwlist),
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sss|bbKO", const_cast<char **>(kwlist),
&uri,
&storage_id,
&serilization_format,
&all,
&no_discovery,
&polling_interval_ms,
&topics))
{
return nullptr;
Expand All @@ -48,6 +60,8 @@ rosbag2_transport_record(PyObject * Py_UNUSED(self), PyObject * args, PyObject *
storage_options.uri = std::string(uri);
storage_options.storage_id = std::string(storage_id);
record_options.all = all;
record_options.is_discovery_disabled = no_discovery;
record_options.topic_polling_interval = std::chrono::milliseconds(polling_interval_ms);

if (topics) {
PyObject * topic_iterator = PyObject_GetIter(topics);
Expand Down
4 changes: 2 additions & 2 deletions rosbag2_transport/test/rosbag2_transport/test_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are
string_message->string_value = "Hello World";
std::string string_topic = "/string_topic";

start_recording({false, false, {string_topic, array_topic}, "rmw_format", 100ms});

pub_man_.add_publisher<test_msgs::msg::Primitives>(
string_topic, string_message, 2);
pub_man_.add_publisher<test_msgs::msg::StaticArrayPrimitives>(
array_topic, array_message, 2);

start_recording({false, {string_topic, array_topic}, "rmw_format"});
run_publishers();
stop_recording();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,9 @@

#include <gmock/gmock.h>

#include <memory>
#include <string>
#include <vector>

#include "rclcpp/rclcpp.hpp"
#include "record_integration_fixture.hpp"
#include "rosbag2_transport/rosbag2_transport.hpp"
#include "rosbag2/types.hpp"
#include "test_msgs/msg/primitives.hpp"
#include "test_msgs/msg/static_array_primitives.hpp"
#include "test_msgs/message_fixtures.hpp"
Expand All @@ -40,7 +35,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are
pub_man_.add_publisher<test_msgs::msg::Primitives>(string_topic, string_message, 2);
pub_man_.add_publisher<test_msgs::msg::StaticArrayPrimitives>(array_topic, array_message, 2);

start_recording({true, {}, ""});
start_recording({true, false, {}, "rmw_format", 100ms});
run_publishers();
stop_recording();

Expand Down
Loading