diff --git a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp index 3615755a89..ee4ff7479f 100644 --- a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp +++ b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp @@ -123,10 +123,17 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter /** * Compress a file and update the metadata file path. * + * Note: this may log an error without raising an exception in the case that the input file + * could not be deleted after compressing. This is an error and should never happen, but given + * that the desired output is created, execution will not be halted. + * * \param compressor An initialized compression context. - * \param message The URI of the file to compress. + * \param file_relative_to_bag Relative path of the file to compress, as stored in metadata - + * meaning the path is relative to the bag base folder. */ - virtual void compress_file(BaseCompressorInterface & compressor, const std::string & file); + virtual void compress_file( + BaseCompressorInterface & compressor, + const std::string & file_relative_to_bag); /** * Checks if the compression by message option is specified and a compressor exists. diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 1497edecb5..e1d16611d4 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -77,19 +77,24 @@ void SequentialCompressionWriter::compression_thread_fn() std::shared_ptr message; std::string file; { - // This mutex synchronizes both the condition and the running_ boolean, so it has to be - // held when dealing with either/both std::unique_lock lock(compressor_queue_mutex_); - if (!compression_is_running_) { - break; - } - compressor_condition_.wait(lock); + compressor_condition_.wait( + lock, + [&] { + return !compression_is_running_ || + !compressor_message_queue_.empty() || + !compressor_file_queue_.empty(); + }); + if (!compressor_message_queue_.empty()) { message = compressor_message_queue_.front(); compressor_message_queue_.pop(); } else if (!compressor_file_queue_.empty()) { file = compressor_file_queue_.front(); compressor_file_queue_.pop(); + } else if (!compression_is_running_) { + // I woke up, all work queues are empty, and the main thread has stopped execution. Exit. + break; } } @@ -111,11 +116,7 @@ void SequentialCompressionWriter::compression_thread_fn() void SequentialCompressionWriter::init_metadata() { std::lock_guard lock(storage_mutex_); - metadata_ = rosbag2_storage::BagMetadata{}; - metadata_.storage_identifier = storage_->get_storage_identifier(); - metadata_.starting_time = std::chrono::time_point{ - std::chrono::nanoseconds::max()}; - metadata_.relative_file_paths = {storage_->get_relative_file_path()}; + SequentialWriter::init_metadata(); metadata_.compression_format = compression_options_.compression_format; metadata_.compression_mode = rosbag2_compression::compression_mode_to_string(compression_options_.compression_mode); @@ -240,38 +241,43 @@ void SequentialCompressionWriter::remove_topic( void SequentialCompressionWriter::compress_file( BaseCompressorInterface & compressor, - const std::string & file) + const std::string & file_relative_to_bag) { - ROSBAG2_COMPRESSION_LOG_DEBUG("Compressing file: %s", file.c_str()); - const auto to_compress = rcpputils::fs::path{file}; + using rcpputils::fs::path; + + const auto file_relative_to_pwd = path(base_folder_) / file_relative_to_bag; + ROSBAG2_COMPRESSION_LOG_INFO_STREAM("Compressing file: " << file_relative_to_pwd.string()); - if (to_compress.exists() && to_compress.file_size() > 0u) { - const auto compressed_uri = compressor.compress_uri(to_compress.string()); + if (file_relative_to_pwd.exists() && file_relative_to_pwd.file_size() > 0u) { + const auto compressed_uri = compressor.compress_uri(file_relative_to_pwd.string()); + const auto relative_compressed_uri = path(compressed_uri).filename(); { // After we've compressed the file, replace the name in the file list with the new name. // Must search for the entry because other threads may have changed the order of the vector // and invalidated any index or iterator we held to it. std::lock_guard lock(storage_mutex_); - auto iter = std::find( + const auto iter = std::find( metadata_.relative_file_paths.begin(), metadata_.relative_file_paths.end(), - file); + file_relative_to_bag); if (iter != metadata_.relative_file_paths.end()) { - *iter = compressed_uri; + *iter = relative_compressed_uri.string(); } else { ROSBAG2_COMPRESSION_LOG_ERROR_STREAM( - "Failed to find path to uncompressed bag: \"" << file << + "Failed to find path to uncompressed bag: \"" << file_relative_to_pwd.string() << "\"; this shouldn't happen."); } } - if (!rcpputils::fs::remove(to_compress)) { + if (!rcpputils::fs::remove(file_relative_to_pwd)) { ROSBAG2_COMPRESSION_LOG_ERROR_STREAM( - "Failed to remove uncompressed bag: \"" << to_compress.string() << "\""); + "Failed to remove original pre-compressed bag file: \"" << + file_relative_to_pwd.string() << "\". This should never happen - but execution " << + "will not be halted because the compressed output was successfully created."); } } else { ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM( - "Removing last file: \"" << to_compress.string() << + "Removing last file: \"" << file_relative_to_pwd.string() << "\" because it either is empty or does not exist."); } } @@ -281,13 +287,14 @@ void SequentialCompressionWriter::split_bagfile() std::lock_guard lock(storage_mutex_); std::lock_guard compressor_lock(compressor_queue_mutex_); - switch_to_next_storage(); + // Grab last file before calling common splitting logic, which pushes the new filename + const auto last_file = metadata_.relative_file_paths.back(); + SequentialWriter::split_bagfile(); // If we're in FILE compression mode, push this file's name on to the queue so another // thread will handle compressing it. If not, we can just carry on. if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE) { - std::string file = metadata_.relative_file_paths.back(); - compressor_file_queue_.push(file); + compressor_file_queue_.push(last_file); compressor_condition_.notify_one(); } @@ -296,8 +303,6 @@ void SequentialCompressionWriter::split_bagfile() // storage plugin. should_compress_last_file_ = false; } - - metadata_.relative_file_paths.push_back(storage_->get_relative_file_path()); } void SequentialCompressionWriter::compress_message( diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 60a14bbb2e..aa9aaeaed5 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -14,6 +14,7 @@ #include +#include #include #include #include @@ -50,11 +51,73 @@ class SequentialCompressionWriterTest : public Test serialization_format_{"rmw_format"} { tmp_dir_storage_options_.uri = tmp_dir_.string(); - rcpputils::fs::remove(tmp_dir_); + rcpputils::fs::remove_all(tmp_dir_); ON_CALL(*storage_factory_, open_read_write(_)).WillByDefault(Return(storage_)); EXPECT_CALL(*storage_factory_, open_read_write(_)).Times(AtLeast(0)); + // intercept the metadata write so we can analyze it. + ON_CALL(*metadata_io_, write_metadata).WillByDefault( + [this](const std::string &, const rosbag2_storage::BagMetadata & metadata) { + intercepted_metadata_ = metadata; + }); } + ~SequentialCompressionWriterTest() + { + rcpputils::fs::remove_all(tmp_dir_); + } + + void initializeFakeFileStorage() + { + // Create mock implementation of the storage, using files and a size of 1 per message + // initialize values when opening a new bagfile + ON_CALL(*storage_factory_, open_read_write(_)).WillByDefault( + DoAll( + Invoke( + [this](const rosbag2_storage::StorageOptions & storage_options) { + fake_storage_size_ = 0; + fake_storage_uri_ = storage_options.uri; + // Touch the file + std::ofstream output(storage_options.uri); + ASSERT_TRUE(output.is_open()); + // Put some arbitrary bytes in the file so it isn't interpreted as being empty + output << "Fake storage data" << std::endl; + output.close(); + }), + Return(storage_))); + ON_CALL( + *storage_, + write(An>())).WillByDefault( + [this](std::shared_ptr) { + fake_storage_size_ += 1; + }); + ON_CALL(*storage_, get_bagfile_size).WillByDefault( + [this]() { + return fake_storage_size_; + }); + ON_CALL(*storage_, get_relative_file_path).WillByDefault( + [this]() { + return fake_storage_uri_; + }); + } + + void initializeWriter( + const rosbag2_compression::CompressionOptions & compression_options, + std::unique_ptr custom_compression_factory = nullptr) + { + auto compression_factory = std::move(custom_compression_factory); + if (!compression_factory) { + compression_factory = std::make_unique(); + } + auto sequential_writer = std::make_unique( + compression_options, + std::move(compression_factory), + std::move(storage_factory_), + converter_factory_, + std::move(metadata_io_)); + writer_ = std::make_unique(std::move(sequential_writer)); + } + + const std::string bag_name_ = "SequentialCompressionWriterTest"; std::unique_ptr> storage_factory_; std::shared_ptr> storage_; std::shared_ptr> converter_factory_; @@ -62,7 +125,10 @@ class SequentialCompressionWriterTest : public Test std::unique_ptr writer_; rcpputils::fs::path tmp_dir_; rosbag2_storage::StorageOptions tmp_dir_storage_options_; + rosbag2_storage::BagMetadata intercepted_metadata_; std::string serialization_format_; + uint64_t fake_storage_size_; + std::string fake_storage_uri_; const uint64_t kDefaultCompressionQueueSize = 1; const uint64_t kDefaultCompressionQueueThreads = 4; @@ -73,15 +139,7 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_empty_storage_options_uri rosbag2_compression::CompressionOptions compression_options{ "zstd", rosbag2_compression::CompressionMode::FILE, kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; - auto compression_factory = std::make_unique(); - - auto sequential_writer = std::make_unique( - compression_options, - std::move(compression_factory), - std::move(storage_factory_), - converter_factory_, - std::move(metadata_io_)); - writer_ = std::make_unique(std::move(sequential_writer)); + initializeWriter(compression_options); EXPECT_THROW( writer_->open( @@ -95,21 +153,11 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_bad_compression_format) rosbag2_compression::CompressionOptions compression_options{ "bad_format", rosbag2_compression::CompressionMode::FILE, kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; - auto compression_factory = std::make_unique(); - - auto sequential_writer = std::make_unique( - compression_options, - std::move(compression_factory), - std::move(storage_factory_), - converter_factory_, - std::move(metadata_io_)); - writer_ = std::make_unique(std::move(sequential_writer)); + initializeWriter(compression_options); EXPECT_THROW( writer_->open(tmp_dir_storage_options_, {serialization_format_, serialization_format_}), std::invalid_argument); - - EXPECT_TRUE(rcpputils::fs::remove(tmp_dir_)); } TEST_F(SequentialCompressionWriterTest, open_throws_on_invalid_splitting_size) @@ -117,7 +165,6 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_invalid_splitting_size) rosbag2_compression::CompressionOptions compression_options{ "zstd", rosbag2_compression::CompressionMode::FILE, kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; - auto compression_factory = std::make_unique(); // Set minimum file size greater than max bagfile size option const uint64_t min_split_file_size = 10; @@ -127,13 +174,7 @@ TEST_F(SequentialCompressionWriterTest, open_throws_on_invalid_splitting_size) storage_options.max_bagfile_size = max_bagfile_size; storage_options.uri = "foo.bar"; - auto sequential_writer = std::make_unique( - compression_options, - std::move(compression_factory), - std::move(storage_factory_), - converter_factory_, - std::move(metadata_io_)); - writer_ = std::make_unique(std::move(sequential_writer)); + initializeWriter(compression_options); EXPECT_THROW( writer_->open(storage_options, {serialization_format_, serialization_format_}), @@ -145,15 +186,7 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f rosbag2_compression::CompressionOptions compression_options{ "zstd", rosbag2_compression::CompressionMode::FILE, kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads}; - auto compression_factory = std::make_unique(); - - auto sequential_writer = std::make_unique( - compression_options, - std::move(compression_factory), - std::move(storage_factory_), - converter_factory_, - std::move(metadata_io_)); - writer_ = std::make_unique(std::move(sequential_writer)); + initializeWriter(compression_options); auto tmp_dir = rcpputils::fs::temp_directory_path() / "path_not_empty"; auto storage_options = rosbag2_storage::StorageOptions(); @@ -161,8 +194,6 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f EXPECT_NO_THROW( writer_->open(tmp_dir_storage_options_, {serialization_format_, serialization_format_})); - - EXPECT_TRUE(rcpputils::fs::remove(tmp_dir_)); } TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor) @@ -173,19 +204,55 @@ TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor) auto compression_factory = std::make_unique>(); EXPECT_CALL(*compression_factory, create_compressor(_)).Times(1); - auto sequential_writer = std::make_unique( - compression_options, - std::move(compression_factory), - std::move(storage_factory_), - converter_factory_, - std::move(metadata_io_)); - writer_ = std::make_unique(std::move(sequential_writer)); + initializeWriter(compression_options, std::move(compression_factory)); // This will throw an exception because the MockCompressionFactory does not actually create // a compressor. EXPECT_THROW( writer_->open(tmp_dir_storage_options_, {serialization_format_, serialization_format_}), std::runtime_error); +} - EXPECT_TRUE(rcpputils::fs::remove(tmp_dir_)); +TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative_filepaths) +{ + // In this test, check that the SequentialCompressionWriter creates relative filepaths correctly + // Check both the first path, which is created in init_metadata, + // and subsequent paths, which are created in the splitting logic + const std::string test_topic_name = "test_topic"; + const std::string test_topic_type = "test_msgs/BasicTypes"; + rosbag2_compression::CompressionOptions compression_options { + "zstd", + rosbag2_compression::CompressionMode::FILE, + kDefaultCompressionQueueSize, + kDefaultCompressionQueueThreads + }; + + initializeFakeFileStorage(); + initializeWriter(compression_options); + + tmp_dir_storage_options_.max_bagfile_size = 1; + writer_->open(tmp_dir_storage_options_); + writer_->create_topic({test_topic_name, test_topic_type, "", ""}); + + auto message = std::make_shared(); + message->topic_name = test_topic_name; + + writer_->write(message); + // bag size == max_bafile_size, no split yet + writer_->write(message); + // bag size > max_bagfile_size, split + writer_->write(message); + writer_.reset(); + + EXPECT_EQ( + intercepted_metadata_.relative_file_paths.size(), 2u); + + const auto base_path = tmp_dir_storage_options_.uri; + int counter = 0; + for (const auto & path : intercepted_metadata_.relative_file_paths) { + std::stringstream ss; + ss << bag_name_ << "_" << counter << ".zstd"; + counter++; + EXPECT_EQ(ss.str(), path); + } } diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp index a85ffd37aa..78140b82e1 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp @@ -536,7 +536,7 @@ TEST_F(RecordFixture, record_end_to_end_test_with_zstd_file_compression_compress const auto metadata = metadata_io.read_metadata(root_bag_path_.string()); for (const auto & path : metadata.relative_file_paths) { - const auto file_path = rcpputils::fs::path{path}; + const auto file_path = root_bag_path_ / rcpputils::fs::path{path}; EXPECT_TRUE(file_path.exists()) << "File: \"" << file_path.string() << "\" does not exist!";