Skip to content

Commit

Permalink
Fix deadlock race condition on compression shutdown (#616)
Browse files Browse the repository at this point in the history
* Synchronize compression shutdown correctly, avoiding occasional deadlock

Signed-off-by: Emerson Knapp <[email protected]>
  • Loading branch information
emersonknapp authored Jan 27, 2021
1 parent 6e455c1 commit f0e5744
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,10 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
compressor_message_queue_ RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_);
std::queue<std::string> compressor_file_queue_ RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_);
std::vector<std::thread> compression_threads_;
std::atomic_bool compression_is_running_{false};
/* *INDENT-OFF* */ // uncrustify doesn't understand the macro + brace initializer
std::atomic_bool compression_is_running_
RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_) {false};
/* *INDENT-ON* */
std::recursive_mutex storage_mutex_;
std::condition_variable compressor_condition_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,16 @@ void SequentialCompressionWriter::compression_thread_fn()
"Cannot compress message; Writer is not open!"};
}

while (compression_is_running_) {
while (true) {
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message;
std::string file;
{
// This mutex synchronizes both the condition and the running_ boolean, so it has to be
// held when dealing with either/both
std::unique_lock<std::mutex> lock(compressor_queue_mutex_);
if (!compression_is_running_) {
break;
}
compressor_condition_.wait(lock);
if (!compressor_message_queue_.empty()) {
message = compressor_message_queue_.front();
Expand Down Expand Up @@ -137,7 +142,10 @@ void SequentialCompressionWriter::setup_compressor_threads()
ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM(
"setup_compressor_threads: Starting " <<
compression_options_.compression_threads << " threads");
compression_is_running_ = true;
{
std::unique_lock<std::mutex> lock(compressor_queue_mutex_);
compression_is_running_ = true;
}

// This function needs to throw an exception if the compression format is invalid, but because
// each thread creates its own compressor, we can't actually catch it here if one of the threads
Expand All @@ -159,7 +167,10 @@ void SequentialCompressionWriter::stop_compressor_threads()
{
if (!compression_threads_.empty()) {
ROSBAG2_COMPRESSION_LOG_DEBUG("Waiting for compressor threads to finish.");
compression_is_running_ = false;
{
std::unique_lock<std::mutex> lock(compressor_queue_mutex_);
compression_is_running_ = false;
}
compressor_condition_.notify_all();
for (auto & thread : compression_threads_) {
thread.join();
Expand Down

0 comments on commit f0e5744

Please sign in to comment.