Compress bag files in separate threads #506
This offloads all compression into separate threads. Doing so helps to prevent rosbag2 from dropping messages due to the main thread being busy, and it also helps to improve performance by spreading the work across multiple CPU cores. It uses a producer/consumer model with a fixed number of pre-allocated threads that consume messages or files off of an incoming queue.

This adds two new command line options:
- --compression-queue-size - The number of messages or files that can be sitting in the queue waiting for a thread to consume them. If the queue is full, older messages or files will be discarded; this would lead to either messages being discarded or files not being compressed. The default value is 1, which should be sufficient if the system is capable of keeping up with the work load.
- --compression-threads - The number of threads that can be compressing data at once. The default is 0, and values less than 1 will be interpreted to mean the number of concurrent threads supported by the current hardware.

Closes ros2#274.

Distribution Statement A; OPSEC #2893
Signed-off-by: P. J. Reed <[email protected]>
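To make the two options in the description concrete, here is a minimal sketch of the behavior they describe: a bounded queue that discards its oldest entry when full, and a thread count where values below 1 fall back to the hardware concurrency. `DropOldestQueue` and `resolve_thread_count` are hypothetical names chosen for illustration and are not taken from the rosbag2 sources; the fallback to 1 when the hardware concurrency is unknown is also an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical helper (not rosbag2 API): values less than 1 mean
// "use the number of concurrent threads supported by the hardware".
inline std::size_t resolve_thread_count(int requested)
{
  if (requested >= 1) {
    return static_cast<std::size_t>(requested);
  }
  const unsigned int hw = std::thread::hardware_concurrency();
  // hardware_concurrency() may return 0 if the value is not computable;
  // falling back to a single thread is an assumption of this sketch.
  return hw > 0 ? hw : 1;
}

// Hypothetical bounded queue that discards the oldest item when full.
template<typename T>
class DropOldestQueue
{
public:
  explicit DropOldestQueue(std::size_t capacity)
  : capacity_(capacity)
  {
    assert(capacity_ > 0);
  }

  // Returns true if an older item was discarded to make room.
  bool push(T item)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    bool dropped = false;
    if (items_.size() >= capacity_) {
      items_.pop_front();
      dropped = true;
    }
    items_.push_back(std::move(item));
    return dropped;
  }

  // Non-blocking pop; returns false if the queue is empty.
  bool try_pop(T & out)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (items_.empty()) {
      return false;
    }
    out = std::move(items_.front());
    items_.pop_front();
    return true;
  }

private:
  const std::size_t capacity_;
  std::mutex mutex_;
  std::deque<T> items_;
};
```

With a capacity of 1 (the stated default for --compression-queue-size), a second `push` before any consumer has run replaces the waiting item, which is the "older messages or files will be discarded" case from the description.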
High level - should the compression work queue be independently configurable from the writer queue? It would make sense to me that compression would work on the preexisting writer queue and would need to complete before submitting that queue to the storage impl.
Hmm. I don't think I have a strong opinion one way or another on this; I can't think of any situations where you would need to configure them to be different values, but I also don't think it hurts anything to keep them separate. Although, is the writer queue size currently configurable? I just looked over the list of command line arguments and don't see anything that obviously maps to that.
@emersonknapp Just poking this again, do you think anything else needs to be done?
Looks like another recent merge has introduced some conflicts with these changes, I'll see if I can get it sorted out...
Sorry about that @pjreed I've been in and out of office - bad hand-off of responsibilities on my part. Change #526 went in to largely deduplicate the SequentialWriter vs SequentialCompressionWriter code - which will be better for all caching logic that goes in, since it was just written in two places before. That probably throws a wrench in this PR as it was written though. This could be good though to end up with something a little cleaner - the CompressionWriter can handle spinning up the thread and dispatching compression jobs to it, but not have to take care of just about anything else. The new structure with
…ronously Signed-off-by: P. J. Reed <[email protected]>
@emersonknapp Ok, I think this is ready to go now. I spent a little while thinking about it, and I'm not sure that it makes a lot of sense to make the base SequentialWriter multi-threaded. It doesn't really provide it any benefit, and if anything, the added contention from having multiple threads trying to write at once could actually decrease performance. Even for subclasses that use threads, the way they handle offloading work into threads can vary from case to case; in the SequentialCompressionWriter's case, for example, it's actually got two different ways that it can use threads, since in per-message mode every message is compressed and written in a separate thread, while in per-file mode, the method to write messages to files is still single-threaded, but files are handed over to the compressor threads when it checks to see when it's time to split bag files.
Also fix an issue that could cause compressor threads to hang when exiting Signed-off-by: P. J. Reed <[email protected]>
This LGTM. Thanks for the performance benchmark investigation
Running this CI job
It looks like another merge caused a conflict, but it should be resolved now.
Just pinging @emersonknapp about this again.
Left some comments on the discussion about write-order/determinism. Not blocking merging this, just adding context. I am going to try and resolve these merge conflicts now and see if I can get this merged this evening. Thanks for the patience and the important feature.
Signed-off-by: Emerson Knapp <[email protected]>
Gist: https://gist.githubusercontent.com/emersonknapp/c901a3ec7225a38da9d08b156fca658e/raw/e9efac2d5f3ca1f383e469e9578f0b328a88a70e/ros2.repos
…ronously Signed-off-by: Emerson Knapp <[email protected]>
…ging Signed-off-by: Emerson Knapp <[email protected]>
…tion variable Signed-off-by: Emerson Knapp <[email protected]>
I think I've fixed the synchronization logic that the OSX test was throwing on - we were not using the same mutex for calls to the condition variable, which is what you want to do if you are synchronizing access to a shared resource (in this case the message/file queue).
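As an illustration of the point about sharing one mutex, here is a minimal sketch of a worker pool in which the job queue, the stop flag, and every wait on the condition variable are guarded by the same `std::mutex`. `WorkerPool`, `submit`, and the other names are hypothetical and are not the actual `SequentialCompressionWriter` code; draining the remaining jobs before exit is an assumption of this sketch, not a statement about what the PR does.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical worker pool; names are illustrative, not rosbag2's.
class WorkerPool
{
public:
  explicit WorkerPool(std::size_t thread_count)
  {
    assert(thread_count > 0);
    for (std::size_t i = 0; i < thread_count; ++i) {
      workers_.emplace_back([this] {run();});
    }
  }

  ~WorkerPool()
  {
    {
      // The stop flag is set under the same mutex the workers wait with,
      // so a worker cannot check the predicate, miss this update, and
      // then sleep forever.
      std::lock_guard<std::mutex> lock(mutex_);
      stopping_ = true;
    }
    wake_.notify_all();
    for (auto & worker : workers_) {
      worker.join();
    }
  }

  void submit(std::function<void()> job)
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      jobs_.push(std::move(job));
    }
    wake_.notify_one();
  }

private:
  void run()
  {
    for (;;) {
      std::function<void()> job;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        wake_.wait(lock, [this] {return stopping_ || !jobs_.empty();});
        if (jobs_.empty()) {
          return;  // stopping was requested and the queue is drained
        }
        job = std::move(jobs_.front());
        jobs_.pop();
      }
      job();  // run the job outside the lock
    }
  }

  std::mutex mutex_;               // guards jobs_ and stopping_
  std::condition_variable wake_;   // always waited on with mutex_
  std::queue<std::function<void()>> jobs_;
  bool stopping_ = false;
  std::vector<std::thread> workers_;
};
```

If the flag or the queue were modified under a different mutex than the one passed to `wait`, a notification could arrive between the predicate check and the sleep, which is one way worker threads can hang on exit.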
      compression_options);
  } else {
    writer_ = std::make_shared<rosbag2_cpp::writers::SequentialWriter>();
  }
  writer_ = std::make_shared<rosbag2_cpp::writers::SequentialWriter>();
The writer is overwritten here, meaning compression is never applied
@@ -46,6 +47,11 @@ class WriterBenchmark : public rclcpp::Node
  std::vector<std::thread> producer_threads_;
  std::vector<std::unique_ptr<ByteProducer>> producers_;
  std::vector<std::shared_ptr<ByteMessageQueue>> queues_;

  std::string compression_format_;
  rosbag2_compression::CompressionMode compression_mode_;
this seems unused
* Compress bag files in separate threads

This offloads all compression into separate threads. Doing so helps to prevent rosbag2 from dropping messages due to the main thread being busy, and it also helps to improve performance by spreading the work across multiple CPU cores. It uses a producer/consumer model with a fixed number of pre-allocated threads that consume messages or files off of an incoming queue.

This adds two new command line options:
- --compression-queue-size - The number of messages or files that can be sitting in the queue waiting for a thread to consume them. If the queue is full, older messages or files will be discarded; this would lead to either messages being discarded or files not being compressed. The default value is 1, which should be sufficient if the system is capable of keeping up with the work load.
- --compression-threads - The number of threads that can be compressing data at once. The default is 0, and values less than 1 will be interpreted to mean the number of concurrent threads supported by the current hardware.

Closes #274.

Distribution Statement A; OPSEC #2893
Signed-off-by: P. J. Reed <[email protected]>
Co-authored-by: Emerson Knapp <[email protected]>
This offloads all compression into separate threads. Doing so helps
to prevent rosbag2 from dropping messages due to the main thread
being busy, and it also helps to improve performance by spreading
the work across multiple CPU cores. It uses a producer/consumer
model with a fixed number of pre-allocated threads that consume
messages or files off of an incoming queue.
This adds two new command line options:
- --compression-queue-size - The number of messages or files that can
  be sitting in the queue waiting for a thread to consume them. If the
  queue is full, older messages or files will be discarded; this would
  lead to either messages being discarded or files not being
  compressed. The default value is 1, which should be sufficient if
  the system is capable of keeping up with the work load.
- --compression-threads - The number of threads that can be compressing
  data at once. The default is 0, and values less than 1 will be
  interpreted to mean the number of concurrent threads supported by
  the current hardware.
Closes #274.
Distribution Statement A; OPSEC #4584
Signed-off-by: P. J. Reed [email protected]