From f0e5744dcd3b63355c2c96e940089fee551d2c16 Mon Sep 17 00:00:00 2001 From: Emerson Knapp <537409+emersonknapp@users.noreply.github.com> Date: Tue, 26 Jan 2021 17:05:09 -0800 Subject: [PATCH] Fix deadlock race condition on compression shutdown (#616) * Synchronize compression shutdown correctly, avoiding occasional deadlock Signed-off-by: Emerson Knapp --- .../sequential_compression_writer.hpp | 5 ++++- .../sequential_compression_writer.cpp | 17 ++++++++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp index d970457434..3615755a89 100644 --- a/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp +++ b/rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp @@ -166,7 +166,10 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter compressor_message_queue_ RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_); std::queue compressor_file_queue_ RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_); std::vector compression_threads_; - std::atomic_bool compression_is_running_{false}; + /* *INDENT-OFF* */ // uncrustify doesn't understand the macro + brace initializer + std::atomic_bool compression_is_running_ + RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_) {false}; + /* *INDENT-ON* */ std::recursive_mutex storage_mutex_; std::condition_variable compressor_condition_; diff --git a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp index f8b09ecd6d..1497edecb5 100644 --- a/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp +++ b/rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp @@ -73,11 +73,16 @@ void SequentialCompressionWriter::compression_thread_fn() "Cannot compress message; Writer is not open!"}; } - while (compression_is_running_) { + while (true) { std::shared_ptr message; std::string file; { + // This mutex synchronizes both the condition and the running_ boolean, so it has to be + // held when dealing with either/both std::unique_lock lock(compressor_queue_mutex_); + if (!compression_is_running_) { + break; + } compressor_condition_.wait(lock); if (!compressor_message_queue_.empty()) { message = compressor_message_queue_.front(); @@ -137,7 +142,10 @@ void SequentialCompressionWriter::setup_compressor_threads() ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM( "setup_compressor_threads: Starting " << compression_options_.compression_threads << " threads"); - compression_is_running_ = true; + { + std::unique_lock lock(compressor_queue_mutex_); + compression_is_running_ = true; + } // This function needs to throw an exception if the compression format is invalid, but because // each thread creates its own compressor, we can't actually catch it here if one of the threads @@ -159,7 +167,10 @@ void SequentialCompressionWriter::stop_compressor_threads() { if (!compression_threads_.empty()) { ROSBAG2_COMPRESSION_LOG_DEBUG("Waiting for compressor threads to finish."); - compression_is_running_ = false; + { + std::unique_lock lock(compressor_queue_mutex_); + compression_is_running_ = false; + } compressor_condition_.notify_all(); for (auto & thread : compression_threads_) { thread.join();