
Compress bag files in separate threads #506

Merged: 25 commits merged into ros2:master on Dec 4, 2020


@pjreed (Contributor) commented Aug 14, 2020

This offloads all compression into separate threads. Doing so helps
to prevent rosbag2 from dropping messages due to the main thread
being busy, and it also helps to improve performance by spreading
the work across multiple CPU cores. It uses a producer/consumer
model with a fixed number of pre-allocated threads that consume
messages or files off of an incoming queue.

This adds two new command line options:

  • --compression-queue-size
    • The number of messages or files that can be waiting in the queue
      for a thread to consume them. If the queue is full, older
      messages or files will be discarded, meaning that either
      messages are dropped or files are left uncompressed.
      The default value is 1, which should be sufficient if the system
      is capable of keeping up with the workload.
  • --compression-threads
    • The number of threads that can be compressing data at once. The
      default is 0, and values less than 1 will be interpreted to mean
      the number of concurrent threads supported by the current
      hardware.
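A minimal sketch of what the two options control, assuming a mutex-guarded std::deque as the queue. DropOldestQueue and resolve_thread_count are hypothetical names for illustration, not rosbag2's actual classes or functions. Pushing into a full queue evicts the oldest entry, and a thread count below 1 falls back to std::thread::hardware_concurrency().

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>

// Hypothetical bounded queue (not rosbag2's class) illustrating the
// --compression-queue-size semantics: when the queue is full, the oldest
// entry is discarded to make room for the newest one.
template <typename T>
class DropOldestQueue {
public:
  explicit DropOldestQueue(std::size_t capacity)
  : capacity_(capacity)
  {
    assert(capacity_ > 0);  // a zero-size queue could never hold any work
  }

  // Enqueue an item; returns true if an older item had to be dropped.
  bool push(T item)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    bool dropped = false;
    if (items_.size() >= capacity_) {
      items_.pop_front();  // oldest message is lost, or file left uncompressed
      dropped = true;
    }
    items_.push_back(std::move(item));
    return dropped;
  }

  // Non-blocking pop for illustration; a real consumer would block on a
  // condition variable until work arrives.
  std::optional<T> try_pop()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (items_.empty()) {
      return std::nullopt;
    }
    std::optional<T> item(std::move(items_.front()));
    items_.pop_front();
    return item;
  }

private:
  std::size_t capacity_;
  std::deque<T> items_;
  std::mutex mutex_;
};

// Hypothetical helper illustrating --compression-threads: values below 1
// mean "use as many threads as the hardware supports".
unsigned resolve_thread_count(int requested)
{
  if (requested >= 1) {
    return static_cast<unsigned>(requested);
  }
  const unsigned hw = std::thread::hardware_concurrency();
  return hw > 0 ? hw : 1;  // hardware_concurrency() may report 0 if unknown
}
```

For example, with a queue size of 1, a second push that arrives before any worker has popped the first entry drops that entry, which is why the default is only sufficient when compression keeps up with the incoming data.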

Closes #274.

Distribution Statement A; OPSEC #4584

Signed-off-by: P. J. Reed [email protected]

pjreed added 2 commits August 14, 2020 14:11
Closes ros2#274.

Distribution Statement A; OPSEC #2893

Signed-off-by: P. J. Reed <[email protected]>
@emersonknapp (Collaborator)

High level - should the compression work queue be independently configurable from the writer queue? It would make sense to me for compression to work on the preexisting writer queue, and for it to complete before that queue is submitted to the storage implementation.

@pjreed (Contributor, Author) commented Sep 9, 2020

High level - should the compression work queue be independently configurable from the writer queue?

Hmm. I don't think I have a strong opinion one way or another on this; I can't think of any situations where you would need to configure them to different values, but I also don't think it hurts anything to keep them separate. That said, is the writer queue size currently configurable? I just looked over the list of command-line arguments and don't see anything that obviously maps to it.

@pjreed (Contributor, Author) commented Sep 15, 2020

@emersonknapp Just poking this again: do you think anything else needs to be done?

@pjreed (Contributor, Author) commented Oct 5, 2020

Looks like another recent merge has introduced some conflicts with these changes; I'll see if I can get them sorted out...

@emersonknapp (Collaborator)

Sorry about that, @pjreed, I've been in and out of the office (a bad hand-off of responsibilities on my part). Change #526 went in to largely deduplicate the SequentialWriter and SequentialCompressionWriter code, which will be better for all caching logic that goes in, since that logic was previously written in two places. That probably throws a wrench into this PR as it was written. It could be a good opportunity, though, to end up with something a little cleaner: the CompressionWriter can handle spinning up the thread and dispatching compression jobs to it without having to take care of much of anything else.

The new structure with get_writable_message is not asynchronous right now, though. Maybe that call should be dispatched to a thread from the SequentialWriter side, in which case compression would not need to know it is running in a thread? Message converters might also benefit from being moved off the transport thread, though they add less overhead than compression.

@pjreed pjreed requested a review from a team as a code owner October 13, 2020 19:44
@pjreed pjreed marked this pull request as draft October 13, 2020 20:08
@pjreed (Contributor, Author) commented Oct 14, 2020

@emersonknapp Ok, I think this is ready to go now.

I spent a while thinking about it, and I'm not sure it makes much sense to make the base SequentialWriter multi-threaded. It wouldn't really gain any benefit from it, and if anything, the added contention from multiple threads trying to write at once could actually decrease performance. Even for subclasses that use threads, the way they offload work into threads can vary from case to case. The SequentialCompressionWriter, for example, actually uses threads in two different ways: in per-message mode, every message is compressed and written in a separate thread, while in per-file mode, writing messages to files is still single-threaded, but files are handed over to the compressor threads when the writer checks whether it is time to split bag files.
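The two threading strategies described above can be sketched as follows. Everything here is hypothetical: CompressionDispatcher, Job, and the byte threshold that stands in for the writer's split check are illustrative, and the compressor thread pool is reduced to a submit callback so the behavior is easy to observe.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical types for illustration; rosbag2's real classes differ.
enum class CompressionMode { Message, File };

struct Job {
  std::string kind;     // "message" or "file"
  std::string payload;  // message bytes, or the name of a finished bag file
};

// Decides which unit of work goes to the compressor threads. The thread
// pool is abstracted as a submit callback so the example stays deterministic.
class CompressionDispatcher {
public:
  CompressionDispatcher(
    CompressionMode mode, std::size_t max_file_bytes,
    std::function<void(Job)> submit)
  : mode_(mode), max_file_bytes_(max_file_bytes), submit_(std::move(submit)) {}

  void write(const std::string & message)
  {
    if (mode_ == CompressionMode::Message) {
      // Per-message mode: every message is compressed and written off-thread.
      submit_(Job{"message", message});
      return;
    }
    // Per-file mode: writing stays single-threaded on the caller's thread...
    current_file_bytes_ += message.size();
    if (current_file_bytes_ >= max_file_bytes_) {
      // ...and only when the bag is split is the finished file handed off.
      submit_(Job{"file", "bag_" + std::to_string(file_index_++)});
      current_file_bytes_ = 0;
    }
  }

private:
  CompressionMode mode_;
  std::size_t max_file_bytes_;
  std::function<void(Job)> submit_;
  std::size_t current_file_bytes_ = 0;
  std::size_t file_index_ = 0;
};
```

In per-message mode each write produces one job; in per-file mode writes produce no jobs until the split threshold is crossed, at which point a single file-level job is submitted.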

@pjreed pjreed marked this pull request as ready for review October 14, 2020 15:39
Also fix an issue that could cause compressor threads to
hang when exiting

Signed-off-by: P. J. Reed <[email protected]>
@pjreed pjreed marked this pull request as ready for review October 16, 2020 15:33
@emersonknapp (Collaborator) left a comment


This LGTM. Thanks for the performance benchmark investigation.

@emersonknapp (Collaborator) commented Oct 26, 2020

Running this CI job
Gist: https://gist.githubusercontent.com/emersonknapp/86f84d73973ab681f5e23fe69e236da6/raw/e11f2391e32fb748418534862ada3b47c0871e82/ros2.repos
BUILD args: --packages-up-to ros2bag rosbag2_compression rosbag2_performance_writer_benchmarking rosbag2_transport rosbag2
TEST args: --packages-select ros2bag rosbag2_compression rosbag2_performance_writer_benchmarking rosbag2_transport
Job: ci_launcher

  • Linux Build Status
  • Linux-aarch64 Build Status
  • macOS Build Status
  • Windows Build Status

@pjreed (Contributor, Author) commented Nov 3, 2020

It looks like another merge caused a conflict, but it should be resolved now.

@pjreed (Contributor, Author) commented Nov 9, 2020

Just pinging @emersonknapp about this again.

@emersonknapp (Collaborator)

Left some comments on the discussion about write order and determinism. They are not blocking the merge, just adding context. I am going to try to resolve these merge conflicts now and see if I can get this merged this evening. Thanks for your patience and for this important feature.

@emersonknapp (Collaborator)

Gist: https://gist.githubusercontent.com/emersonknapp/c901a3ec7225a38da9d08b156fca658e/raw/e9efac2d5f3ca1f383e469e9578f0b328a88a70e/ros2.repos
BUILD args: --packages-up-to ros2bag rosbag2_compression rosbag2_performance_writer_benchmarking rosbag2_transport rosbag2
TEST args: --packages-select ros2bag rosbag2_compression rosbag2_performance_writer_benchmarking rosbag2_transport rosbag2
Job: ci_launcher

  • Linux Build Status
  • Linux-aarch64 Build Status
  • macOS Build Status
  • Windows Build Status

@emersonknapp (Collaborator)

Rerun with OSX warning fix

  • Linux Build Status
  • Linux-aarch64 Build Status
  • macOS Build Status
  • Windows Build Status

@emersonknapp (Collaborator)

I think I've fixed the synchronization logic that the OSX test was throwing on: we were not using the same mutex for all calls to the condition variable, which is what you want when you are synchronizing access to a shared resource (in this case, the message/file queue).

  • Linux Build Status
  • Linux-aarch64 Build Status
  • macOS Build Status
  • Windows Build Status
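A sketch of the pattern this synchronization fix relies on, assuming a simple pool of std::thread workers (WorkerPool is a hypothetical name, not the rosbag2 class). The job queue, the stop flag, and every condition-variable wait share one mutex, so a notification cannot be lost between checking the queue and going to sleep. Setting the stop flag under that same mutex and calling notify_all() is also one way to avoid the "compressor threads hang when exiting" problem mentioned in an earlier commit.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical fixed-size worker pool; not rosbag2's implementation.
class WorkerPool {
public:
  explicit WorkerPool(unsigned n_threads)
  {
    for (unsigned i = 0; i < n_threads; ++i) {
      threads_.emplace_back([this] { run(); });
    }
  }

  ~WorkerPool()
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stopping_ = true;  // set under the same mutex the workers wait on
    }
    cv_.notify_all();  // wake every worker so none blocks forever on exit
    for (auto & t : threads_) {
      t.join();
    }
  }

  void submit(std::function<void()> job)
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      jobs_.push_back(std::move(job));
    }
    cv_.notify_one();
  }

private:
  void run()
  {
    while (true) {
      std::function<void()> job;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate is evaluated while holding the mutex that guards
        // jobs_ and stopping_, so a wake-up cannot slip in unnoticed.
        cv_.wait(lock, [this] { return stopping_ || !jobs_.empty(); });
        if (jobs_.empty()) {
          return;  // stopping, and nothing left to drain
        }
        job = std::move(jobs_.front());
        jobs_.pop_front();
      }
      job();  // run the (compression) work without holding the lock
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> jobs_;
  bool stopping_ = false;
  std::vector<std::thread> threads_;
};
```

Jobs run outside the lock, so a long compression never blocks producers from enqueueing; on shutdown, workers drain any remaining jobs before exiting.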

@emersonknapp emersonknapp merged commit 2d45abb into ros2:master Dec 4, 2020
compression_options);
} else {
writer_ = std::make_shared<rosbag2_cpp::writers::SequentialWriter>();
}
writer_ = std::make_shared<rosbag2_cpp::writers::SequentialWriter>();
Collaborator

The writer is overwritten here, so compression is never applied.
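One way to resolve this, sketched with hypothetical stand-in types for rosbag2_cpp::writers::SequentialWriter and the compression writer (their real constructors take arguments, such as the compression_options visible in the excerpt, that are omitted here): choose the writer exactly once per branch and drop the trailing unconditional assignment.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-ins; the real rosbag2 writer classes have richer APIs.
struct Writer {
  virtual ~Writer() = default;
  virtual bool compresses() const { return false; }
};
struct SequentialWriter : Writer {};
struct SequentialCompressionWriter : Writer {
  bool compresses() const override { return true; }
};

// Each branch produces the final writer; nothing after the if/else
// reassigns it, so the compression writer is not silently discarded.
std::shared_ptr<Writer> make_writer(bool use_compression)
{
  if (use_compression) {
    return std::make_shared<SequentialCompressionWriter>();
  }
  return std::make_shared<SequentialWriter>();
}
```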

@@ -46,6 +47,11 @@ class WriterBenchmark : public rclcpp::Node
std::vector<std::thread> producer_threads_;
std::vector<std::unique_ptr<ByteProducer>> producers_;
std::vector<std::shared_ptr<ByteMessageQueue>> queues_;

std::string compression_format_;
rosbag2_compression::CompressionMode compression_mode_;
Collaborator

This seems unused.

emersonknapp pushed a commit that referenced this pull request Feb 2, 2021
* Compress bag files in separate threads

Closes #274.

Distribution Statement A; OPSEC #2893

Signed-off-by: P. J. Reed <[email protected]>
Co-authored-by: Emerson Knapp <[email protected]>
emersonknapp pushed a commit that referenced this pull request Feb 17, 2021
* Compress bag files in separate threads

Successfully merging this pull request may close these issues.

Compress files & messages aynchronously or in a separate thread
7 participants