diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index ff9cece328..b60d31bbc7 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -364,7 +364,8 @@ SequentialCompressionWriter::compress_message( std::shared_ptr message) { auto compressed_message = std::make_shared(); - compressed_message->time_stamp = message->time_stamp; + compressed_message->recv_timestamp = message->recv_timestamp; + compressed_message->send_timestamp = message->send_timestamp; compressed_message->topic_name = message->topic_name; compressor.compress_serialized_bag_message(message.get(), compressed_message.get()); return compressed_message; diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp index 4a738ee226..a659fd2c97 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp @@ -199,6 +199,25 @@ class ROSBAG2_CPP_PUBLIC Writer const std::string & type_name, const rclcpp::Time & time); + /** + * Write a serialized message to a bagfile. + * The topic will be created if it has not been created already. + * + * \warning after calling this function, the serialized data will no longer be managed by message. + * + * \param message rclcpp::SerializedMessage The serialized message to be written to the bagfile + * \param topic_name the string of the topic this messages belongs to + * \param type_name the string of the type associated with this message + * \param recv_time The time stamp when the message was received + * \param send_time The time stamp when the message was send + * \throws runtime_error if the Writer is not open. + */ + void write( + std::shared_ptr message, + const std::string & topic_name, + const std::string & type_name, + const rcutils_time_point_value_t & recv_timestamp, + const rcutils_time_point_value_t & send_timestamp); /** * Write a non-serialized message to a bagfile. * The topic will be created if it has not been created already. diff --git a/rosbag2_cpp/src/rosbag2_cpp/converter.cpp b/rosbag2_cpp/src/rosbag2_cpp/converter.cpp index e12859f5a5..dffba488e4 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/converter.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/converter.cpp @@ -75,13 +75,14 @@ std::shared_ptr Converter::convert( // deserialize rosbag2_cpp::introspection_message_set_topic_name( allocated_ros_message.get(), message->topic_name.c_str()); - allocated_ros_message->time_stamp = message->time_stamp; + allocated_ros_message->time_stamp = message->recv_timestamp; input_converter_->deserialize(message, introspection_ts, allocated_ros_message); // re-serialize output_message->serialized_data = rosbag2_storage::make_empty_serialized_message(0); output_message->topic_name = std::string(allocated_ros_message->topic_name); - output_message->time_stamp = allocated_ros_message->time_stamp; + output_message->recv_timestamp = message->recv_timestamp; + output_message->send_timestamp = message->send_timestamp; output_converter_->serialize(allocated_ros_message, introspection_ts, output_message); return output_message; } diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 355977d03d..a6e14b4f41 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -135,7 +135,8 @@ void Writer::write( { auto serialized_bag_message = std::make_shared(); serialized_bag_message->topic_name = topic_name; - serialized_bag_message->time_stamp = time.nanoseconds(); + serialized_bag_message->recv_timestamp = time.nanoseconds(); + serialized_bag_message->send_timestamp = time.nanoseconds(); serialized_bag_message->serialized_data = std::shared_ptr( new rcutils_uint8_array_t, @@ -182,10 +183,21 @@ void Writer::write( const std::string & topic_name, const std::string & type_name, const rclcpp::Time & time) +{ + write(message, topic_name, type_name, time.nanoseconds(), time.nanoseconds()); +} + +void Writer::write( + std::shared_ptr message, + const std::string & topic_name, + const std::string & type_name, + const rcutils_time_point_value_t & recv_timestamp, + const rcutils_time_point_value_t & send_timestamp) { auto serialized_bag_message = std::make_shared(); serialized_bag_message->topic_name = topic_name; - serialized_bag_message->time_stamp = time.nanoseconds(); + serialized_bag_message->recv_timestamp = recv_timestamp; + serialized_bag_message->send_timestamp = send_timestamp; // point to actual data and keep reference to original message to avoid premature releasing serialized_bag_message->serialized_data = std::shared_ptr( new rcutils_uint8_array_t(message->get_rcl_serialized_message()), diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index eaaaff25e7..10a69b9235 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -352,7 +352,7 @@ void SequentialWriter::write(std::shared_ptrtime_stamp)) { + if (!message_within_accepted_time_range(message->recv_timestamp)) { return; } @@ -368,7 +368,7 @@ void SequentialWriter::write(std::shared_ptr( - std::chrono::nanoseconds(message->time_stamp)); + std::chrono::nanoseconds(message->recv_timestamp)); if (is_first_message_) { // Update bagfile starting time diff --git a/rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp b/rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp index a426a5f414..f0979c2de4 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp @@ -66,7 +66,8 @@ void write_sample_split_bag( auto msg = std::make_shared(); msg->serialized_data = rosbag2_storage::make_serialized_message(&value, sizeof(value)); - msg->time_stamp = time_stamp; + msg->recv_timestamp = time_stamp; + msg->send_timestamp = time_stamp; msg->topic_name = topic_name; writer.write(msg); } diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp index 7d67ed7ffb..c348c15554 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp @@ -313,7 +313,7 @@ class ReadOrderTest : public ParametrizedTemporaryDirectoryFixture // Check both timestamp and value to uniquely identify messages in expected order ASSERT_TRUE(reader.has_next()); auto next = reader.read_next(); - EXPECT_EQ(next->time_stamp, expect_timestamp); + EXPECT_EQ(next->recv_timestamp, expect_timestamp); ASSERT_EQ(next->serialized_data->buffer_length, 4u); uint32_t value = *reinterpret_cast(next->serialized_data->buffer); diff --git a/rosbag2_performance/rosbag2_performance_benchmarking/src/writer_benchmark.cpp b/rosbag2_performance/rosbag2_performance_benchmarking/src/writer_benchmark.cpp index e5f621faf3..3e692baf4e 100644 --- a/rosbag2_performance/rosbag2_performance_benchmarking/src/writer_benchmark.cpp +++ b/rosbag2_performance/rosbag2_performance_benchmarking/src/writer_benchmark.cpp @@ -113,7 +113,7 @@ void WriterBenchmark::start_benchmark() get_logger(), "Error getting current time. Error:" << rcutils_get_error_string().str); } - message->time_stamp = time_stamp; + message->recv_timestamp = time_stamp; message->topic_name = queue->topic_name(); try { diff --git a/rosbag2_py/rosbag2_py/_writer.pyi b/rosbag2_py/rosbag2_py/_writer.pyi index 00d3b2e639..ec176c7622 100644 --- a/rosbag2_py/rosbag2_py/_writer.pyi +++ b/rosbag2_py/rosbag2_py/_writer.pyi @@ -1,3 +1,4 @@ +from typing import overload import rosbag2_py._compression_options import rosbag2_py._storage @@ -8,7 +9,10 @@ class SequentialCompressionWriter: def remove_topic(self, arg0: rosbag2_py._storage.TopicMetadata) -> None: ... def split_bagfile(self) -> None: ... def take_snapshot(self) -> bool: ... + @overload def write(self, arg0: str, arg1: str, arg2: int) -> None: ... + @overload + def write(self, arg0: str, arg1: str, arg2: int, arg3: int) -> None: ... class SequentialWriter: def __init__(self) -> None: ... @@ -18,7 +22,10 @@ class SequentialWriter: def remove_topic(self, arg0: rosbag2_py._storage.TopicMetadata) -> None: ... def split_bagfile(self) -> None: ... def take_snapshot(self) -> bool: ... + @overload def write(self, arg0: str, arg1: str, arg2: int) -> None: ... + @overload + def write(self, arg0: str, arg1: str, arg2: int, arg3: int) -> None: ... def get_registered_compressors() -> Set[str]: ... def get_registered_serializers() -> Set[str]: ... diff --git a/rosbag2_py/src/rosbag2_py/_reader.cpp b/rosbag2_py/src/rosbag2_py/_reader.cpp index 3f68afc9d1..502534e294 100644 --- a/rosbag2_py/src/rosbag2_py/_reader.cpp +++ b/rosbag2_py/src/rosbag2_py/_reader.cpp @@ -52,7 +52,7 @@ class Reader : public rosbag2_cpp::Reader std::string serialized_data(rcutils_data.buffer, rcutils_data.buffer + rcutils_data.buffer_length); return pybind11::make_tuple( - next->topic_name, pybind11::bytes(serialized_data), next->time_stamp); + next->topic_name, pybind11::bytes(serialized_data), next->recv_timestamp); } }; diff --git a/rosbag2_py/src/rosbag2_py/_writer.cpp b/rosbag2_py/src/rosbag2_py/_writer.cpp index 46b498f4d7..bdac2c12ac 100644 --- a/rosbag2_py/src/rosbag2_py/_writer.cpp +++ b/rosbag2_py/src/rosbag2_py/_writer.cpp @@ -47,6 +47,14 @@ class Writer : public rosbag2_cpp::Writer void write( const std::string & topic_name, const std::string & message, const rcutils_time_point_value_t & time_stamp) + { + write(topic_name, message, time_stamp, time_stamp); + } + + void write( + const std::string & topic_name, const std::string & message, + const rcutils_time_point_value_t & recv_timestamp, + const rcutils_time_point_value_t & send_timestamp) { auto bag_message = std::make_shared(); @@ -54,7 +62,8 @@ class Writer : public rosbag2_cpp::Writer bag_message->topic_name = topic_name; bag_message->serialized_data = rosbag2_storage::make_serialized_message(message.c_str(), message.length()); - bag_message->time_stamp = time_stamp; + bag_message->recv_timestamp = recv_timestamp; + bag_message->send_timestamp = send_timestamp; rosbag2_cpp::Writer::write(bag_message); } @@ -97,7 +106,12 @@ PYBIND11_MODULE(_writer, m) { pybind11::overload_cast< const rosbag2_storage::StorageOptions &, const rosbag2_cpp::ConverterOptions & >(&PyWriter::open)) - .def("write", &PyWriter::write) + .def( + "write", pybind11::overload_cast(&PyWriter::write)) + .def( + "write", pybind11::overload_cast(&PyWriter::write)) .def("close", &PyWriter::close) .def("remove_topic", &PyWriter::remove_topic) .def( @@ -114,7 +128,13 @@ PYBIND11_MODULE(_writer, m) { pybind11::overload_cast< const rosbag2_storage::StorageOptions &, const rosbag2_cpp::ConverterOptions & >(&PyCompressionWriter::open)) - .def("write", &PyCompressionWriter::write) + .def( + "write", pybind11::overload_cast(&PyCompressionWriter::write)) + .def( + "write", pybind11::overload_cast(&PyCompressionWriter::write)) .def("remove_topic", &PyCompressionWriter::remove_topic) .def( "create_topic", diff --git a/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp b/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp index ab28af6734..3336d85d15 100644 --- a/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp +++ b/rosbag2_storage/include/rosbag2_storage/serialized_bag_message.hpp @@ -27,7 +27,15 @@ namespace rosbag2_storage struct SerializedBagMessage { std::shared_ptr serialized_data; - rcutils_time_point_value_t time_stamp; + /** + * @brief Nanosecond timestamp when this message was received. + */ + rcutils_time_point_value_t recv_timestamp; + /** + * @brief Nanosecond timestamp when this message was initially published. If + * not available, this will be set to recv_timestamp. + */ + rcutils_time_point_value_t send_timestamp; std::string topic_name; }; diff --git a/rosbag2_storage_mcap/src/mcap_storage.cpp b/rosbag2_storage_mcap/src/mcap_storage.cpp index a1b7ca4339..1992242586 100644 --- a/rosbag2_storage_mcap/src/mcap_storage.cpp +++ b/rosbag2_storage_mcap/src/mcap_storage.cpp @@ -526,7 +526,8 @@ bool MCAPStorage::read_and_enqueue_message() const auto & messageView = *it; auto msg = std::make_shared(); last_enqueued_message_offset_ = messageView.messageOffset; - msg->time_stamp = rcutils_time_point_value_t(messageView.message.logTime); + msg->recv_timestamp = rcutils_time_point_value_t(messageView.message.logTime); + msg->send_timestamp = rcutils_time_point_value_t(messageView.message.publishTime); msg->topic_name = messageView.channel->topic; msg->serialized_data = rosbag2_storage::make_serialized_message(messageView.message.data, messageView.message.dataSize); @@ -597,7 +598,7 @@ bool MCAPStorage::enqueued_message_is_already_read() if (next_ == nullptr) { return false; } - if (last_read_time_point_ != next_->time_stamp) { + if (last_read_time_point_ != next_->recv_timestamp) { return false; } if (read_order_ == mcap::ReadMessageOptions::ReadOrder::ReverseLogTimeOrder) { @@ -684,7 +685,7 @@ std::shared_ptr MCAPStorage::read_next() if (!has_next()) { throw std::runtime_error{"No next message is available."}; } - last_read_time_point_ = next_->time_stamp; + last_read_time_point_ = next_->recv_timestamp; last_read_message_offset_ = last_enqueued_message_offset_; // Importantly, clear next_ via move so that a next message can be read. return std::move(next_); @@ -779,11 +780,11 @@ void MCAPStorage::write_lock_free(std::shared_ptrsecond; mcap_msg.sequence = 0; - if (msg->time_stamp < 0) { - RCUTILS_LOG_WARN_NAMED(LOG_NAME, "Invalid message timestamp %ld", msg->time_stamp); + if (msg->recv_timestamp < 0) { + RCUTILS_LOG_WARN_NAMED(LOG_NAME, "Invalid message timestamp %ld", msg->recv_timestamp); } - mcap_msg.logTime = mcap::Timestamp(msg->time_stamp); - mcap_msg.publishTime = mcap_msg.logTime; + mcap_msg.logTime = mcap::Timestamp(msg->recv_timestamp); + mcap_msg.publishTime = mcap::Timestamp(msg->send_timestamp); mcap_msg.dataSize = msg->serialized_data->buffer_length; mcap_msg.data = reinterpret_cast(msg->serialized_data->buffer); const auto status = mcap_writer_->write(mcap_msg); @@ -799,7 +800,7 @@ void MCAPStorage::write_lock_free(std::shared_ptrtime_stamp)); + const auto message_time = time_point(std::chrono::nanoseconds(msg->recv_timestamp)); metadata_.duration = std::max(metadata_.duration, message_time - metadata_.starting_time); } diff --git a/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp b/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp index 55f90657e5..27b5bb745d 100644 --- a/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp +++ b/rosbag2_storage_mcap/test/rosbag2_storage_mcap/test_mcap_storage.cpp @@ -69,7 +69,7 @@ class McapStorageTestFixture : public rosbag2_test_common::TemporaryDirectoryFix rw_storage->create_topic(topic_metadata, std::get<3>(msg)); auto bag_message = std::make_shared(); bag_message->serialized_data = make_serialized_message(std::get<0>(msg)); - bag_message->time_stamp = std::get<1>(msg); + bag_message->recv_timestamp = std::get<1>(msg); bag_message->topic_name = topic_metadata.name; rw_storage->write(bag_message); } @@ -208,7 +208,7 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file) serialized_bag_msg->serialized_data = std::shared_ptr( const_cast(&serialized_msg->get_rcl_serialized_message()), [](rcutils_uint8_array_t * /* data */) {}); - serialized_bag_msg->time_stamp = time_stamp; + serialized_bag_msg->recv_timestamp = time_stamp; serialized_bag_msg->topic_name = topic_name; writer->write(serialized_bag_msg); } @@ -295,7 +295,7 @@ TEST_F(TemporaryDirectoryFixture, can_write_mcap_with_zstd_configured_from_yaml) serialized_bag_msg->serialized_data = std::shared_ptr( const_cast(&serialized_msg->get_rcl_serialized_message()), [](rcutils_uint8_array_t * /* data */) {}); - serialized_bag_msg->time_stamp = time_stamp; + serialized_bag_msg->recv_timestamp = time_stamp; serialized_bag_msg->topic_name = topic_name; writer->write(serialized_bag_msg); writer->write(serialized_bag_msg); diff --git a/rosbag2_storage_sqlite3/src/rosbag2_storage_sqlite3/sqlite_storage.cpp b/rosbag2_storage_sqlite3/src/rosbag2_storage_sqlite3/sqlite_storage.cpp index f4a1e0179e..66869c8815 100644 --- a/rosbag2_storage_sqlite3/src/rosbag2_storage_sqlite3/sqlite_storage.cpp +++ b/rosbag2_storage_sqlite3/src/rosbag2_storage_sqlite3/sqlite_storage.cpp @@ -302,7 +302,9 @@ void SqliteStorage::write_locked( } try { - write_statement_->bind(message->time_stamp, topic_entry->second, message->serialized_data); + write_statement_->bind( + message->recv_timestamp, topic_entry->second, + message->serialized_data); } catch (const SqliteException & exc) { if (SQLITE_TOOBIG == exc.get_sqlite_return_code()) { // Get the sqlite string/blob limit. @@ -380,12 +382,12 @@ std::shared_ptr SqliteStorage::read_next( auto bag_message = std::make_shared(); bag_message->serialized_data = std::get<0>(*current_message_row_); - bag_message->time_stamp = std::get<1>(*current_message_row_); + bag_message->recv_timestamp = std::get<1>(*current_message_row_); bag_message->topic_name = std::get<2>(*current_message_row_); // set start time to current time // and set seek_row_id to the new row id up - seek_time_ = bag_message->time_stamp; + seek_time_ = bag_message->recv_timestamp; seek_row_id_ = std::get<3>(*current_message_row_) + (read_order_.reverse ? -1 : 1); ++current_message_row_; diff --git a/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/storage_test_fixture.hpp b/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/storage_test_fixture.hpp index c2ab38a769..3fc71eed2a 100644 --- a/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/storage_test_fixture.hpp +++ b/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/storage_test_fixture.hpp @@ -112,7 +112,7 @@ class StorageTestFixture : public TemporaryDirectoryFixture rw_storage.create_topic({0u, topic_name, type_name, rmw_format, {}, ""}, {}); auto bag_message = std::make_shared(); bag_message->serialized_data = make_serialized_message(std::get<0>(msg)); - bag_message->time_stamp = std::get<1>(msg); + bag_message->recv_timestamp = std::get<1>(msg); bag_message->topic_name = topic_name; rw_storage.write(bag_message); } @@ -188,14 +188,16 @@ class StorageTestFixture : public TemporaryDirectoryFixture // Prepare rosbag2 serialized message to write auto message = std::make_shared(); message->serialized_data = make_serialized_message(std::get<0>(msg)); - message->time_stamp = std::get<1>(msg); + message->recv_timestamp = std::get<1>(msg); message->topic_name = topic_name; // Write message to DB auto topic_entry = topics.find(topic_name); if (topic_entry == end(topics)) { throw SqliteException("Topic '" + topic_name + "' has not been created yet!"); } - write_statement->bind(message->time_stamp, topic_entry->second, message->serialized_data); + write_statement->bind( + message->recv_timestamp, topic_entry->second, + message->serialized_data); write_statement->execute_and_reset(); } } diff --git a/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/test_sqlite_storage.cpp b/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/test_sqlite_storage.cpp index 8546b35fa7..d6245c9e1f 100644 --- a/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/test_sqlite_storage.cpp +++ b/rosbag2_storage_sqlite3/test/rosbag2_storage_sqlite3/test_sqlite_storage.cpp @@ -71,7 +71,7 @@ TEST_F(StorageTestFixture, string_messages_are_written_and_read_to_and_from_sqli ASSERT_THAT(read_messages, SizeIs(3)); for (size_t i = 0; i < 3; i++) { EXPECT_THAT(deserialize_message(read_messages[i]->serialized_data), Eq(string_messages[i])); - EXPECT_THAT(read_messages[i]->time_stamp, Eq(std::get<1>(messages[i]))); + EXPECT_THAT(read_messages[i]->recv_timestamp, Eq(std::get<1>(messages[i]))); EXPECT_THAT(read_messages[i]->topic_name, Eq(topics[i])); } } @@ -111,10 +111,10 @@ TEST_F(StorageTestFixture, get_next_returns_messages_in_timestamp_order) { EXPECT_TRUE(readable_storage->has_next()); auto first_message = readable_storage->read_next(); - EXPECT_THAT(first_message->time_stamp, Eq(2)); + EXPECT_THAT(first_message->recv_timestamp, Eq(2)); EXPECT_TRUE(readable_storage->has_next()); auto second_message = readable_storage->read_next(); - EXPECT_THAT(second_message->time_stamp, Eq(6)); + EXPECT_THAT(second_message->recv_timestamp, Eq(6)); EXPECT_FALSE(readable_storage->has_next()); } @@ -343,7 +343,7 @@ TEST_F(StorageTestFixture, messages_readable_for_prefoxy_db_schema) { ASSERT_THAT(read_messages, SizeIs(3)); for (size_t i = 0; i < 3; i++) { EXPECT_THAT(deserialize_message(read_messages[i]->serialized_data), Eq(string_messages[i])); - EXPECT_THAT(read_messages[i]->time_stamp, Eq(std::get<1>(messages[i]))); + EXPECT_THAT(read_messages[i]->recv_timestamp, Eq(std::get<1>(messages[i]))); EXPECT_THAT(read_messages[i]->topic_name, Eq(topics[i])); } } diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp index 273543b0ee..f1d14cc0b8 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp @@ -63,7 +63,7 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example) writer.open(storage_options); auto bag_message = std::make_shared(); - auto ret = rcutils_system_time_now(&bag_message->time_stamp); + auto ret = rcutils_system_time_now(&bag_message->recv_timestamp); if (ret != RCL_RET_OK) { FAIL() << "couldn't assign time rosbag message"; } diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_storage_api.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_storage_api.cpp index dff1275a7b..7233b6b84f 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_storage_api.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_storage_api.cpp @@ -82,8 +82,9 @@ class Rosbag2StorageAPITests : public rosbag2_test_common::ParametrizedTemporary auto bag_message = std::make_shared(); bag_message->serialized_data = memory_management_->serialize_message(std_string_msg); - bag_message->time_stamp = + bag_message->recv_timestamp = std::chrono::duration_cast(timestamp).count(); + bag_message->send_timestamp = bag_message->recv_timestamp; bag_message->topic_name = topic; timestamp += dt; serialized_messages.push_back(bag_message); diff --git a/rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp b/rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp index 0ea9771b10..db94ccd877 100644 --- a/rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp +++ b/rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp @@ -53,7 +53,7 @@ std::shared_ptr get_next( if (msg == nullptr) { continue; } - if (earliest_msg == nullptr || msg->time_stamp < earliest_msg->time_stamp) { + if (earliest_msg == nullptr || msg->recv_timestamp < earliest_msg->recv_timestamp) { earliest_msg = msg; earliest_msg_index = i; } diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 6286c6e61a..c5b5376f02 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -643,10 +643,10 @@ bool PlayerImpl::play_next() bool next_message_published = false; while (rclcpp::ok() && !next_message_published && !stop_playback_ && - message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->time_stamp)) + message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->recv_timestamp)) { next_message_published = publish_message(message_ptr); - clock_->jump(message_ptr->time_stamp); + clock_->jump(message_ptr->recv_timestamp); message_queue_.pop(); message_ptr = peek_next_message_from_queue(); @@ -849,11 +849,11 @@ void PlayerImpl::play_messages_from_queue() ready_to_play_from_queue_cv_.notify_all(); } while (rclcpp::ok() && !stop_playback_ && - message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->time_stamp)) + message_ptr != nullptr && !shall_stop_at_timestamp(message_ptr->recv_timestamp)) { // Do not move on until sleep_until returns true // It will always sleep, so this is not a tight busy loop on pause - while (rclcpp::ok() && !clock_->sleep_until(message_ptr->time_stamp)) { + while (rclcpp::ok() && !clock_->sleep_until(message_ptr->recv_timestamp)) { if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) { break; } diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 89d6128738..d88f8802fc 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -561,16 +561,53 @@ std::shared_ptr RecorderImpl::create_subscription( const std::string & topic_name, const std::string & topic_type, const rclcpp::QoS & qos) { - auto subscription = node->create_generic_subscription( - topic_name, - topic_type, - qos, - [this, topic_name, topic_type](std::shared_ptr message) { - if (!paused_.load()) { - writer_->write(message, topic_name, topic_type, node->get_clock()->now()); - } - }); - return subscription; +#ifdef _WIN32 + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") != + std::string::npos) + { + return node->create_generic_subscription( + topic_name, + topic_type, + qos, + [this, topic_name, topic_type](std::shared_ptr message, + const rclcpp::MessageInfo &) { + if (!paused_.load()) { + writer_->write( + std::move(message), topic_name, topic_type, node->now().nanoseconds(), + 0); + } + }); + } +#endif + + if (record_options_.use_sim_time) { + return node->create_generic_subscription( + topic_name, + topic_type, + qos, + [this, topic_name, topic_type](std::shared_ptr message, + const rclcpp::MessageInfo & mi) { + if (!paused_.load()) { + writer_->write( + std::move(message), topic_name, topic_type, node->now().nanoseconds(), + mi.get_rmw_message_info().source_timestamp); + } + }); + } else { + return node->create_generic_subscription( + topic_name, + topic_type, + qos, + [this, topic_name, topic_type](std::shared_ptr message, + const rclcpp::MessageInfo & mi) { + if (!paused_.load()) { + writer_->write( + std::move(message), topic_name, topic_type, + mi.get_rmw_message_info().received_timestamp, + mi.get_rmw_message_info().source_timestamp); + } + }); + } } std::vector RecorderImpl::offered_qos_profiles_for_topic( diff --git a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp index 46ac168c01..a064704574 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp @@ -121,7 +121,7 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn metadata_.message_count = messages.size(); if (!messages.empty()) { const auto message_timestamp = std::chrono::time_point( - std::chrono::nanoseconds(messages[0]->time_stamp)); + std::chrono::nanoseconds(messages[0]->recv_timestamp)); metadata_.starting_time = message_timestamp; } messages_ = std::move(messages); diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp index 9e75cc84fa..594c1a92a3 100644 --- a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp @@ -60,7 +60,7 @@ class Rosbag2TransportTestFixture : public Test { auto bag_msg = std::make_shared(); bag_msg->serialized_data = memory_management_.serialize_message(message); - bag_msg->time_stamp = milliseconds * 1000000; + bag_msg->recv_timestamp = milliseconds * 1000000; bag_msg->topic_name = topic; return bag_msg; diff --git a/rosbag2_transport/test/rosbag2_transport/test_burst.cpp b/rosbag2_transport/test/rosbag2_transport/test_burst.cpp index 5988fdd598..834afe050e 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_burst.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_burst.cpp @@ -249,7 +249,7 @@ TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_burst) { auto replay_time = std::chrono::steady_clock::now() - start; auto expected_replay_time = - std::chrono::nanoseconds(messages.back()->time_stamp - messages[1]->time_stamp); + std::chrono::nanoseconds(messages.back()->recv_timestamp - messages[1]->recv_timestamp); // Check for lower bound with some tolerance ASSERT_THAT(replay_time, Gt(expected_replay_time - std::chrono::milliseconds(50))); // Check for upper bound with some tolerance diff --git a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp index 89d043d602..3097be472e 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp @@ -103,8 +103,9 @@ TEST_F(RosBag2PlayTestFixture, invalid_keybindings) {serialize_test_message("topic1", 0, primitive_message1), serialize_test_message("topic1", 0, primitive_message2)}; - messages[0]->time_stamp = 100; - messages[1]->time_stamp = messages[0]->time_stamp + message_time_difference.nanoseconds(); + messages[0]->recv_timestamp = 100; + messages[1]->recv_timestamp = messages[0]->recv_timestamp + + message_time_difference.nanoseconds(); play_options_.play_next_key = KeyboardHandler::KeyCode::UNKNOWN; @@ -138,9 +139,11 @@ TEST_F(RosBag2PlayTestFixture, test_keyboard_controls) serialize_test_message("topic1", 0, primitive_message2), serialize_test_message("topic1", 0, primitive_message3)}; - messages[0]->time_stamp = 100; - messages[1]->time_stamp = messages[0]->time_stamp + message_time_difference.nanoseconds(); - messages[2]->time_stamp = messages[1]->time_stamp + message_time_difference.nanoseconds(); + messages[0]->recv_timestamp = 100; + messages[1]->recv_timestamp = messages[0]->recv_timestamp + + message_time_difference.nanoseconds(); + messages[2]->recv_timestamp = messages[1]->recv_timestamp + + message_time_difference.nanoseconds(); auto prepared_mock_reader = std::make_unique(); prepared_mock_reader->prepare(messages, topics_and_types); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp index 8c6ec32897..3c198c1867 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp @@ -202,7 +202,7 @@ TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_play_next) { auto replay_time = std::chrono::steady_clock::now() - start; auto expected_replay_time = - std::chrono::nanoseconds(messages.back()->time_stamp - messages[1]->time_stamp); + std::chrono::nanoseconds(messages.back()->recv_timestamp - messages[1]->recv_timestamp); // Check for lower bound with some tolerance ASSERT_THAT(replay_time, Gt(expected_replay_time - std::chrono::milliseconds(50))); // Check for upper bound with some tolerance diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_timing.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_timing.cpp index d3f58e4655..b2f2da1767 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_timing.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_timing.cpp @@ -55,8 +55,9 @@ class PlayerTestFixture : public Rosbag2TransportTestFixture serialize_test_message("topic1", 0, primitive_message2) }; - messages[0]->time_stamp = 100; - messages[1]->time_stamp = messages[0]->time_stamp + message_time_difference.nanoseconds(); + messages[0]->recv_timestamp = 100; + messages[1]->recv_timestamp = messages[0]->recv_timestamp + + message_time_difference.nanoseconds(); prepared_mock_reader = std::make_unique(); prepared_mock_reader->prepare(messages, topics_and_types); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp index b95635dc72..423994c8c4 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record_all_use_sim_time.cpp @@ -135,5 +135,24 @@ TEST_F(RecordIntegrationTestFixture, record_all_with_sim_time) } } // check that the timestamp is same as the clock message - EXPECT_THAT(string_messages[0]->time_stamp, time_value); + EXPECT_THAT(string_messages[0]->recv_timestamp, time_value); + + bool rwm_has_timestamp_support = true; + +#ifdef _WIN32 + if (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") != + std::string::npos) + { + rwm_has_timestamp_support = false; + } +#endif + + if (rwm_has_timestamp_support) { + // Check that the send_timestamp is not the same as the clock message + EXPECT_NE(string_messages[0]->send_timestamp, time_value); + EXPECT_NE(string_messages[0]->send_timestamp, 0); + } else { + // if rwm has not timestamp support, send_timestamp must be zero + EXPECT_EQ(string_messages[0]->send_timestamp, 0); + } }