Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into 295/use-streaming…
Browse files Browse the repository at this point in the history
…-compression
  • Loading branch information
pjreed committed Jan 28, 2021
2 parents 0a4a228 + c4912cd commit 0fd280b
Show file tree
Hide file tree
Showing 53 changed files with 2,313 additions and 714 deletions.
15 changes: 9 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,18 @@ Below is an example profile set to the default ROS2 QoS settings.
reliability: reliable
durability: volatile
deadline:
sec: 2147483647 # LONG_MAX
nsec: 4294967295 # ULONG_MAX
# unspecified/infinity
sec: 0
nsec: 0
lifespan:
sec: 2147483647
nsec: 4294967295
# unspecified/infinity
sec: 0
nsec: 0
liveliness: system_default
liveliness_lease_duration:
sec: 2147483647
nsec: 4294967295
# unspecified/infinity
sec: 0
nsec: 0
avoid_ros_namespace_conventions: false
```
Expand Down
29 changes: 23 additions & 6 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,17 @@ class RecordVerb(VerbExtension):
def add_arguments(self, parser, cli_name): # noqa: D102
parser.add_argument(
'-a', '--all', action='store_true',
help='recording all topics, required if no topics are listed explicitly.')
help='recording all topics, required if no topics '
'are listed explicitly or through a regex')
parser.add_argument(
'topics', nargs='*', default=None, help='topics to be recorded')
parser.add_argument(
'-e', '--regex', default='', help='recording only topics '
'matching provided regular expression')
parser.add_argument(
'-x', '--exclude', default='', help='exclude topics '
'matching provided regular expression. Works with -a and -e, '
'subtracting excluded topics')
parser.add_argument(
'-o', '--output',
help='destination of the bagfile to create, \
Expand Down Expand Up @@ -122,11 +130,18 @@ def add_arguments(self, parser, cli_name): # noqa: D102

def main(self, *, args): # noqa: D102
# both all and topics cannot be true
if args.all and args.topics:
return print_error('Invalid choice: Can not specify topics and -a at the same time.')
# both all and topics cannot be false
if not(args.all or (args.topics and len(args.topics) > 0)):
return print_error('Invalid choice: Must specify topic(s) or -a')
if (args.all and (args.topics or args.regex)) or (args.topics and args.regex):
return print_error('Must specify only one option out of topics, --regex or --all')
# one out of "all", "topics" and "regex" must be true
if not(args.all or (args.topics and len(args.topics) > 0) or (args.regex)):
return print_error('Invalid choice: Must specify topic(s), --regex or --all')

if args.topics and args.exclude:
return print_error('--exclude argument cannot be used when specifying a list '
'of topics explicitly')

if args.exclude and not(args.regex or args.all):
return print_error('--exclude argument requires either --all or --regex')

uri = args.output or datetime.datetime.now().strftime('rosbag2_%Y_%m_%d-%H_%M_%S')

Expand Down Expand Up @@ -178,6 +193,8 @@ def main(self, *, args): # noqa: D102
max_bagfile_duration=args.max_bag_duration,
max_cache_size=args.max_cache_size,
topics=args.topics,
regex=args.regex,
exclude=args.exclude,
include_hidden_topics=args.include_hidden_topics,
qos_profile_overrides=qos_profile_overrides,
storage_preset_profile=args.storage_preset_profile,
Expand Down
4 changes: 2 additions & 2 deletions ros2bag/test/resources/empty_bag/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ rosbag2_bagfile_information:
name: /parameter_events
type: rcl_interfaces/msg/ParameterEvent
serialization_format: cdr
offered_qos_profiles: "- history: 3\n depth: 0\n reliability: 1\n durability: 2\n deadline:\n sec: 2147483647\n nsec: 4294967295\n lifespan:\n sec: 2147483647\n nsec: 4294967295\n liveliness: 1\n liveliness_lease_duration:\n sec: 2147483647\n nsec: 4294967295\n avoid_ros_namespace_conventions: false"
offered_qos_profiles: "- history: 3\n depth: 0\n reliability: 1\n durability: 2\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 0\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
message_count: 0
- topic_metadata:
name: /rosout
type: rcl_interfaces/msg/Log
serialization_format: cdr
offered_qos_profiles: "- history: 3\n depth: 0\n reliability: 1\n durability: 1\n deadline:\n sec: 2147483647\n nsec: 4294967295\n lifespan:\n sec: 10\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 2147483647\n nsec: 4294967295\n avoid_ros_namespace_conventions: false"
offered_qos_profiles: "- history: 3\n depth: 0\n reliability: 1\n durability: 1\n deadline:\n sec: 0\n nsec: 0\n lifespan:\n sec: 10\n nsec: 0\n liveliness: 1\n liveliness_lease_duration:\n sec: 0\n nsec: 0\n avoid_ros_namespace_conventions: false"
message_count: 0
compression_format: ""
compression_mode: ""
2 changes: 1 addition & 1 deletion ros2bag/test/resources/incomplete_qos_duration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
reliability: reliable
durability: volatile
deadline:
sec: 2147483647 # LONG_MAX
sec: 2
15 changes: 9 additions & 6 deletions ros2bag/test/resources/qos_profile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
reliability: reliable
durability: transient_local
deadline:
sec: 2147483647
nsec: 4294967295
# unspecified/infinity
sec: 0
nsec: 0
lifespan:
sec: 2147483647
nsec: 4294967295
# unspecified/infinity
sec: 0
nsec: 0
liveliness: automatic
liveliness_lease_duration:
sec: 2147483647
nsec: 4294967295
# unspecified/infinity
sec: 0
nsec: 0
avoid_ros_namespace_conventions: false
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,21 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionReader

protected:
/**
* Increment the current file iterator to point to the next file in the list of relative file
* paths.
*
* Expected usage:
* if (has_next_file()) load_next_file();
* Decompress the current bagfile so that it can be opened by the storage implementation.
*/
void load_next_file() override;
void preprocess_current_file() override;

private:
/**
* Initializes the decompressor if a compression mode is specified in the metadata.
*
* \throws std::invalid_argument If compression format doesn't exist.
* \throw std::invalid_argument If compression mode is NONE
* \throw std::invalid_argument If compression format could not be found
* \throw rcpputils::IllegalStateException if the decompressor could not be initialized for
* any other reason
*/
virtual void setup_decompression();
void setup_decompression();

private:
std::unique_ptr<rosbag2_compression::BaseDecompressorInterface> decompressor_{};
rosbag2_compression::CompressionMode compression_mode_{
rosbag2_compression::CompressionMode::NONE};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,17 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
/**
* Compress a file and update the metadata file path.
*
* Note: this may log an error without raising an exception in the case that the input file
* could not be deleted after compressing. This is an error and should never happen, but given
* that the desired output is created, execution will not be halted.
*
* \param compressor An initialized compression context.
* \param message The URI of the file to compress.
* \param file_relative_to_bag Relative path of the file to compress, as stored in metadata -
* meaning the path is relative to the bag base folder.
*/
virtual void compress_file(BaseCompressorInterface & compressor, const std::string & file);
virtual void compress_file(
BaseCompressorInterface & compressor,
const std::string & file_relative_to_bag);

/**
* Checks if the compression by message option is specified and a compressor exists.
Expand Down Expand Up @@ -166,7 +173,10 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
compressor_message_queue_ RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_);
std::queue<std::string> compressor_file_queue_ RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_);
std::vector<std::thread> compression_threads_;
std::atomic_bool compression_is_running_{false};
/* *INDENT-OFF* */ // uncrustify doesn't understand the macro + brace initializer
std::atomic_bool compression_is_running_
RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_) {false};
/* *INDENT-ON* */
std::recursive_mutex storage_mutex_;
std::condition_variable compressor_condition_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <utility>
#include <vector>

#include "rcpputils/asserts.hpp"
#include "rcpputils/filesystem_helper.hpp"

#include "rosbag2_compression/compression_options.hpp"
Expand All @@ -43,64 +44,63 @@ SequentialCompressionReader::~SequentialCompressionReader()

void SequentialCompressionReader::setup_decompression()
{
compression_mode_ = rosbag2_compression::compression_mode_from_string(metadata_.compression_mode);
if (compression_mode_ != rosbag2_compression::CompressionMode::NONE) {
decompressor_ = compression_factory_->create_decompressor(metadata_.compression_format);
// Decompress the first file so that it is readable; don't need to do anything for
// per-message encryption.
if (compression_mode_ == CompressionMode::FILE) {
ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM("Decompressing " << get_current_file().c_str());
*current_file_iterator_ = decompressor_->decompress_uri(get_current_file());
if (decompressor_) {
return;
}

compression_mode_ = compression_mode_from_string(metadata_.compression_mode);
rcpputils::require_true(
compression_mode_ != rosbag2_compression::CompressionMode::NONE,
"SequentialCompressionReader should not be initialized with NONE compression mode.");

decompressor_ = compression_factory_->create_decompressor(metadata_.compression_format);
rcpputils::check_true(decompressor_ != nullptr, "Couldn't initialize decompressor.");
}

void SequentialCompressionReader::preprocess_current_file()
{
setup_decompression();

if (metadata_.version == 4) {
/*
* Rosbag2 was released with incorrect relative file naming for compressed bags
* which were written as v4, using v3 logic which had the bag name prefixed on the file path.
* Because we have no way to check whether the bag was written correctly,
* check for the existence of the prefixed file as a fallback.
*/
const rcpputils::fs::path base{base_folder_};
const rcpputils::fs::path relative{get_current_file()};
const auto resolved = base / relative;
if (!resolved.exists()) {
const auto base_stripped = relative.filename();
const auto resolved_stripped = base / base_stripped;
ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM(
"Unable to find specified bagfile " << resolved.string() <<
". Falling back to checking for " << resolved_stripped.string());
rcpputils::require_true(
resolved_stripped.exists(),
"Unable to resolve relative file path either as a V3 or V4 relative path");
*current_file_iterator_ = resolved_stripped.string();
}
} else {
throw std::invalid_argument{
"SequentialCompressionReader requires a CompressionMode that is not NONE!"};
}

if (compression_mode_ == CompressionMode::FILE) {
ROSBAG2_COMPRESSION_LOG_INFO_STREAM("Decompressing " << get_current_file().c_str());
*current_file_iterator_ = decompressor_->decompress_uri(get_current_file());
}
}

void SequentialCompressionReader::open(
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_cpp::ConverterOptions & converter_options)
{
storage_options_ = storage_options;

if (metadata_io_->metadata_file_exists(storage_options.uri)) {
metadata_ = metadata_io_->read_metadata(storage_options.uri);
if (metadata_.relative_file_paths.empty()) {
ROSBAG2_COMPRESSION_LOG_WARN("No file paths were found in metadata.");
return;
}
file_paths_ = metadata_.relative_file_paths;
current_file_iterator_ = file_paths_.begin();
setup_decompression();

storage_options_.uri = *current_file_iterator_;
storage_ = storage_factory_->open_read_only(storage_options);
if (!storage_) {
std::stringstream errmsg;
errmsg << "No storage could be initialized for: \"" <<
storage_options.uri << "\".";

throw std::runtime_error{errmsg.str()};
}
} else {
if (!metadata_io_->metadata_file_exists(storage_options.uri)) {
std::stringstream errmsg;
errmsg << "Could not find metadata for bag: \"" << storage_options.uri <<
"\". Legacy bag files are not supported if this is a ROS 1 bag file.";
"\". Bags without metadata (such as from ROS 1) not supported by rosbag2 decompression.";
throw std::runtime_error{errmsg.str()};
}
const auto & topics = metadata_.topics_with_message_count;
if (topics.empty()) {
ROSBAG2_COMPRESSION_LOG_WARN("No topics were listed in metadata.");
return;
}
fill_topics_metadata();

// Currently a bag file can only be played if all topics have the same serialization format.
check_topics_serialization_formats(topics);
check_converter_serialization_format(
converter_options.output_serialization_format,
topics[0].topic_metadata.serialization_format);
SequentialReader::open(storage_options, converter_options);
}

std::shared_ptr<rosbag2_storage::SerializedBagMessage> SequentialCompressionReader::read_next()
Expand All @@ -115,28 +115,4 @@ std::shared_ptr<rosbag2_storage::SerializedBagMessage> SequentialCompressionRead
throw std::runtime_error{"Bag is not open. Call open() before reading."};
}


void SequentialCompressionReader::load_next_file()
{
if (current_file_iterator_ == file_paths_.end()) {
throw std::runtime_error{"Cannot load next file; already on last file!"};
}

if (compression_mode_ == rosbag2_compression::CompressionMode::NONE) {
throw std::runtime_error{"Cannot use SequentialCompressionReader with NONE compression mode."};
}

++current_file_iterator_;
if (compression_mode_ == rosbag2_compression::CompressionMode::FILE) {
if (decompressor_ == nullptr) {
throw std::runtime_error{
"The bag file was not properly opened. "
"Somehow the compression mode was set without opening a decompressor."
};
}

ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM("Decompressing " << get_current_file().c_str());
*current_file_iterator_ = decompressor_->decompress_uri(get_current_file());
}
}
} // namespace rosbag2_compression
Loading

0 comments on commit 0fd280b

Please sign in to comment.