Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ros2/Sink] Implement the 'enable-save-rosbag' feature #53

Merged
merged 1 commit into from
Oct 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions ros2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,17 @@ ENDFOREACH()

# find dependencies
FIND_PACKAGE(ament_cmake REQUIRED)
FIND_PACKAGE(pluginlib REQUIRED)
FIND_PACKAGE(rcutils REQUIRED)
FIND_PACKAGE(rclcpp REQUIRED)
FIND_PACKAGE(std_msgs REQUIRED)

FIND_PACKAGE(rosbag2 REQUIRED)
FIND_PACKAGE(rosbag2_storage REQUIRED)
FIND_PACKAGE(sqlite3_vendor REQUIRED)
FIND_PACKAGE(SQLite3 REQUIRED) # provided by sqlite3_vendor
FIND_PACKAGE(rmw_fastrtps_cpp REQUIRED)
FIND_PACKAGE(rosbag2_storage_default_plugins REQUIRED)
FIND_PACKAGE(rosbag2_converter_default_plugins REQUIRED)
FIND_PACKAGE(rosidl_default_generators REQUIRED)

SET(NNS_ROS2_LIB_MSGS "${NNS_ROS_COMMON_DIR}/msg:Tensors.msg")
Expand All @@ -54,7 +62,10 @@ ROSIDL_GENERATE_INTERFACES(${PROJECT_NAME}
ADD_LIBRARY(${NNS_ROS2_LIB_TARGET} SHARED
${NNS_ROS2_LIB_SRCS}
)
AMENT_TARGET_DEPENDENCIES(${NNS_ROS2_LIB_TARGET} rclcpp std_msgs)
AMENT_TARGET_DEPENDENCIES(${NNS_ROS2_LIB_TARGET}
rclcpp std_msgs rosbag2 pluginlib rcutils rosbag2_storage_default_plugins
rosbag2_converter_default_plugins rosbag2_storage rmw_fastrtps_cpp
sqlite3_vendor SQLite3)
AMENT_EXPORT_DEPENDENCIES(rosidl_default_runtime)
ROSIDL_TARGET_INTERFACES(${NNS_ROS2_LIB_TARGET}
${PROJECT_NAME} "rosidl_typesupport_cpp"
Expand Down
185 changes: 176 additions & 9 deletions ros2/node/nns_rclcpp_publisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,23 @@
* by implementing the NnsRosPublisher class.
*
*/
#include <glib.h>
#include <glib/gstdio.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>

#include <chrono>
#include <memory>

#include "rclcpp/rclcpp.hpp"
#include "rcpputils/filesystem_helper.hpp"
#include "rosbag2/writers/sequential_writer.hpp"
#include "rosbag2/storage_options.hpp"
#include "rosbag2/writer.hpp"
#include "std_msgs/msg/multi_array_layout.hpp"
#include "std_msgs/msg/multi_array_dimension.hpp"

#include "nns_ros_publisher.h"
#include "nns_ros2_bridge/msg/tensors.hpp"
#include "std_msgs/msg/multi_array_layout.hpp"
Expand All @@ -38,11 +50,15 @@ class NnsRclCppPublisher

gboolean publish (const guint num_tensors,
const GstTensorMemory *tensors_mem, void *bag) final;
void setRosBagWriter (
std::unique_ptr<rosbag2::writers::SequentialWriter> writer);
std::unique_ptr<rosbag2::writers::SequentialWriter> getRosBagWriter ();

private:
static const std::string node_namespace_;
rclcpp::Node::SharedPtr node_;
rclcpp::Publisher<nns_ros2_bridge::msg::Tensors>::SharedPtr publisher_;
std::unique_ptr<rosbag2::writers::SequentialWriter> rb2_sq_writer_;
};

const std::string NnsRclCppPublisher::node_namespace_ = "tensor_ros2_sink";
Expand Down Expand Up @@ -97,7 +113,7 @@ NnsRclCppPublisher::~NnsRclCppPublisher ()
}

/**
* @brief A method for publishing a ROS topic that contains the tensors passed by the NNStreamer framework
* @brief A method for publishing a ROS topic that contains the tensors passed by the NNStreamer framework
* @param[in] num_tensors The number of tensors included in tensors_mem (for the verification purpose)
* @param[in] tensors_mem The pointer of containers consists of information and raw data of tensors
* @return TRUE if the configuration is successfully done
Expand All @@ -106,6 +122,9 @@ gboolean NnsRclCppPublisher::publish (const guint num_tensors,
const GstTensorMemory *tensors_mem, void *bag __attribute__((unused)))
{
nns_ros2_bridge::msg::Tensors tensors_msg;
std::unique_ptr<rosbag2::writers::SequentialWriter> sq_writer;
gboolean ret = TRUE;
rcutils_ret_t rb2_ret;

g_return_val_if_fail (this->ready_to_pub, FALSE);
g_return_val_if_fail (num_tensors == this->num_of_tensors_pub, FALSE);
Expand All @@ -126,15 +145,91 @@ gboolean NnsRclCppPublisher::publish (const guint num_tensors,
rclcpp::spin_some(this->node_);
}

return TRUE;
sq_writer = this->getRosBagWriter ();
if (sq_writer) {
myungjoo marked this conversation as resolved.
Show resolved Hide resolved
rcutils_allocator_t allocator = rcutils_get_default_allocator ();
rcutils_uint8_array_t serialized_message =
rmw_get_zero_initialized_serialized_message ();
auto typesupport_cpp =
rosidl_typesupport_cpp::get_message_type_support_handle<
nns_ros2_bridge::msg::Tensors
> ();
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage> ();

rb2_ret =rmw_serialized_message_init (&serialized_message, 0, &allocator);
if (rb2_ret != RMW_RET_OK) {
g_critical ("%s: rosbag2: Failed to initialize rmw_serialized_message\n",
(this->str_nns_helper_name).c_str ());
ret = FALSE;
goto restore_sq_writer;
}

rb2_ret = rmw_serialize (&tensors_msg, typesupport_cpp,
&serialized_message);
if (rb2_ret != RMW_RET_OK) {
g_critical ("%s: rosbag2: Failed to serialize a message of the nns_ros2_bridge::msg::Tensors type\n",
(this->str_nns_helper_name).c_str ());
ret = FALSE;
goto restore_sq_writer;
}
message->serialized_data =
std::make_shared<rcutils_uint8_array_t> (serialized_message);
message->topic_name = this->getPubTopicName ();
rb2_ret = rcutils_system_time_now (&message->time_stamp);
if (rb2_ret != RCUTILS_RET_OK) {
g_critical ("%s: rosbag2: Failed to obtatin the current time\n",
(this->str_nns_helper_name).c_str ());
ret = FALSE;
goto restore_sq_writer;
} else {
sq_writer->write (message);
}

restore_sq_writer:
this->setRosBagWriter (std::move (sq_writer));
}

return ret;
}

/**
* @brief A method to move the ownership for a given argument of
* the std::unique_ptr<rosbag2::writers::SequentialWriter> type to the
* member variable rb2_sq_writer_ of this class
* @param[in] writer a unique_ptr of rosbag2::writers::SequentialWriter to move
*/
void
NnsRclCppPublisher::setRosBagWriter (
std::unique_ptr<rosbag2::writers::SequentialWriter> writer)
{
this->rb2_sq_writer_ = std::move (writer);
}

/**
* @brief A method to get the unique_ptr of rosbag2::writers::SequentialWriter
* which this class holds
* @return the unique_ptr moved from this->rb2_sq_writer_. If this->rb2_sq_writer_
* is not assigned, nullptr is returned.
*/
std::unique_ptr<rosbag2::writers::SequentialWriter>
NnsRclCppPublisher::getRosBagWriter ()
{
std::unique_ptr<rosbag2::writers::SequentialWriter> writer;

if (!this->rb2_sq_writer_)
return nullptr;

writer = std::move (this->rb2_sq_writer_);

return writer;
}

/**
* C functions for NNStreamer plugins that want to publish or subscribe
* ROS topics. nns_ros_publisher_init() should be invoked before configuring the
* ROS2 topics. nns_ros_publisher_init() should be invoked before configuring the
* tensors information to publish or subscribe. Each NNStreamer plugin which
* uses this class should be holding the instance of NnsRclCppPublisher (i.e., the
* void type pointer what the nns_ros_publisher_init() function returns).
* uses this class should be holding the instance of NnsRclCppPublisher (i.e.,
* the void type pointer what the nns_ros_publisher_init() function returns).
*/
void *
nns_ros_publisher_init (const char *node_name, const char *topic_name,
Expand Down Expand Up @@ -181,13 +276,85 @@ nns_ros_publisher_get_pub_topic_name (void *instance)
}

void *
nns_ros_publisher_open_writable_bag (void * instance __attribute__((unused)), const char *name __attribute__((unused)))
nns_ros_publisher_open_writable_bag (void *instance, const char *name)
{
return nullptr;
NnsRclCppPublisher *nrp_instance = (NnsRclCppPublisher *) instance;
static const std::string topic_name =
nns_ros_publisher_get_pub_topic_name (instance);
/**
* Variables for creationg of a rosbag2 writer
* @todo Do we need to make them configurable?
*/
static const std::string storage_id = "sqlite3";
static const std::string serialization_format = "cdr";
static const std::string topic_type = "nns_ros2_bridge/msg/Tensors";
/** Bag file splitting is not used */
const uint64_t max_bagfile_size = 0;
const mode_t mode = 0755;

std::unique_ptr<rosbag2::writers::SequentialWriter> sq_writer =
std::make_unique <rosbag2::writers::SequentialWriter> ();

rosbag2_storage::TopicMetadata rb2_topic_meta;
rosbag2::ConverterOptions rb2_converter_options;
rosbag2::StorageOptions rb2_storage_options;
rosbag2::writers::SequentialWriter *sq_writer_ptr;
char *path_rosbag;

if (name == NULL || name[0] == '\0') {
gchar *cur_dir = g_get_current_dir ();

path_rosbag = g_build_filename (cur_dir, topic_name.c_str (), NULL);
g_free (cur_dir);
} else {
path_rosbag = g_strdup (name);
}

/**
* std::string uri in the rosbag2::StorageOptions structure should be an
* existing directory pathname. In Eloquent, if uri is given as a URI format,
* the 'Failed to create bag' error occurs. Moreover, if there is a file
* whose name is the same as the file that the rosbag2 module tries to create,
* the error would occur. IMO, the features provided by the rosbag2 APIs do
* not look that mature. To simplify those incomprehensible and complex
* conditions, let's assume that the given path does not exist and a directory
* will be created by the given pathname here.
*/
if (g_file_test (path_rosbag, G_FILE_TEST_EXISTS) ||
(g_mkdir (path_rosbag, mode) == -1)) {
g_free (path_rosbag);

return nullptr;
}

rb2_storage_options.max_bagfile_size = max_bagfile_size;
rb2_storage_options.storage_id = storage_id;
rb2_storage_options.uri = g_strdup (path_rosbag);
g_free (path_rosbag);

rb2_topic_meta.name = topic_name;
rb2_topic_meta.type = topic_type;
rb2_topic_meta.serialization_format = serialization_format;

rb2_converter_options.input_serialization_format = serialization_format;
rb2_converter_options.output_serialization_format = serialization_format;

sq_writer->open (rb2_storage_options, rb2_converter_options);
sq_writer->create_topic (rb2_topic_meta);
sq_writer_ptr = sq_writer.get ();
nrp_instance->setRosBagWriter (std::move (sq_writer));

return sq_writer_ptr;
}

void
nns_ros_publisher_close_bag (void * instance __attribute__((unused)), void *bag __attribute__((unused)))
nns_ros_publisher_close_bag (void *instance, void *bag)
{
;
NnsRclCppPublisher *nrp_instance = (NnsRclCppPublisher *) instance;

std::unique_ptr<rosbag2::writers::SequentialWriter> sq_writer =
nrp_instance->getRosBagWriter ();

if ((!bag) && (sq_writer.get () == bag))
sq_writer.release ();
}
7 changes: 7 additions & 0 deletions ros2/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,16 @@
<build_depend>std_msgs</build_depend>
<build_depend>rosidl_default_generators</build_depend>

<depend>pluginlib</depend>
<depend>rcutils</depend>
<depend>rosbag2_storage</depend>
<depend>sqlite3_vendor</depend>

<exec_depend>rclcpp</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<exec_depend>rosbag2_converter_default_plugins</exec_depend>
<exec_depend>rosbag2_storage_default_plugins</exec_depend>

<member_of_group>rosidl_interface_packages</member_of_group>

Expand Down