Skip to content

Commit

Permalink
Deduplicate sequentialcompressionreader business logic, add fallback …
Browse files Browse the repository at this point in the history
…to find bagfiles in incorrectly-written metadata

Signed-off-by: Emerson Knapp <[email protected]>
  • Loading branch information
Emerson Knapp committed Jan 23, 2021
1 parent 277e9cc commit d7e870d
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,18 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionReader

protected:
/**
* Increment the current file iterator to point to the next file in the list of relative file
* paths.
*
* Expected usage:
* if (has_next_file()) load_next_file();
* Decompress the current bagfile so that it can be opened by the storage implementation.
*/
void load_next_file() override;
void preprocess_current_file() override;

private:
/**
* Initializes the decompressor if a compression mode is specified in the metadata.
*
* \throws std::invalid_argument If compression format doesn't exist.
*/
virtual void setup_decompression();
void setup_decompression();

private:
std::unique_ptr<rosbag2_compression::BaseDecompressorInterface> decompressor_{};
rosbag2_compression::CompressionMode compression_mode_{
rosbag2_compression::CompressionMode::NONE};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,64 +43,42 @@ SequentialCompressionReader::~SequentialCompressionReader()

void SequentialCompressionReader::setup_decompression()
{
compression_mode_ = rosbag2_compression::compression_mode_from_string(metadata_.compression_mode);
if (decompressor_) {
return;
}

compression_mode_ = compression_mode_from_string(metadata_.compression_mode);
if (compression_mode_ != rosbag2_compression::CompressionMode::NONE) {
decompressor_ = compression_factory_->create_decompressor(metadata_.compression_format);
// Decompress the first file so that it is readable; don't need to do anything for
// per-message encryption.
if (compression_mode_ == CompressionMode::FILE) {
ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM("Decompressing " << get_current_file().c_str());
*current_file_iterator_ = decompressor_->decompress_uri(get_current_file());
}
} else {
throw std::invalid_argument{
"SequentialCompressionReader requires a CompressionMode that is not NONE!"};
}
if (!decompressor_) {
throw std::runtime_error{"Couldn't initialize decompressor."};
}
}

void SequentialCompressionReader::preprocess_current_file()
{
setup_decompression();
if (compression_mode_ == CompressionMode::FILE) {
ROSBAG2_COMPRESSION_LOG_INFO_STREAM("Decompressing " << get_current_file().c_str());
*current_file_iterator_ = decompressor_->decompress_uri(get_current_file());
}
}

void SequentialCompressionReader::open(
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_cpp::ConverterOptions & converter_options)
{
storage_options_ = storage_options;

if (metadata_io_->metadata_file_exists(storage_options.uri)) {
metadata_ = metadata_io_->read_metadata(storage_options.uri);
if (metadata_.relative_file_paths.empty()) {
ROSBAG2_COMPRESSION_LOG_WARN("No file paths were found in metadata.");
return;
}
file_paths_ = metadata_.relative_file_paths;
current_file_iterator_ = file_paths_.begin();
setup_decompression();

storage_options_.uri = *current_file_iterator_;
storage_ = storage_factory_->open_read_only(storage_options);
if (!storage_) {
std::stringstream errmsg;
errmsg << "No storage could be initialized for: \"" <<
storage_options.uri << "\".";

throw std::runtime_error{errmsg.str()};
}
} else {
if (!metadata_io_->metadata_file_exists(storage_options.uri)) {
std::stringstream errmsg;
errmsg << "Could not find metadata for bag: \"" << storage_options.uri <<
"\". Legacy bag files are not supported if this is a ROS 1 bag file.";
"\". Bags without metadata (such as from ROS 1) not supported by rosbag2 decompression.";
throw std::runtime_error{errmsg.str()};
}
const auto & topics = metadata_.topics_with_message_count;
if (topics.empty()) {
ROSBAG2_COMPRESSION_LOG_WARN("No topics were listed in metadata.");
return;
}
fill_topics_metadata();

// Currently a bag file can only be played if all topics have the same serialization format.
check_topics_serialization_formats(topics);
check_converter_serialization_format(
converter_options.output_serialization_format,
topics[0].topic_metadata.serialization_format);
SequentialReader::open(storage_options, converter_options);
}

std::shared_ptr<rosbag2_storage::SerializedBagMessage> SequentialCompressionReader::read_next()
Expand All @@ -115,28 +93,4 @@ std::shared_ptr<rosbag2_storage::SerializedBagMessage> SequentialCompressionRead
throw std::runtime_error{"Bag is not open. Call open() before reading."};
}


void SequentialCompressionReader::load_next_file()
{
if (current_file_iterator_ == file_paths_.end()) {
throw std::runtime_error{"Cannot load next file; already on last file!"};
}

if (compression_mode_ == rosbag2_compression::CompressionMode::NONE) {
throw std::runtime_error{"Cannot use SequentialCompressionReader with NONE compression mode."};
}

++current_file_iterator_;
if (compression_mode_ == rosbag2_compression::CompressionMode::FILE) {
if (decompressor_ == nullptr) {
throw std::runtime_error{
"The bag file was not properly opened. "
"Somehow the compression mode was set without opening a decompressor."
};
}

ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM("Decompressing " << get_current_file().c_str());
*current_file_iterator_ = decompressor_->decompress_uri(get_current_file());
}
}
} // namespace rosbag2_compression
Loading

0 comments on commit d7e870d

Please sign in to comment.