Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record all topics #30

Merged
merged 13 commits into from
Sep 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions rosbag2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ if(BUILD_TESTING)
target_link_libraries(rosbag2_write_integration_test librosbag2)
endif()

ament_add_gmock(rosbag2_write_all_integration_test
test/rosbag2/rosbag2_write_all_integration_test.cpp
test/rosbag2/test_memory_management.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
if(TARGET rosbag2_write_all_integration_test)
target_link_libraries(rosbag2_write_all_integration_test librosbag2)
endif()

ament_add_gmock(rosbag2_read_integration_test
test/rosbag2/rosbag2_read_integration_test.cpp
test/rosbag2/test_memory_management.cpp
Expand All @@ -110,14 +118,6 @@ if(BUILD_TESTING)
)
endif()

ament_add_gmock(rosbag2_integration_test
test/rosbag2/rosbag2_integration_test.cpp
src/rosbag2/typesupport_helpers.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
if(TARGET rosbag2_integration_test)
target_link_libraries(rosbag2_integration_test librosbag2)
endif()

ament_add_gmock(rosbag2_rosbag_node_test
test/rosbag2/rosbag2_rosbag_node_test.cpp
test/rosbag2/test_memory_management.cpp
Expand All @@ -127,6 +127,11 @@ if(BUILD_TESTING)
src/rosbag2/typesupport_helpers.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
if(TARGET rosbag2_rosbag_node_test)
target_include_directories(rosbag2_rosbag_node_test
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
)
ament_target_dependencies(rosbag2_rosbag_node_test
ament_index_cpp
Poco
Expand Down
61 changes: 61 additions & 0 deletions rosbag2/include/rosbag2/logging.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2018, Bosch Software Innovations GmbH.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the need for this file? Is there something in rcutils which is missing?

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2__LOGGING_HPP_
#define ROSBAG2__LOGGING_HPP_

#include <sstream>
#include <string>

#include "rcutils/logging_macros.h"

#define ROSBAG2_PACKAGE_NAME "rosbag2"

#define ROSBAG2_LOG_INFO(...) \
RCUTILS_LOG_INFO_NAMED(ROSBAG2_PACKAGE_NAME, __VA_ARGS__)

#define ROSBAG2_LOG_INFO_STREAM(args) do { \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_INFO_NAMED(ROSBAG2_PACKAGE_NAME, __ss.str().c_str()); \
} while (0)

#define ROSBAG2_LOG_ERROR(...) \
RCUTILS_LOG_ERROR_NAMED(ROSBAG2_PACKAGE_NAME, __VA_ARGS__)

#define ROSBAG2_LOG_ERROR_STREAM(args) do { \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_ERROR_NAMED(ROSBAG2_PACKAGE_NAME, __ss.str().c_str()); \
} while (0)

#define ROSBAG2_LOG_WARN(...) \
RCUTILS_LOG_WARN_NAMED(ROSBAG2_PACKAGE_NAME, __VA_ARGS__)

#define ROSBAG2_LOG_WARN_STREAM(args) do { \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_WARN_NAMED(ROSBAG2_PACKAGE_NAME, __ss.str().c_str()); \
} while (0)

#define ROSBAG2_LOG_DEBUG(...) \
RCUTILS_LOG_DEBUG_NAMED(ROSBAG2_PACKAGE_NAME, __VA_ARGS__)

#define ROSBAG2_LOG_DEBUG_STREAM(args) do { \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_DEBUG_NAMED(ROSBAG2_PACKAGE_NAME, __ss.str().c_str()); \
} while (0)

#endif // ROSBAG2__LOGGING_HPP_
22 changes: 17 additions & 5 deletions rosbag2/include/rosbag2/rosbag2.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,31 @@ class Rosbag2Node;
class Rosbag2
{
public:
/**
* Records topics to a bagfile. Subscription happens at startup time, hence the topics must
* exist when "record" is called.
*
* \param file_name Name of the bagfile to write
* \param topic_names Vector of topics to subscribe to. Topics must exist at startup time. If
* the vector is empty, all topics will be subscribed.
* \param after_write_action This function will be executed after each write to the database
* where the input parameter is the topic name of the topic written Currently needed for testing.
* Might be removed later.
*/
ROSBAG2_PUBLIC
void record(
const std::string & file_name,
std::vector<std::string> topic_names,
const std::vector<std::string> & topic_names,
std::function<void(std::string)> after_write_action = nullptr);

/**
* Replay all topics in a bagfile.
*
* \param file_name Name of the bagfile to replay
*/
ROSBAG2_PUBLIC
void play(const std::string & file_name);

ROSBAG2_PUBLIC
std::map<std::string, std::string> get_topics_with_types(
const std::vector<std::string> & topic_names, std::shared_ptr<rclcpp::Node> node);

private:
void prepare_publishers(
std::shared_ptr<Rosbag2Node> node,
Expand Down
8 changes: 5 additions & 3 deletions rosbag2/src/rosbag2/demo_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
int main(int argc, const char ** argv)
{
if (argc < 2) {
std::cerr << "\nThe names of topics to record must be given as parameter!\n";
std::cerr << "\nThe names of topics or `--all` must be given to record as parameter!\n";
return 0;
}
std::vector<std::string> topics;

for (int i = 1; i < argc; i++) {
topics.emplace_back(argv[i]);
if (strcmp(argv[1], "--all") != 0) {
for (int i = 1; i < argc; i++) {
topics.emplace_back(argv[i]);
}
}

// TODO(anhosi): allow output file to be specified by cli argument and do proper checking if
Expand Down
7 changes: 4 additions & 3 deletions rosbag2/src/rosbag2/generic_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "rclcpp/any_subscription_callback.hpp"
#include "rclcpp/subscription.hpp"

#include "rosbag2/logging.hpp"

namespace rosbag2
{

Expand Down Expand Up @@ -98,9 +100,8 @@ GenericSubscription::borrow_serialized_message(size_t capacity)
auto fini_return = rmw_serialized_message_fini(msg);
delete msg;
if (fini_return != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rosbag2",
"failed to destroy serialized message: %s", rcl_get_error_string_safe());
ROSBAG2_LOG_ERROR_STREAM(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is still a problem with the allocator IMO.
So assume we create a serialized message using this subscription, keep it in memory and destroy the subscription before destroying the message.
The message now has an invalid allocator.

Copy link
Contributor Author

@Martin-Idel-SI Martin-Idel-SI Sep 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had a close look and I believe it should be ok as is. The content of the allocator gets copied inside rmw_serialized_message_init(...) (see https://github.com/ros2/rcutils/blob/f04cc666dd06347050ff5c18b80079922e729d92/src/char_array.c#L50) so it should still be valid even after the pointer is destroyed (as long as its state is empty, of course, which seems to be the case for the default allocator, in the other case, even a delete won't help).

We did a check by prematurely destroying the allocator (create new rcutils_allocator_t, use it to initialize the serialized message and then immediately destroy it) and it ran perfectly and valgrind was also happy.

That said, I'd love to have a better way to do it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true. Please ignore my comment then.

"Failed to destroy serialized message: " << rcl_get_error_string_safe());
}
});

Expand Down
8 changes: 4 additions & 4 deletions rosbag2/src/rosbag2/generic_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ class GenericSubscription : public rclcpp::SubscriptionBase
* Constructor. In order to properly subscribe to a topic, this subscription needs to be added to
* the node_topic_interface of the node passed into this constructor.
*
* @param node_handle Node handle to the node to create the subscription to
* @param ts Type support handle
* @param topic_name Topic name
* @param callback Callback for new messages of serialized form
* \param node_handle Node handle to the node to create the subscription to
* \param ts Type support handle
* \param topic_name Topic name
* \param callback Callback for new messages of serialized form
*/
GenericSubscription(
std::shared_ptr<rcl_node_t> node_handle,
Expand Down
47 changes: 9 additions & 38 deletions rosbag2/src/rosbag2/rosbag2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "rclcpp/rclcpp.hpp"
#include "rcutils/logging_macros.h"

#include "rosbag2/logging.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"
#include "rosbag2_storage/storage_factory.hpp"
#include "rosbag2_storage/storage_interfaces/read_only_interface.hpp"
Expand All @@ -42,20 +43,21 @@ const char * ROS_PACKAGE_NAME = "rosbag2";

void Rosbag2::record(
const std::string & file_name,
std::vector<std::string> topic_names,
const std::vector<std::string> & topic_names,
std::function<void(std::string)> after_write_action)
{
rosbag2_storage::StorageFactory factory;
auto storage = factory.open_read_write(file_name, "sqlite3");

if (!storage) {
throw std::runtime_error("No storage could be initialized. Abort");
return;
}

auto node = std::make_shared<Rosbag2Node>("rosbag2");

auto topics_and_types = get_topics_with_types(topic_names, node);
auto topics_and_types = topic_names.empty() ?
node->get_all_topics_with_types() :
node->get_topics_with_types(topic_names);

if (topics_and_types.empty()) {
throw std::runtime_error("No topics found. Abort");
Expand All @@ -77,45 +79,15 @@ void Rosbag2::record(

if (subscriptions_.empty()) {
throw std::runtime_error("No topics could be subscribed. Abort");
return;
}

RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, "Waiting for messages...");
ROSBAG2_LOG_INFO("Waiting for messages...");
while (rclcpp::ok()) {
rclcpp::spin(node);
}
subscriptions_.clear();
}

std::map<std::string, std::string> Rosbag2::get_topics_with_types(
const std::vector<std::string> & topic_names, std::shared_ptr<rclcpp::Node> node)
{
// TODO(Martin-Idel-SI): This is a short sleep to allow the node some time to discover the topic
// This should be replaced by an auto-discovery system in the future
std::this_thread::sleep_for(std::chrono::milliseconds(100));
auto topics = node->get_topic_names_and_types();

std::map<std::string, std::string> topic_names_and_types;
for (const auto & topic_name : topic_names) {
std::string complete_topic_name = topic_name;
if (topic_name[0] != '/') {
complete_topic_name = "/" + topic_name;
}
auto position = topics.find(complete_topic_name);
if (position != topics.end()) {
if (position->second.size() > 1) {
RCUTILS_LOG_ERROR_NAMED(
ROS_PACKAGE_NAME,
"Topic '%s' has several types associated. Only topics with one type are supported.",
position->first.c_str());
} else {
topic_names_and_types.insert({position->first, position->second[0]});
}
}
}
return topic_names_and_types;
}

std::shared_ptr<GenericSubscription>
Rosbag2::create_subscription(
const std::function<void(std::string)> & after_write_action,
Expand All @@ -133,9 +105,8 @@ Rosbag2::create_subscription(
rcutils_time_point_value_t time_stamp;
int error = rcutils_system_time_now(&time_stamp);
if (error != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
ROS_PACKAGE_NAME,
"Error getting current time. Error: %s", rcutils_get_error_string_safe());
ROSBAG2_LOG_ERROR_STREAM(
"Error getting current time. Error:" << rcutils_get_error_string_safe());
}
bag_message->time_stamp = time_stamp;

Expand All @@ -161,7 +132,7 @@ void Rosbag2::play(const std::string & file_name)
// without the sleep_for() many messages are lost.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
publishers_[message->topic_name]->publish(message->serialized_data);
RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, "published message");
ROSBAG2_LOG_INFO("Published message");
}
}
}
Expand Down
Loading