Skip to content

Commit

Permalink
[1.0.0] Use Summary section for get_metadata() and seek(), implement …
Browse files Browse the repository at this point in the history
…remaining methods (ros2#17)

Signed-off-by: John Hurliman <[email protected]>
Signed-off-by: James Smith <[email protected]>
  • Loading branch information
jhurliman authored and james-rms committed Nov 17, 2022
1 parent 250a247 commit 03af72c
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 60 deletions.
2 changes: 1 addition & 1 deletion rosbag2_storage_mcap/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ include(${CMAKE_BINARY_DIR}/conan.cmake)
include(FetchContent)
fetchcontent_declare(mcap
GIT_REPOSITORY https://github.com/foxglove/mcap.git
GIT_TAG 3d19068feb5a78d0073e58b1cf514bad689df94a
GIT_TAG 0851e316cf973c26a64773c56540efbbaf06725a
)
fetchcontent_makeavailable(mcap)

Expand Down
4 changes: 2 additions & 2 deletions rosbag2_storage_mcap/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>rosbag2_storage_mcap</name>
<version>0.0.0</version>
<version>1.0.0</version>
<description>rosbag2 storage plugin using the MCAP file format</description>
<maintainer email="[email protected]">TODO</maintainer>
<maintainer email="[email protected]">John Hurliman</maintainer>
<license>Apache-2.0</license>

<buildtool_depend>ament_cmake</buildtool_depend>
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_storage_mcap/plugin_description.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
type="rosbag2_storage_plugins::MCAPStorage"
base_class_type="rosbag2_storage::storage_interfaces::ReadWriteInterface"
>
<description>Sample plugin to write a simple linear file.</description>
<description>rosbag2 storage plugin using the MCAP file format</description>
</class>
</library>
124 changes: 70 additions & 54 deletions rosbag2_storage_mcap/src/mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
namespace rosbag2_storage_plugins {

using mcap::ByteOffset;
using time_point = std::chrono::time_point<std::chrono::high_resolution_clock>;
static const char FILE_EXTENSION[] = ".mcap";

/**
Expand Down Expand Up @@ -80,6 +81,7 @@ class MCAPStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa

private:
bool read_and_enqueue_message();
void ensure_summary_read();

std::optional<rosbag2_storage::storage_interfaces::IOFlag> opened_as_;
std::string relative_path_;
Expand All @@ -101,6 +103,8 @@ class MCAPStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa

std::unique_ptr<mcap::McapWriter> mcap_writer_;
rosbag2_storage_mcap::internal::MessageDefinitionCache msgdef_cache_;

bool has_read_summary_ = false;
};

MCAPStorage::MCAPStorage() {
Expand Down Expand Up @@ -145,7 +149,6 @@ void MCAPStorage::open(const rosbag2_storage::StorageOptions& storage_options,

mcap_writer_ = std::make_unique<mcap::McapWriter>();
mcap::McapWriterOptions options{"ros2"};
// TODO(jhurliman): Use storage_options max_bagfile_duration / max_bagfile_size
auto status = mcap_writer_->open(relative_path_, options);
if (!status.ok()) {
throw std::runtime_error(status.message);
Expand All @@ -159,56 +162,53 @@ void MCAPStorage::open(const rosbag2_storage::StorageOptions& storage_options,

/** BaseInfoInterface **/
rosbag2_storage::BagMetadata MCAPStorage::get_metadata() {
ensure_summary_read();

metadata_.version = 2;
metadata_.storage_identifier = get_storage_identifier();
metadata_.bag_size = get_bagfile_size();
metadata_.relative_file_paths = {get_relative_file_path()};
std::unordered_map<mcap::SchemaId, rosbag2_storage::TopicInformation>
topic_information_schema_map;
std::unordered_map<mcap::ChannelId, rosbag2_storage::TopicInformation>
topic_information_channel_map;

// Create a TypedRecordReader that will perform a linear scan of the MCAP file.
// This will be a fallback once index parsing is implemented in McapReader
mcap::TypedRecordReader typed_record_reader{*data_source_, 8};
typed_record_reader.onSchema = [&topic_information_schema_map, this](
const mcap::SchemaPtr schema_ptr, ByteOffset,
std::optional<ByteOffset>) {

// Fill out summary metadata from the Statistics record
const mcap::Statistics& stats = mcap_reader_->statistics().value();
metadata_.message_count = stats.messageCount;
metadata_.duration = std::chrono::nanoseconds(stats.messageEndTime - stats.messageStartTime);
metadata_.starting_time = time_point(std::chrono::nanoseconds(stats.messageStartTime));

// Build a list of topic information along with per-topic message counts
metadata_.topics_with_message_count.clear();
for (const auto& [channel_id, channel_ptr] : mcap_reader_->channels()) {
const mcap::Channel& channel = *channel_ptr;

// Look up the Schema for this topic
const auto schema_ptr = mcap_reader_->schema(channel.schemaId);
if (!schema_ptr) {
throw std::runtime_error("Could not find schema for topic " + channel.topic);
}

// Create a TopicInformation for this topic
rosbag2_storage::TopicInformation topic_info{};
topic_info.topic_metadata.name = channel.topic;
topic_info.topic_metadata.serialization_format = channel.messageEncoding;
topic_info.topic_metadata.type = schema_ptr->name;
topic_information_schema_map.insert({schema_ptr->id, topic_info});
};
typed_record_reader.onChannel = [&topic_information_schema_map, &topic_information_channel_map](
const mcap::ChannelPtr channel_ptr, ByteOffset,
std::optional<ByteOffset>) {
auto topic_info = topic_information_schema_map[channel_ptr->schemaId];
topic_info.topic_metadata.name = channel_ptr->topic;
topic_info.topic_metadata.serialization_format = channel_ptr->messageEncoding;
topic_information_channel_map.insert({channel_ptr->id, topic_info});
};
typed_record_reader.onStatistics = [&topic_information_channel_map, this](
const mcap::Statistics& statistics, ByteOffset) {
metadata_.message_count = statistics.messageCount;
metadata_.duration =
std::chrono::nanoseconds(statistics.messageEndTime - statistics.messageStartTime);
metadata_.starting_time = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(statistics.messageStartTime));
for (auto& topic_info_with_id : topic_information_channel_map) {
if (statistics.channelMessageCounts.find(topic_info_with_id.first) !=
statistics.channelMessageCounts.end()) {
topic_info_with_id.second.message_count =
statistics.channelMessageCounts.at(topic_info_with_id.first);
}
metadata_.topics_with_message_count.push_back(topic_info_with_id.second);

// Look up the offered_qos_profiles metadata entry
const auto metadata_it = channel.metadata.find("offered_qos_profiles");
if (metadata_it != channel.metadata.end()) {
topic_info.topic_metadata.offered_qos_profiles = metadata_it->second;
}
};

while (typed_record_reader.next()) {
const auto status = typed_record_reader.status();
if (!status.ok()) {
throw std::runtime_error(status.message);
// Look up the message count for this Channel
const auto message_count_it = stats.channelMessageCounts.find(channel_id);
if (message_count_it != stats.channelMessageCounts.end()) {
topic_info.message_count = message_count_it->second;
} else {
topic_info.message_count = 0;
}

metadata_.topics_with_message_count.push_back(topic_info);
}

return metadata_;
}

Expand All @@ -217,7 +217,15 @@ std::string MCAPStorage::get_relative_file_path() const {
}

uint64_t MCAPStorage::get_bagfile_size() const {
return data_source_->size();
if (opened_as_ == rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY) {
return data_source_ ? data_source_->size() : 0;
} else {
if (!mcap_writer_) {
return 0;
}
const auto* data_sink = mcap_writer_->dataSink();
return data_sink ? data_sink->size() : 0;
}
}

std::string MCAPStorage::get_storage_identifier() const {
Expand Down Expand Up @@ -256,6 +264,16 @@ bool MCAPStorage::read_and_enqueue_message() {
return true;
}

void MCAPStorage::ensure_summary_read() {
if (!has_read_summary_) {
const auto status = mcap_reader_->readSummary(mcap::ReadSummaryMethod::AllowFallbackScan);
if (!status.ok()) {
throw std::runtime_error(status.message);
}
has_read_summary_ = true;
}
}

bool MCAPStorage::has_next() {
if (!linear_iterator_) {
return false;
Expand Down Expand Up @@ -307,22 +325,21 @@ void MCAPStorage::reset_filter() {
set_filter(rosbag2_storage::StorageFilter());
}

void MCAPStorage::seek(const rcutils_time_point_value_t& /* time_stamp */) {
// TODO(mcap)
throw std::runtime_error{"seek() is not implemented."};
void MCAPStorage::seek(const rcutils_time_point_value_t& time_stamp) {
ensure_summary_read();

auto start_time = mcap::Timestamp(time_stamp);
linear_view_ = std::make_unique<mcap::LinearMessageView>(mcap_reader_->readMessages(start_time));
linear_iterator_ = std::make_unique<mcap::LinearMessageView::Iterator>(linear_view_->begin());
}

/** ReadWriteInterface **/
uint64_t MCAPStorage::get_minimum_split_file_size() const {
// TODO(mcap)
return 0;
return 1024;
}

/** BaseWriteInterface **/
void MCAPStorage::write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg) {
// TODO(mcap) - also note that metadata update could be done differently
// - maybe MCAP library tracks those values and we should just query them

const auto topic_it = topics_.find(msg->topic_name);
if (topic_it == topics_.end()) {
throw std::runtime_error{"Unknown message topic \"" + msg->topic_name + "\""};
Expand Down Expand Up @@ -383,10 +400,9 @@ void MCAPStorage::write(std::shared_ptr<const rosbag2_storage::SerializedBagMess
topic_it->second.message_count++;
// Increment global message count
metadata_.message_count++;
// Determine bag duration. Note: this assumes in-order writes.
const auto chrono_ts = std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::nanoseconds(msg->time_stamp));
metadata_.duration = chrono_ts - metadata_.starting_time;
// Determine recording duration
const auto message_time = time_point(std::chrono::nanoseconds(msg->time_stamp));
metadata_.duration = std::max(metadata_.duration, message_time - metadata_.starting_time);
}

void MCAPStorage::write(
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_storage_mcap/src/message_definition_cache.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022, Amazon.com Inc or its Affiliates. All rights reserved.
// Copyright 2022, Foxglove Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_storage_mcap/src/message_definition_cache.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022, Amazon.com Inc or its Affiliates. All rights reserved.
// Copyright 2022, Foxglove Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down

0 comments on commit 03af72c

Please sign in to comment.