Skip to content

Commit

Permalink
Use middleware send and receive timestamps from message_info during r…
Browse files Browse the repository at this point in the history
…ecording (#1531)

* feat: Use middleware receive timestamp instead of now in non sim time case

This should create better timestamp in cases of high load.

Signed-off-by: Janosch Machowinski <[email protected]>

* feat(SerializedBagMessage): Added publish_time_stamp to and renamed time_stamp to receive_time_stamp

Signed-off-by: Janosch Machowinski <[email protected]>

* chore: renamed receive_time_stamp to recv_timestamp / publish_time_stamp to send_timestamp

Co-authored-by: Michael Orlov <[email protected]>
Signed-off-by: Janosch Machowinski <[email protected]>

* feat: Export write call with source timestamp to python

Signed-off-by: Janosch Machowinski <[email protected]>

* refactor: minor style update

Signed-off-by: Janosch Machowinski <[email protected]>

* fix: Disable usage of rmw source timestamp on windows when using connextdds.

The windows rmw connextdds does not provide a value in source timestamp
nor in receive timestamp, therefore we fall back to current node time.

Signed-off-by: Janosch Machowinski <[email protected]>

* chore: requested changes

Signed-off-by: Janosch Machowinski <[email protected]>

* fix: Fixed expected test result in case of rmw_connextdds

Signed-off-by: Janosch Machowinski <[email protected]>

* chore: Regenerated stubs

Signed-off-by: Janosch Machowinski <[email protected]>

---------

Signed-off-by: Janosch Machowinski <[email protected]>
Co-authored-by: Janosch Machowinski <[email protected]>
Co-authored-by: Michael Orlov <[email protected]>
  • Loading branch information
3 people authored Apr 11, 2024
1 parent 6202aae commit bee10b4
Show file tree
Hide file tree
Showing 29 changed files with 200 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,8 @@ SequentialCompressionWriter::compress_message(
std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
{
auto compressed_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
compressed_message->time_stamp = message->time_stamp;
compressed_message->recv_timestamp = message->recv_timestamp;
compressed_message->send_timestamp = message->send_timestamp;
compressed_message->topic_name = message->topic_name;
compressor.compress_serialized_bag_message(message.get(), compressed_message.get());
return compressed_message;
Expand Down
19 changes: 19 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,25 @@ class ROSBAG2_CPP_PUBLIC Writer
const std::string & type_name,
const rclcpp::Time & time);

/**
* Write a serialized message to a bagfile.
* The topic will be created if it has not been created already.
*
* \warning after calling this function, the serialized data will no longer be managed by message.
*
* \param message rclcpp::SerializedMessage The serialized message to be written to the bagfile
* \param topic_name the string of the topic this messages belongs to
* \param type_name the string of the type associated with this message
* \param recv_time The time stamp when the message was received
* \param send_time The time stamp when the message was send
* \throws runtime_error if the Writer is not open.
*/
void write(
std::shared_ptr<const rclcpp::SerializedMessage> message,
const std::string & topic_name,
const std::string & type_name,
const rcutils_time_point_value_t & recv_timestamp,
const rcutils_time_point_value_t & send_timestamp);
/**
* Write a non-serialized message to a bagfile.
* The topic will be created if it has not been created already.
Expand Down
5 changes: 3 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,14 @@ std::shared_ptr<rosbag2_storage::SerializedBagMessage> Converter::convert(
// deserialize
rosbag2_cpp::introspection_message_set_topic_name(
allocated_ros_message.get(), message->topic_name.c_str());
allocated_ros_message->time_stamp = message->time_stamp;
allocated_ros_message->time_stamp = message->recv_timestamp;
input_converter_->deserialize(message, introspection_ts, allocated_ros_message);

// re-serialize
output_message->serialized_data = rosbag2_storage::make_empty_serialized_message(0);
output_message->topic_name = std::string(allocated_ros_message->topic_name);
output_message->time_stamp = allocated_ros_message->time_stamp;
output_message->recv_timestamp = message->recv_timestamp;
output_message->send_timestamp = message->send_timestamp;
output_converter_->serialize(allocated_ros_message, introspection_ts, output_message);
return output_message;
}
Expand Down
16 changes: 14 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ void Writer::write(
{
auto serialized_bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
serialized_bag_message->topic_name = topic_name;
serialized_bag_message->time_stamp = time.nanoseconds();
serialized_bag_message->recv_timestamp = time.nanoseconds();
serialized_bag_message->send_timestamp = time.nanoseconds();

serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t,
Expand Down Expand Up @@ -182,10 +183,21 @@ void Writer::write(
const std::string & topic_name,
const std::string & type_name,
const rclcpp::Time & time)
{
write(message, topic_name, type_name, time.nanoseconds(), time.nanoseconds());
}

void Writer::write(
std::shared_ptr<const rclcpp::SerializedMessage> message,
const std::string & topic_name,
const std::string & type_name,
const rcutils_time_point_value_t & recv_timestamp,
const rcutils_time_point_value_t & send_timestamp)
{
auto serialized_bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
serialized_bag_message->topic_name = topic_name;
serialized_bag_message->time_stamp = time.nanoseconds();
serialized_bag_message->recv_timestamp = recv_timestamp;
serialized_bag_message->send_timestamp = send_timestamp;
// point to actual data and keep reference to original message to avoid premature releasing
serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t(message->get_rcl_serialized_message()),
Expand Down
4 changes: 2 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
throw std::runtime_error("Bag is not open. Call open() before writing.");
}

if (!message_within_accepted_time_range(message->time_stamp)) {
if (!message_within_accepted_time_range(message->recv_timestamp)) {
return;
}

Expand All @@ -368,7 +368,7 @@ void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBa
}

const auto message_timestamp = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(message->time_stamp));
std::chrono::nanoseconds(message->recv_timestamp));

if (is_first_message_) {
// Update bagfile starting time
Expand Down
3 changes: 2 additions & 1 deletion rosbag2_cpp/test/rosbag2_cpp/fake_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ void write_sample_split_bag(

auto msg = std::make_shared<rosbag2_storage::SerializedBagMessage>();
msg->serialized_data = rosbag2_storage::make_serialized_message(&value, sizeof(value));
msg->time_stamp = time_stamp;
msg->recv_timestamp = time_stamp;
msg->send_timestamp = time_stamp;
msg->topic_name = topic_name;
writer.write(msg);
}
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ class ReadOrderTest : public ParametrizedTemporaryDirectoryFixture
// Check both timestamp and value to uniquely identify messages in expected order
ASSERT_TRUE(reader.has_next());
auto next = reader.read_next();
EXPECT_EQ(next->time_stamp, expect_timestamp);
EXPECT_EQ(next->recv_timestamp, expect_timestamp);

ASSERT_EQ(next->serialized_data->buffer_length, 4u);
uint32_t value = *reinterpret_cast<uint32_t *>(next->serialized_data->buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void WriterBenchmark::start_benchmark()
get_logger(), "Error getting current time. Error:" <<
rcutils_get_error_string().str);
}
message->time_stamp = time_stamp;
message->recv_timestamp = time_stamp;
message->topic_name = queue->topic_name();

try {
Expand Down
7 changes: 7 additions & 0 deletions rosbag2_py/rosbag2_py/_writer.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import overload
import rosbag2_py._compression_options
import rosbag2_py._storage

Expand All @@ -8,7 +9,10 @@ class SequentialCompressionWriter:
def remove_topic(self, arg0: rosbag2_py._storage.TopicMetadata) -> None: ...
def split_bagfile(self) -> None: ...
def take_snapshot(self) -> bool: ...
@overload
def write(self, arg0: str, arg1: str, arg2: int) -> None: ...
@overload
def write(self, arg0: str, arg1: str, arg2: int, arg3: int) -> None: ...

class SequentialWriter:
def __init__(self) -> None: ...
Expand All @@ -18,7 +22,10 @@ class SequentialWriter:
def remove_topic(self, arg0: rosbag2_py._storage.TopicMetadata) -> None: ...
def split_bagfile(self) -> None: ...
def take_snapshot(self) -> bool: ...
@overload
def write(self, arg0: str, arg1: str, arg2: int) -> None: ...
@overload
def write(self, arg0: str, arg1: str, arg2: int, arg3: int) -> None: ...

def get_registered_compressors() -> Set[str]: ...
def get_registered_serializers() -> Set[str]: ...
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_py/src/rosbag2_py/_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class Reader : public rosbag2_cpp::Reader
std::string serialized_data(rcutils_data.buffer,
rcutils_data.buffer + rcutils_data.buffer_length);
return pybind11::make_tuple(
next->topic_name, pybind11::bytes(serialized_data), next->time_stamp);
next->topic_name, pybind11::bytes(serialized_data), next->recv_timestamp);
}
};

Expand Down
26 changes: 23 additions & 3 deletions rosbag2_py/src/rosbag2_py/_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,23 @@ class Writer : public rosbag2_cpp::Writer
void write(
const std::string & topic_name, const std::string & message,
const rcutils_time_point_value_t & time_stamp)
{
write(topic_name, message, time_stamp, time_stamp);
}

void write(
const std::string & topic_name, const std::string & message,
const rcutils_time_point_value_t & recv_timestamp,
const rcutils_time_point_value_t & send_timestamp)
{
auto bag_message =
std::make_shared<rosbag2_storage::SerializedBagMessage>();

bag_message->topic_name = topic_name;
bag_message->serialized_data =
rosbag2_storage::make_serialized_message(message.c_str(), message.length());
bag_message->time_stamp = time_stamp;
bag_message->recv_timestamp = recv_timestamp;
bag_message->send_timestamp = send_timestamp;

rosbag2_cpp::Writer::write(bag_message);
}
Expand Down Expand Up @@ -97,7 +106,12 @@ PYBIND11_MODULE(_writer, m) {
pybind11::overload_cast<
const rosbag2_storage::StorageOptions &, const rosbag2_cpp::ConverterOptions &
>(&PyWriter::open))
.def("write", &PyWriter::write)
.def(
"write", pybind11::overload_cast<const std::string &, const std::string &,
const rcutils_time_point_value_t &>(&PyWriter::write))
.def(
"write", pybind11::overload_cast<const std::string &, const std::string &,
const rcutils_time_point_value_t &, const rcutils_time_point_value_t &>(&PyWriter::write))
.def("close", &PyWriter::close)
.def("remove_topic", &PyWriter::remove_topic)
.def(
Expand All @@ -114,7 +128,13 @@ PYBIND11_MODULE(_writer, m) {
pybind11::overload_cast<
const rosbag2_storage::StorageOptions &, const rosbag2_cpp::ConverterOptions &
>(&PyCompressionWriter::open))
.def("write", &PyCompressionWriter::write)
.def(
"write", pybind11::overload_cast<const std::string &, const std::string &,
const rcutils_time_point_value_t &>(&PyCompressionWriter::write))
.def(
"write", pybind11::overload_cast<const std::string &, const std::string &,
const rcutils_time_point_value_t &,
const rcutils_time_point_value_t &>(&PyCompressionWriter::write))
.def("remove_topic", &PyCompressionWriter::remove_topic)
.def(
"create_topic",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,15 @@ namespace rosbag2_storage
struct SerializedBagMessage
{
std::shared_ptr<rcutils_uint8_array_t> serialized_data;
rcutils_time_point_value_t time_stamp;
/**
* @brief Nanosecond timestamp when this message was received.
*/
rcutils_time_point_value_t recv_timestamp;
/**
* @brief Nanosecond timestamp when this message was initially published. If
* not available, this will be set to recv_timestamp.
*/
rcutils_time_point_value_t send_timestamp;
std::string topic_name;
};

Expand Down
17 changes: 9 additions & 8 deletions rosbag2_storage_mcap/src/mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,8 @@ bool MCAPStorage::read_and_enqueue_message()
const auto & messageView = *it;
auto msg = std::make_shared<rosbag2_storage::SerializedBagMessage>();
last_enqueued_message_offset_ = messageView.messageOffset;
msg->time_stamp = rcutils_time_point_value_t(messageView.message.logTime);
msg->recv_timestamp = rcutils_time_point_value_t(messageView.message.logTime);
msg->send_timestamp = rcutils_time_point_value_t(messageView.message.publishTime);
msg->topic_name = messageView.channel->topic;
msg->serialized_data = rosbag2_storage::make_serialized_message(messageView.message.data,
messageView.message.dataSize);
Expand Down Expand Up @@ -597,7 +598,7 @@ bool MCAPStorage::enqueued_message_is_already_read()
if (next_ == nullptr) {
return false;
}
if (last_read_time_point_ != next_->time_stamp) {
if (last_read_time_point_ != next_->recv_timestamp) {
return false;
}
if (read_order_ == mcap::ReadMessageOptions::ReadOrder::ReverseLogTimeOrder) {
Expand Down Expand Up @@ -684,7 +685,7 @@ std::shared_ptr<rosbag2_storage::SerializedBagMessage> MCAPStorage::read_next()
if (!has_next()) {
throw std::runtime_error{"No next message is available."};
}
last_read_time_point_ = next_->time_stamp;
last_read_time_point_ = next_->recv_timestamp;
last_read_message_offset_ = last_enqueued_message_offset_;
// Importantly, clear next_ via move so that a next message can be read.
return std::move(next_);
Expand Down Expand Up @@ -779,11 +780,11 @@ void MCAPStorage::write_lock_free(std::shared_ptr<const rosbag2_storage::Seriali
mcap::Message mcap_msg;
mcap_msg.channelId = channel_it->second;
mcap_msg.sequence = 0;
if (msg->time_stamp < 0) {
RCUTILS_LOG_WARN_NAMED(LOG_NAME, "Invalid message timestamp %ld", msg->time_stamp);
if (msg->recv_timestamp < 0) {
RCUTILS_LOG_WARN_NAMED(LOG_NAME, "Invalid message timestamp %ld", msg->recv_timestamp);
}
mcap_msg.logTime = mcap::Timestamp(msg->time_stamp);
mcap_msg.publishTime = mcap_msg.logTime;
mcap_msg.logTime = mcap::Timestamp(msg->recv_timestamp);
mcap_msg.publishTime = mcap::Timestamp(msg->send_timestamp);
mcap_msg.dataSize = msg->serialized_data->buffer_length;
mcap_msg.data = reinterpret_cast<const std::byte *>(msg->serialized_data->buffer);
const auto status = mcap_writer_->write(mcap_msg);
Expand All @@ -799,7 +800,7 @@ void MCAPStorage::write_lock_free(std::shared_ptr<const rosbag2_storage::Seriali
// Increment global message count
metadata_.message_count++;
// Determine recording duration
const auto message_time = time_point(std::chrono::nanoseconds(msg->time_stamp));
const auto message_time = time_point(std::chrono::nanoseconds(msg->recv_timestamp));
metadata_.duration = std::max(metadata_.duration, message_time - metadata_.starting_time);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class McapStorageTestFixture : public rosbag2_test_common::TemporaryDirectoryFix
rw_storage->create_topic(topic_metadata, std::get<3>(msg));
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_message->serialized_data = make_serialized_message(std::get<0>(msg));
bag_message->time_stamp = std::get<1>(msg);
bag_message->recv_timestamp = std::get<1>(msg);
bag_message->topic_name = topic_metadata.name;
rw_storage->write(bag_message);
}
Expand Down Expand Up @@ -208,7 +208,7 @@ TEST_F(TemporaryDirectoryFixture, can_write_and_read_basic_mcap_file)
serialized_bag_msg->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
const_cast<rcutils_uint8_array_t *>(&serialized_msg->get_rcl_serialized_message()),
[](rcutils_uint8_array_t * /* data */) {});
serialized_bag_msg->time_stamp = time_stamp;
serialized_bag_msg->recv_timestamp = time_stamp;
serialized_bag_msg->topic_name = topic_name;
writer->write(serialized_bag_msg);
}
Expand Down Expand Up @@ -295,7 +295,7 @@ TEST_F(TemporaryDirectoryFixture, can_write_mcap_with_zstd_configured_from_yaml)
serialized_bag_msg->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
const_cast<rcutils_uint8_array_t *>(&serialized_msg->get_rcl_serialized_message()),
[](rcutils_uint8_array_t * /* data */) {});
serialized_bag_msg->time_stamp = time_stamp;
serialized_bag_msg->recv_timestamp = time_stamp;
serialized_bag_msg->topic_name = topic_name;
writer->write(serialized_bag_msg);
writer->write(serialized_bag_msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,9 @@ void SqliteStorage::write_locked(
}

try {
write_statement_->bind(message->time_stamp, topic_entry->second, message->serialized_data);
write_statement_->bind(
message->recv_timestamp, topic_entry->second,
message->serialized_data);
} catch (const SqliteException & exc) {
if (SQLITE_TOOBIG == exc.get_sqlite_return_code()) {
// Get the sqlite string/blob limit.
Expand Down Expand Up @@ -380,12 +382,12 @@ std::shared_ptr<rosbag2_storage::SerializedBagMessage> SqliteStorage::read_next(

auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_message->serialized_data = std::get<0>(*current_message_row_);
bag_message->time_stamp = std::get<1>(*current_message_row_);
bag_message->recv_timestamp = std::get<1>(*current_message_row_);
bag_message->topic_name = std::get<2>(*current_message_row_);

// set start time to current time
// and set seek_row_id to the new row id up
seek_time_ = bag_message->time_stamp;
seek_time_ = bag_message->recv_timestamp;
seek_row_id_ = std::get<3>(*current_message_row_) + (read_order_.reverse ? -1 : 1);

++current_message_row_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class StorageTestFixture : public TemporaryDirectoryFixture
rw_storage.create_topic({0u, topic_name, type_name, rmw_format, {}, ""}, {});
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_message->serialized_data = make_serialized_message(std::get<0>(msg));
bag_message->time_stamp = std::get<1>(msg);
bag_message->recv_timestamp = std::get<1>(msg);
bag_message->topic_name = topic_name;
rw_storage.write(bag_message);
}
Expand Down Expand Up @@ -188,14 +188,16 @@ class StorageTestFixture : public TemporaryDirectoryFixture
// Prepare rosbag2 serialized message to write
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->serialized_data = make_serialized_message(std::get<0>(msg));
message->time_stamp = std::get<1>(msg);
message->recv_timestamp = std::get<1>(msg);
message->topic_name = topic_name;
// Write message to DB
auto topic_entry = topics.find(topic_name);
if (topic_entry == end(topics)) {
throw SqliteException("Topic '" + topic_name + "' has not been created yet!");
}
write_statement->bind(message->time_stamp, topic_entry->second, message->serialized_data);
write_statement->bind(
message->recv_timestamp, topic_entry->second,
message->serialized_data);
write_statement->execute_and_reset();
}
}
Expand Down
Loading

0 comments on commit bee10b4

Please sign in to comment.