Skip to content

Commit

Permalink
WIP more aggressive loggingto try and understand if this code is bein…
Browse files Browse the repository at this point in the history
…g run properly in the github test

Signed-off-by: Emerson Knapp <[email protected]>
  • Loading branch information
Emerson Knapp committed Jan 27, 2021
1 parent 028f859 commit 8506827
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
4 changes: 2 additions & 2 deletions rosbag2_compression/src/rosbag2_compression/logging.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@
} while (0)

#define ROSBAG2_COMPRESSION_LOG_DEBUG(...) \
RCUTILS_LOG_DEBUG_NAMED(ROSBAG2_COMPRESSION_PACKAGE_NAME, __VA_ARGS__)
RCUTILS_LOG_INFO_NAMED(ROSBAG2_COMPRESSION_PACKAGE_NAME, __VA_ARGS__)

#define ROSBAG2_COMPRESSION_LOG_DEBUG_STREAM(args) do { \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_DEBUG_NAMED(ROSBAG2_COMPRESSION_PACKAGE_NAME, "%s", __ss.str().c_str()); \
RCUTILS_LOG_INFO_NAMED(ROSBAG2_COMPRESSION_PACKAGE_NAME, "%s", __ss.str().c_str()); \
} while (0)

#endif // ROSBAG2_COMPRESSION__LOGGING_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,31 @@ SequentialCompressionWriter::~SequentialCompressionWriter()
void SequentialCompressionWriter::compression_thread_fn()
{
// Every thread needs to have its own compression context for thread safety.
auto id = std::this_thread::get_id();
ROSBAG2_COMPRESSION_LOG_WARN_STREAM(
id << ": Compressor thread starting");
auto compressor = compression_factory_->create_compressor(
compression_options_.compression_format);


if (!compressor) {
ROSBAG2_COMPRESSION_LOG_ERROR_STREAM(
id << ": Compressor thread couldn't create compressor....");
throw std::runtime_error{
"Cannot compress message; Writer is not open!"};
}

while (true) {
ROSBAG2_COMPRESSION_LOG_ERROR_STREAM(
id << ": Compressor thread entered the loop");
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message;
std::string file;
{
// This mutex synchronizes both the condition and the running_ boolean, so it has to be
// held when dealing with either/both
std::unique_lock<std::mutex> lock(compressor_queue_mutex_);
ROSBAG2_COMPRESSION_LOG_ERROR_STREAM(
id << ": Compressor thread has the lock and will now wait for the condition");
if (!compression_is_running_) {
break;
}
Expand All @@ -89,6 +98,8 @@ void SequentialCompressionWriter::compression_thread_fn()
compressor_message_queue_.pop();
} else if (!compressor_file_queue_.empty()) {
file = compressor_file_queue_.front();
ROSBAG2_COMPRESSION_LOG_WARN_STREAM(
id << ": Compressor thread got file " << file << " from the compression queue.");
compressor_file_queue_.pop();
}
}
Expand Down Expand Up @@ -192,12 +203,14 @@ void SequentialCompressionWriter::reset()
if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE &&
should_compress_last_file_)
{
ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Reset - should compress last bag, but need locks");
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
std::lock_guard<std::mutex> compressor_lock(compressor_queue_mutex_);
try {
storage_.reset(); // Storage must be closed before it can be compressed.
if (!metadata_.relative_file_paths.empty()) {
std::string file = metadata_.relative_file_paths.back();
ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Compressorfilequeuepush");
compressor_file_queue_.push(file);
compressor_condition_.notify_one();
}
Expand Down Expand Up @@ -246,6 +259,9 @@ void SequentialCompressionWriter::compress_file(
if (file_relative_to_pwd.exists() && file_relative_to_pwd.file_size() > 0u) {
const auto compressed_uri = compressor.compress_uri(file_relative_to_pwd.string());
const auto relative_compressed_uri = path(compressed_uri).filename();
ROSBAG2_COMPRESSION_LOG_WARN_STREAM(
"The compressed output file is " << compressed_uri <<
" -- relative " << relative_compressed_uri);
{
// After we've compressed the file, replace the name in the file list with the new name.
// Must search for the entry because other threads may have changed the order of the vector
Expand Down Expand Up @@ -279,17 +295,22 @@ void SequentialCompressionWriter::compress_file(

void SequentialCompressionWriter::split_bagfile()
{
ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Entered split_bagfile");
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
std::lock_guard<std::mutex> compressor_lock(compressor_queue_mutex_);

// Grab last file before calling common splitting logic, which pushes the new filename
const auto last_file = metadata_.relative_file_paths.back();

ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Calling parent split impl");
SequentialWriter::split_bagfile();

// If we're in FILE compression mode, push this file's name on to the queue so another
// thread will handle compressing it. If not, we can just carry on.
ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Ok - want to compress " << last_file);
if (compression_options_.compression_mode == rosbag2_compression::CompressionMode::FILE) {
ROSBAG2_COMPRESSION_LOG_WARN_STREAM(
"Bag split - adding " << last_file << " to compression queue");
compressor_file_queue_.push(last_file);
compressor_condition_.notify_one();
}
Expand Down Expand Up @@ -332,6 +353,7 @@ bool SequentialCompressionWriter::should_split_bagfile()
if (storage_options_.max_bagfile_size ==
rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT)
{
ROSBAG2_COMPRESSION_LOG_WARN_STREAM("No splititng");
return false;
} else {
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
Expand Down

0 comments on commit 8506827

Please sign in to comment.