Skip to content

Commit

Permalink
Merge branch 'ros2:rolling' into ricardo-manriquez/node-name-option
Browse files Browse the repository at this point in the history
  • Loading branch information
ricardo-manriquez authored Dec 7, 2022
2 parents d9b6e92 + 05d2e43 commit d7f882c
Show file tree
Hide file tree
Showing 14 changed files with 80 additions and 53 deletions.
6 changes: 3 additions & 3 deletions rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
rcutils_time_point_value_t seek_time_ = 0;
rosbag2_storage::StorageFilter topics_filter_{};
std::vector<rosbag2_storage::TopicMetadata> topics_metadata_{};
std::vector<std::string> file_paths_{}; // List of database files.
std::vector<std::string>::iterator current_file_iterator_{}; // Index of file to read from
std::unordered_set<std::string> preprocessed_file_paths_; // List of preprocessed paths
std::vector<std::string> file_paths_{};
std::vector<std::string>::iterator current_file_iterator_{};
std::unordered_set<std::string> preprocessed_file_paths_;

// Hang on to this because storage_options_ is mutated to point at individual files
std::string base_folder_;
Expand Down
8 changes: 2 additions & 6 deletions rosbag2_cpp/include/rosbag2_cpp/reindexer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,8 @@ namespace rosbag2_cpp
/**
* Tool to reconstruct bag metadata files in the event of loss or corruption
*
* Reindexing is an operation where a bag that is missing a metadata.yaml file can have a new
* file created through parsing of the metadata stored within the actual files of the bag.
* For instance: Imagine we are working with SQL databases (.db3). We can open the individual
* .db3 files within the bag and read their metadata (not the messages themselves) to replicate
* a usable metadata.yaml file, so that the bag can once again be read by the standard read
* command.
* Reindexing recreates metadata.yaml for a bag that is missing that file.
* This is done by opening the storage directly and reading the contents to accumulate metadata.
*
* Reindexing has some limitations - It cannot perfectly replicate the original metadata file,
* since some information known by the program from the start up command cannot be found
Expand Down
14 changes: 7 additions & 7 deletions rosbag2_cpp/src/rosbag2_cpp/reindexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ bool Reindexer::compare_relative_file(
throw std::runtime_error(error_text.c_str());
}

auto first_db_num = std::stoul(first_match.str(1), nullptr, 10);
auto second_db_num = std::stoul(second_match.str(1), nullptr, 10);
auto first_file_num = std::stoul(first_match.str(1), nullptr, 10);
auto second_file_num = std::stoul(second_match.str(1), nullptr, 10);

return first_db_num < second_db_num;
return first_file_num < second_file_num;
}

/// Retrieve bag storage files from the bag directory.
Expand Down Expand Up @@ -122,7 +122,7 @@ void Reindexer::get_bag_files(
}
} while (rcutils_dir_iter_next(dir_iter));

// Sort relative file path by database number
// Sort relative file path by number
std::sort(
output.begin(), output.end(),
[&, this](rcpputils::fs::path a, rcpputils::fs::path b) {
Expand Down Expand Up @@ -169,7 +169,7 @@ void Reindexer::aggregate_metadata(
// In order to most accurately reconstruct the metadata, we need to
// visit each of the contained relative files files in the bag,
// open them, read the info, and write it into an aggregated metadata object.
ROSBAG2_CPP_LOG_DEBUG_STREAM("Extracting metadata from database(s)");
ROSBAG2_CPP_LOG_DEBUG_STREAM("Extracting metadata from bag file(s)");
for (const auto & f_ : files) {
ROSBAG2_CPP_LOG_DEBUG_STREAM("Extracting from file: " + f_.string());

Expand Down Expand Up @@ -249,13 +249,13 @@ void Reindexer::reindex(const rosbag2_storage::StorageOptions & storage_options)
std::vector<rcpputils::fs::path> files;
get_bag_files(base_folder_, files);
if (files.empty()) {
throw std::runtime_error("No database files found for reindexing. Abort");
throw std::runtime_error("No storage files found for reindexing. Abort");
}

init_metadata(files, storage_options);
ROSBAG2_CPP_LOG_DEBUG_STREAM("Completed init_metadata");

// Collect all metadata from database files
// Collect all metadata from files
aggregate_metadata(files, bag_reader, storage_options);
ROSBAG2_CPP_LOG_DEBUG_STREAM("Completed aggregate_metadata");

Expand Down
12 changes: 6 additions & 6 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,18 @@ void SequentialWriter::open(
converter_ = std::make_unique<Converter>(converter_options, converter_factory_);
}

rcpputils::fs::path db_path(storage_options.uri);
if (db_path.is_directory()) {
rcpputils::fs::path storage_path(storage_options.uri);
if (storage_path.is_directory()) {
std::stringstream error;
error << "Database directory already exists (" << db_path.string() <<
"), can't overwrite existing database";
error << "Bag directory already exists (" << storage_path.string() <<
"), can't overwrite existing bag";
throw std::runtime_error{error.str()};
}

bool dir_created = rcpputils::fs::create_directories(db_path);
bool dir_created = rcpputils::fs::create_directories(storage_path);
if (!dir_created) {
std::stringstream error;
error << "Failed to create database directory (" << db_path.string() << ").";
error << "Failed to create bag directory (" << storage_path.string() << ").";
throw std::runtime_error{error.str()};
}

Expand Down
15 changes: 11 additions & 4 deletions rosbag2_cpp/test/rosbag2_cpp/test_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ TEST_F(TemporaryDirectoryFixture, read_metadata_supports_version_2) {

TEST_F(TemporaryDirectoryFixture, read_metadata_supports_version_6) {
const auto expected_storage_id = rosbag2_storage::get_default_storage_id();
const auto expected_storage_file = "test.testbag";

const std::string bagfile = "rosbag2_bagfile_information:\n"
" version: 6\n"
" storage_identifier: " + expected_storage_id + "\n"
Expand Down Expand Up @@ -131,7 +133,7 @@ TEST_F(TemporaryDirectoryFixture, read_metadata_supports_version_6) {
" compression_format: \"zstd\"\n"
" compression_mode: \"FILE\"\n"
" files:\n"
" - path: test.db3\n"
" - path: " + expected_storage_file + "\n"
" starting_time:\n"
" nanoseconds_since_epoch: 0\n"
" duration:\n"
Expand Down Expand Up @@ -176,7 +178,7 @@ TEST_F(TemporaryDirectoryFixture, read_metadata_supports_version_6) {

{
EXPECT_EQ(metadata.files.size(), 1U);
EXPECT_EQ(metadata.files[0].path, "test.db3");
EXPECT_EQ(metadata.files[0].path, expected_storage_file);
EXPECT_EQ(metadata.files[0].starting_time.time_since_epoch().count(), 0U);
EXPECT_EQ(metadata.files[0].duration, std::chrono::nanoseconds{100000000});
EXPECT_EQ(metadata.files[0].message_count, 200U);
Expand Down Expand Up @@ -282,19 +284,24 @@ TEST_F(TemporaryDirectoryFixture, read_metadata_makes_appropriate_call_to_metada

TEST_F(TemporaryDirectoryFixture, info_for_standalone_bagfile) {
const auto bag_path = rcpputils::fs::path(temporary_dir_path_) / "bag";
const auto expected_bagfile_path = bag_path / "bag_0.db3";
{
// Create an empty bag with default storage
rosbag2_cpp::Writer writer;
writer.open(bag_path.string());
test_msgs::msg::BasicTypes msg;
writer.write(msg, "testtopic", rclcpp::Time{});
}
std::string first_storage_file_path;
{
rosbag2_storage::MetadataIo metadata_io;
auto metadata = metadata_io.read_metadata(bag_path.string());
first_storage_file_path = (bag_path / metadata.relative_file_paths[0]).string();
}

rosbag2_cpp::Info info;
rosbag2_storage::BagMetadata metadata;
EXPECT_NO_THROW(
metadata = info.read_metadata(expected_bagfile_path.string())
metadata = info.read_metadata(first_storage_file_path)
);
EXPECT_THAT(metadata.topics_with_message_count, SizeIs(1));
}
6 changes: 4 additions & 2 deletions rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ TEST_F(SequentialReaderTest, next_file_calls_callback) {

TEST_F(TemporaryDirectoryFixture, reader_accepts_bare_file) {
const auto bag_path = rcpputils::fs::path(temporary_dir_path_) / "bag";
const auto expected_bagfile_path = bag_path / "bag_0.db3";

{
// Create an empty bag with default storage
Expand All @@ -232,9 +231,12 @@ TEST_F(TemporaryDirectoryFixture, reader_accepts_bare_file) {
test_msgs::msg::BasicTypes msg;
writer.write(msg, "testtopic", rclcpp::Time{});
}
rosbag2_storage::MetadataIo metadata_io;
auto metadata = metadata_io.read_metadata(bag_path.string());
auto first_storage = bag_path / metadata.relative_file_paths[0];

rosbag2_cpp::Reader reader;
EXPECT_NO_THROW(reader.open(expected_bagfile_path.string()));
EXPECT_NO_THROW(reader.open(first_storage.string()));
EXPECT_TRUE(reader.has_next());
EXPECT_THAT(reader.get_metadata().topics_with_message_count, SizeIs(1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ To run test benchmark (with `test.yaml` and `mixed_110Mbs.yaml`):
ros2 launch rosbag2_performance_benchmarking benchmark_launch.py benchmark:=`ros2 pkg prefix rosbag2_performance_benchmarking`/share/rosbag2_performance_benchmarking/config/benchmarks/test.yaml producers:=`ros2 pkg prefix rosbag2_performance_benchmarking`/share/rosbag2_performance_benchmarking/config/producers/mixed_110Mbs.yaml
```

The summary of benchmark goes into result file described in benchmark config: `<db_root_folder>/<BENCHMARK_NAME>/summary_result_file` where `BENCHMARK_NAME` is a name generated from config names, transport type and timestamp.
The summary of benchmark goes into result file described in benchmark config: `<bag_root_folder>/<BENCHMARK_NAME>/summary_result_file` where `BENCHMARK_NAME` is a name generated from config names, transport type and timestamp.

For human friendly output, a postprocess report generation tool can be used. Launch it with benchmark result directory as an `-i` argument (directory with `summary_result_file` file):

Expand Down Expand Up @@ -70,7 +70,7 @@ For more sophisticated & accurate benchmarks, see the `fio` command. An example

Tools that can help in I/O profiling: `sudo apt-get install iotop ioping sysstat`
* `iotop` works similar as `top` command, but shows disk reads, writes, swaps and I/O %. Can be used at higher frequency in batch mode with specified process to deliver data that can be plotted.
* Example use: `sudo iotop -h -d 0.1 -t -b -o -p <PID>` after running the bag.
* Example use: `sudo iotop -h -d 0.1 -t -b -o -p <PID>` after running the bag.
* `ioping` can be used to check latency of requests to device
* `strace` can help determine syscalls associated with the bottleneck.
* Example use: `strace -c ros2 bag record /image --max-cache-size 10 -o ./tmp`. You will see a report after finishing recording with Ctrl-C.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ rosbag2_performance_benchmarking:
ros__parameters:
benchmark:
summary_result_file: "results.csv"
db_root_folder: "rosbag2_performance_test_results"
bag_root_folder: "rosbag2_performance_test_results"
repeat_each: 2 # How many times to run each configurations (to average results)
no_transport: True # Whether to run storage-only or end-to-end (including transport) benchmark
preserve_bags: False # Whether to leave bag files after experiment (and between runs). Some configurations can take lots of space!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def _copy_config_files():
"""Copy benchmark and producers config files to benchmark folder."""
global _bench_cfg_path, _producers_cfg_path
# Copy yaml configs for current benchmark after benchmark is finished
benchmark_path = pathlib.Path(_producer_nodes[0]['parameters']['db_folder'])
benchmark_path = pathlib.Path(_producer_nodes[0]['parameters']['bag_folder'])
shutil.copy(str(_bench_cfg_path), str(benchmark_path.with_name('benchmark.yaml')))
shutil.copy(str(_producers_cfg_path), str(benchmark_path.with_name('producers.yaml')))

Expand Down Expand Up @@ -196,15 +196,15 @@ def _producer_node_exited(event, context):

# Handle clearing bag files
if not node_params['preserve_bags']:
db_files = pathlib.Path.cwd().joinpath(node_params['db_folder']).glob('*.db3')
stats_path = pathlib.Path.cwd().joinpath(node_params['db_folder'], 'bagfiles_info.yaml')
bag_files = pathlib.Path.cwd().joinpath(node_params['bag_folder']).glob('*.db3')
stats_path = pathlib.Path.cwd().joinpath(node_params['bag_folder'], 'bagfiles_info.yaml')
stats = {
'total_size': 0,
'bagfiles': []
}

# Delete rosbag files
for f in db_files:
for f in bag_files:
filesize = f.stat().st_size
f.unlink()
stats['bagfiles'].append({f.name: {'size': filesize}})
Expand Down Expand Up @@ -261,7 +261,7 @@ def generate_launch_description():
benchmark_params = bench_cfg['benchmark']

repeat_each = benchmark_params.get('repeat_each')
db_root_folder = benchmark_params.get('db_root_folder')
bag_root_folder = benchmark_params.get('bag_root_folder')
summary_result_file = benchmark_params.get('summary_result_file')
transport = not benchmark_params.get('no_transport')
preserve_bags = benchmark_params.get('preserve_bags')
Expand Down Expand Up @@ -325,13 +325,13 @@ def __generate_cross_section_parameter(i,
)

# Result file path for producer
result_file = pathlib.Path(db_root_folder).joinpath(
result_file = pathlib.Path(bag_root_folder).joinpath(
benchmark_dir_name,
summary_result_file
)

# Database folder path for producer
db_folder = pathlib.Path(db_root_folder).joinpath(
# Bag folder path for producer
bag_folder = pathlib.Path(bag_root_folder).joinpath(
benchmark_dir_name,
node_title
)
Expand All @@ -340,7 +340,7 @@ def __generate_cross_section_parameter(i,
params_cross_section.append(
{
'node_title': node_title,
'db_folder': str(db_folder),
'bag_folder': str(bag_folder),
'cache': cache,
'preserve_bags': preserve_bags,
'transport': transport,
Expand Down Expand Up @@ -384,7 +384,7 @@ def __generate_cross_section_parameter(i,
producer_param['config_file'],
{'max_cache_size': producer_param['cache']},
{'max_bag_size': producer_param['max_bag_size']},
{'db_folder': producer_param['db_folder']},
{'bag_folder': producer_param['bag_folder']},
{'results_file': producer_param['result_file']},
{'compression_queue_size': producer_param['compression_queue_size']},
{'compression_threads': producer_param['compression_threads']}
Expand Down Expand Up @@ -448,7 +448,7 @@ def __generate_cross_section_parameter(i,
'-b',
str(producer_param['max_bag_size'])
]
rosbag_args += ['-o', str(producer_param['db_folder'])]
rosbag_args += ['-o', str(producer_param['bag_folder'])]
rosbag_process = launch.actions.ExecuteProcess(
sigkill_timeout=launch.substitutions.LaunchConfiguration(
'sigkill_timeout', default=60),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ BagConfig bag_config_from_node_parameters(
node.declare_parameter<std::string>("storage_id", rosbag2_storage::get_default_storage_id());
node.declare_parameter<int>("max_cache_size", 10000000);
node.declare_parameter<int>("max_bag_size", 0);
node.declare_parameter<std::string>("db_folder", default_bag_folder);
node.declare_parameter<std::string>("bag_folder", default_bag_folder);
node.declare_parameter<std::string>("storage_config_file", "");
node.declare_parameter<std::string>("compression_format", "");
node.declare_parameter<int>("compression_queue_size", 1);
Expand All @@ -122,7 +122,7 @@ BagConfig bag_config_from_node_parameters(
node.get_parameter("storage_id", bag_config.storage_options.storage_id);
node.get_parameter("max_cache_size", bag_config.storage_options.max_cache_size);
node.get_parameter("max_bag_size", bag_config.storage_options.max_bagfile_size);
node.get_parameter("db_folder", bag_config.storage_options.uri);
node.get_parameter("bag_folder", bag_config.storage_options.uri);
node.get_parameter("storage_config_file", bag_config.storage_options.storage_config_uri);
node.get_parameter("compression_format", bag_config.compression_format);
node.get_parameter("compression_queue_size", bag_config.compression_queue_size);
Expand Down
6 changes: 4 additions & 2 deletions rosbag2_py/rosbag2_py/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@
get_registered_readers,
)
from rosbag2_py._storage import (
BagMetadata,
ConverterOptions,
FileInformation,
MetadataIo,
ReadOrder,
ReadOrderSortBy,
StorageFilter,
StorageOptions,
TopicMetadata,
TopicInformation,
BagMetadata,
get_default_storage_id,
)
from rosbag2_py._writer import (
Expand All @@ -46,11 +47,11 @@
Info,
)
from rosbag2_py._transport import (
bag_rewrite,
Player,
PlayOptions,
Recorder,
RecordOptions,
bag_rewrite,
)
from rosbag2_py._reindexer import (
Reindexer
Expand All @@ -77,6 +78,7 @@
'TopicMetadata',
'TopicInformation',
'BagMetadata',
'MetadataIo',
'Info',
'Player',
'PlayOptions',
Expand Down
10 changes: 10 additions & 0 deletions rosbag2_py/src/rosbag2_py/_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_storage/bag_metadata.hpp"
#include "rosbag2_storage/default_storage_id.hpp"
#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2_storage/storage_filter.hpp"
#include "rosbag2_storage/storage_interfaces/base_read_interface.hpp"
#include "rosbag2_storage/storage_options.hpp"
Expand Down Expand Up @@ -290,4 +291,13 @@ PYBIND11_MODULE(_storage, m) {
"get_default_storage_id",
&rosbag2_storage::get_default_storage_id,
"Returns the default storage ID used when unspecified in StorageOptions");

pybind11::class_<rosbag2_storage::MetadataIo>(m, "MetadataIo")
.def(pybind11::init<>())
.def("write_metadata", &rosbag2_storage::MetadataIo::write_metadata)
.def("read_metadata", &rosbag2_storage::MetadataIo::read_metadata)
.def("metadata_file_exists", &rosbag2_storage::MetadataIo::metadata_file_exists)
.def("serialize_metadata", &rosbag2_storage::MetadataIo::serialize_metadata)
.def("deserialize_metadata", &rosbag2_storage::MetadataIo::deserialize_metadata)
;
}
11 changes: 8 additions & 3 deletions rosbag2_py/test/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,12 @@ def test_record_cancel(tmp_path):

recorder.cancel()

metadata_path = Path(bag_path) / 'metadata.yaml'
db3_path = Path(bag_path) / 'test_record_cancel_0.db3'
assert wait_for(lambda: metadata_path.is_file() and db3_path.is_file(),
metadata_io = rosbag2_py.MetadataIo()
assert wait_for(lambda: metadata_io.metadata_file_exists(bag_path),
timeout=rclpy.duration.Duration(seconds=3))

metadata = metadata_io.read_metadata(bag_path)
assert(len(metadata.relative_file_paths))
storage_path = Path(metadata.relative_file_paths[0])
assert wait_for(lambda: storage_path.is_file(),
timeout=rclpy.duration.Duration(seconds=3))
Loading

0 comments on commit d7f882c

Please sign in to comment.