-
What is your question? Is there anyone who can help explain the whole shuffle execution process (in particular the shuffle read)? When I read the rapids code, I found that the following Scala files are related to shuffle (shuffle read), but I cannot quite make sense of how they fit together. Thanks! GpuPartitioning.scala
-
@andygrove please correct me if I get anything wrong in relation to AQE. @abellina please correct anything I get wrong for the UCX based shuffle.

There are two different shuffle instances in the plugin.

The first one is based off of Spark's SQL shuffle. The SQL shuffle is in turn based off of the RDD shuffle. The default RDD shuffle is the `SortShuffleManager`. For a shuffle the user is able to control how the data is serialized, whether the data is sorted before it is shuffled, and how the data is partitioned. All of this is controlled by a `ShuffleDependency`.
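To make that concrete, here is a small self-contained sketch using the plain Spark RDD API (nothing plugin specific; the object and app names are made up for illustration). It forces a shuffle with an explicit `HashPartitioner` and then pulls the `ShuffleDependency` off the resulting RDD to show where the partitioner, serializer, and optional key ordering live:

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}

object ShuffleDependencyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("shuffle-dep-demo").setMaster("local[2]"))

    // reduceByKey with an explicit partitioner forces a shuffle; the
    // resulting ShuffledRDD carries a ShuffleDependency in its lineage.
    val words  = sc.parallelize(Seq("a" -> 1, "b" -> 1, "a" -> 1), numSlices = 4)
    val counts = words.reduceByKey(new HashPartitioner(4), _ + _)

    // The ShuffleDependency is where the partitioner, serializer, and
    // optional key ordering for this shuffle are recorded.
    val dep = counts.dependencies.head
      .asInstanceOf[ShuffleDependency[String, Int, Int]]
    println(s"partitions     = ${dep.partitioner.numPartitions}")
    println(s"serializer     = ${dep.serializer.getClass.getSimpleName}")
    println(s"keyOrdering    = ${dep.keyOrdering}")
    println(s"mapSideCombine = ${dep.mapSideCombine}")

    counts.collect().foreach(println)
    sc.stop()
  }
}
```

Running it locally prints the partition count and the serializer class the shuffle will use, which is exactly the information the shuffle machinery reads off the dependency.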
In the SQL shuffle for Spark this is all wrapped in/controlled by the `ShuffleExchangeExec`. It will perform the partitioning ahead of time …

On the reading side a thread pool is launched to try and read in the different partitions. This glosses over the metadata exchange that happens so the readers know where the data is, but the thread pool will read in different batches and place them in a queue to be consumed. It is rather complex, because there is throttling involved to avoid DDOS attacks on the servers involved, and there is failure and retry logic on the shuffles as well. Essentially the data is read back in, deserialized, and sent to a queue that an RDD wraps so the rest of the downstream processing can consume the data.

When AQE is enabled a single shuffle may need to be modified. A regular shuffle might turn into a broadcast shuffle, or skewed joins might cause some parts of the shuffle to act like a broadcast while others act more like a regular shuffle. This is all handled by …
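A quick way to see the exchange (and AQE re-planning) for yourself is to look at the physical plan of a query that has to shuffle. The sketch below uses plain Spark; the data and names are invented for illustration. With the RAPIDS plugin on the classpath the GPU version of the exchange shows up in the same place in the plan.

```scala
import org.apache.spark.sql.SparkSession

object ShuffleExchangeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("shuffle-exchange-demo")
      .master("local[2]")
      // AQE lets Spark re-plan a shuffle at runtime, e.g. turning a
      // shuffled join into a broadcast join or splitting skewed partitions.
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.skewJoin.enabled", "true")
      .getOrCreate()

    import spark.implicits._

    val left  = (1 to 10000).map(i => (i % 100, s"l$i")).toDF("key", "lval")
    val right = (1 to 100).map(i => (i, s"r$i")).toDF("key", "rval")

    // An equi-join on non-partitioned data requires a shuffle on both sides.
    val joined = left.join(right, "key")

    // The physical plan shows an Exchange (ShuffleExchangeExec) feeding the
    // join, wrapped in an AdaptiveSparkPlan node because AQE is enabled.
    joined.explain()
    joined.count()

    spark.stop()
  }
}
```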
The GPU versions of all of these act in the same way. The main differences that you have to be aware of are …

The second shuffle implementation is based off of UCX and tries to avoid the CPU whenever possible. It replaces the default … For the GPU data it bypasses the serialization, the writing of the data to disk multiple times, and the compression. Instead it will split the data up into …

We have recently started to add in GPU compression support for this too. We batch compress the outgoing buffers before registering them with a GPU cache manager, similar in concept to the CPU version in Spark, but implemented very differently. The main thing to be aware of here is that the GPU does compression/decompression well if there is a lot of data to process. It is not as good with small amounts of data. For this reason we have opted not to decompress the data until it gets to the first …

It is a bit ugly and could use some cleanup, especially because of the coupling required between the different parts for it all to work properly. I hope that this helps. At some point when the design is in less flux I need to write this up with diagrams and all so it can be a part of our documentation.
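For completeness, here is a hedged sketch of how the UCX-based shuffle gets wired in purely through configuration. The `RapidsShuffleManager` class name is shim/Spark-version specific, and the `spark.rapids.shuffle.*` keys (including the compression codec one) have changed across releases, so treat the exact values below as illustrative and check the plugin documentation for the release you run.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object RapidsShuffleConfigSketch {
  def main(args: Array[String]): Unit = {
    // Intended to be launched with spark-submit on a cluster that has GPUs
    // and UCX installed; the master URL comes from the submit command.
    val spark = SparkSession.builder()
      .appName("rapids-ucx-shuffle-sketch")
      // Load the RAPIDS Accelerator itself.
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      // Swap the default shuffle manager for the RAPIDS one.
      // The class name is illustrative; it is shim/version specific.
      .config("spark.shuffle.manager",
        "com.nvidia.spark.rapids.spark320.RapidsShuffleManager")
      // Assumed key for the GPU shuffle compression codec; verify the key
      // and the supported values for your plugin release.
      .config("spark.rapids.shuffle.compression.codec", "lz4")
      .getOrCreate()

    // Any query that repartitions now goes through the replaced shuffle,
    // using UCX transfers when the transport is available and enabled.
    spark.range(0, 1000000L)
      .groupBy((col("id") % 100).as("bucket"))
      .count()
      .show(5)

    spark.stop()
  }
}
```

The point of the sketch is that the default shuffle manager is swapped out wholesale through configuration; the query itself does not change.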
-
Looks good to me @revans2
-
Looks good to me too @revans2
-
Please reopen if you still have questions.