From 85068277e94f613de4aee04088b810dacf7d00a5 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Tue, 26 Jan 2021 23:56:47 -0800 Subject: [PATCH] WIP more aggressive loggingto try and understand if this code is being run properly in the github test Signed-off-by: Emerson Knapp --- .../src/rosbag2_compression/logging.hpp | 4 ++-- .../sequential_compression_writer.cpp | 22 +++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/rosbag2_compression/src/rosbag2_compression/logging.hpp b/rosbag2_compression/src/rosbag2_compression/logging.hpp index b02c7824a7..38ba538bc1 100644 --- a/rosbag2_compression/src/rosbag2_compression/logging.hpp +++ b/rosbag2_compression/src/rosbag2_compression/logging.hpp @@ -50,12 +50,12 @@ } while (0) #define ROSBAG2_COMPRESSION_LOG_DEBUG(...) \ - RCUTILS_LOG_DEBUG_NAMED(ROSBAG2_COMPRESSION_PACKAGE_NAME, __VA_ARGS__) + RCUTILS_LOG_INFO_NAMED(ROSBAG2_COMPRESSION_PACKAGE_NAME, __VA_ARGS__) #define ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM(args) do { \ std::stringstream __ss; \ __ss << args; \ - RCUTILS_LOG_DEBUG_NAMED(ROSBAG2_COMPRESSION_PACKAGE_NAME, "%s", __ss.str().c_str()); \ + RCUTILS_LOG_INFO_NAMED(ROSBAG2_COMPRESSION_PACKAGE_NAME, "%s", __ss.str().c_str()); \ } while (0) #endif // ROSBAG2_COMPRESSION__LOGGING_HPP_ diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index 9b5d99cd36..14455850b3 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -64,22 +64,31 @@ SequentialCompressionWriter::~SequentialCompressionWriter() void SequentialCompressionWriter::compression_thread_fn() { // Every thread needs to have its own compression context for thread safety. + auto id = std::this_thread::get_id(); + ROSBAG2_COMPRESSION_LOG_WARN_STREAM( + id << ": Compressor thread starting"); auto compressor = compression_factory_->create_compressor( compression_options_.compression_format); if (!compressor) { + ROSBAG2_COMPRESSION_LOG_ERROR_STREAM( + id << ": Compressor thread couldn't create compressor...."); throw std::runtime_error{ "Cannot compress message; Writer is not open!"}; } while (true) { + ROSBAG2_COMPRESSION_LOG_ERROR_STREAM( + id << ": Compressor thread entered the loop"); std::shared_ptr message; std::string file; { // This mutex synchronizes both the condition and the running_ boolean, so it has to be // held when dealing with either/both std::unique_lock lock(compressor_queue_mutex_); + ROSBAG2_COMPRESSION_LOG_ERROR_STREAM( + id << ": Compressor thread has the lock and will now wait for the condition"); if (!compression_is_running_) { break; } @@ -89,6 +98,8 @@ void SequentialCompressionWriter::compression_thread_fn() compressor_message_queue_.pop(); } else if (!compressor_file_queue_.empty()) { file = compressor_file_queue_.front(); + ROSBAG2_COMPRESSION_LOG_WARN_STREAM( + id << ": Compressor thread got file " << file << " from the compression queue."); compressor_file_queue_.pop(); } } @@ -192,12 +203,14 @@ void SequentialCompressionWriter::reset() if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE && should_compress_last_file_) { + ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Reset - should compress last bag, but need locks"); std::lock_guard lock(storage_mutex_); std::lock_guard compressor_lock(compressor_queue_mutex_); try { storage_.reset(); // Storage must be closed before it can be compressed. if (!metadata_.relative_file_paths.empty()) { std::string file = metadata_.relative_file_paths.back(); + ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Compressorfilequeuepush"); compressor_file_queue_.push(file); compressor_condition_.notify_one(); } @@ -246,6 +259,9 @@ void SequentialCompressionWriter::compress_file( if (file_relative_to_pwd.exists() && file_relative_to_pwd.file_size() > 0u) { const auto compressed_uri = compressor.compress_uri(file_relative_to_pwd.string()); const auto relative_compressed_uri = path(compressed_uri).filename(); + ROSBAG2_COMPRESSION_LOG_WARN_STREAM( + "The compressed output file is " << compressed_uri << + " -- relative " << relative_compressed_uri); { // After we've compressed the file, replace the name in the file list with the new name. // Must search for the entry because other threads may have changed the order of the vector @@ -279,17 +295,22 @@ void SequentialCompressionWriter::compress_file( void SequentialCompressionWriter::split_bagfile() { + ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Entered split_bagfile"); std::lock_guard lock(storage_mutex_); std::lock_guard compressor_lock(compressor_queue_mutex_); // Grab last file before calling common splitting logic, which pushes the new filename const auto last_file = metadata_.relative_file_paths.back(); + ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Calling parent split impl"); SequentialWriter::split_bagfile(); // If we're in FILE compression mode, push this file's name on to the queue so another // thread will handle compressing it. If not, we can just carry on. + ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Ok - want to compress " << last_file); if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE) { + ROSBAG2_COMPRESSION_LOG_WARN_STREAM( + "Bag split - adding " << last_file << " to compression queue"); compressor_file_queue_.push(last_file); compressor_condition_.notify_one(); } @@ -332,6 +353,7 @@ bool SequentialCompressionWriter::should_split_bagfile() if (storage_options_.max_bagfile_size == rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT) { + ROSBAG2_COMPRESSION_LOG_WARN_STREAM("No splititng"); return false; } else { std::lock_guard lock(storage_mutex_);