[SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs #26040
Conversation
The difference between this PR and the original one mainly comes from the following proposals from my side:
Test build #111831 has finished for PR 26040 at commit

retest this please

Test build #111835 has finished for PR 26040 at commit
@xuanyuanking @cloud-fan thanks for taking care of this!
```scala
    startReduceId: Int,
    endReduceId: Int) extends BlockId {
  override def name: String = {
    "shuffle_" + shuffleId + "_" + mapId + "_" + startReduceId + "_" + endReduceId
```
Shall we use `"_" + startReduceId + "-" + endReduceId` instead? Since `startReduceId` and `endReduceId` belong to the same semantic group. This also gives more space to extend `ShuffleBlockBatchId` in the future.
The format will also influence the protocol side. I suggest making the change when it's needed, in the same patch as the extension.
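To make the two candidate formats concrete, here is a minimal standalone sketch; the real class extends `BlockId`, and `proposedName` is a hypothetical helper added only to contrast the reviewer's `-` variant:

```scala
// Sketch of the two name formats discussed above.
case class ShuffleBlockBatchId(
    shuffleId: Int,
    mapId: Int,
    startReduceId: Int,
    endReduceId: Int) {
  // Format used in this PR, e.g. shuffle_0_2_5_14.
  def name: String =
    "shuffle_" + shuffleId + "_" + mapId + "_" + startReduceId + "_" + endReduceId
  // Reviewer's proposal, e.g. shuffle_0_2_5-14: the "-" groups the
  // reduce-ID range into one semantic unit. (Hypothetical helper.)
  def proposedName: String =
    "shuffle_" + shuffleId + "_" + mapId + "_" + startReduceId + "-" + endReduceId
}
```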
```scala
if (curBlocks.isEmpty) {
  curBlocks += info
} else {
  if (curBlockId.mapId != curBlocks.head.blockId.asInstanceOf[ShuffleBlockId].mapId) {
```
How about we keep tracking with `preMapId`, `startReduceId`, `endReduceId` and `mergedBlockSize`, and avoid using `curBlocks: ArrayBuffer[FetchBlockInfo]`, since we don't need all the info in `curBlocks`?
I don't think it makes much difference, except using more vars.
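For comparison, here is a sketch of the var-based tracking suggested above, using hypothetical simplified types (the PR keeps the `curBlocks` buffer instead):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical simplified shape, only to illustrate the suggestion.
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)

// Merge runs of blocks that share a mapId into
// (mapId, startReduceId, endReduceId, mergedBlockSize) tuples,
// tracked with plain vars instead of buffering the block infos.
def mergeContinuous(blocks: Seq[(ShuffleBlockId, Long)]): Seq[(Int, Int, Int, Long)] = {
  val merged = ArrayBuffer.empty[(Int, Int, Int, Long)]
  var preMapId = -1
  var startReduceId = -1
  var endReduceId = -1
  var mergedBlockSize = 0L
  def flush(): Unit = if (preMapId >= 0) {
    merged += ((preMapId, startReduceId, endReduceId, mergedBlockSize))
  }
  for ((id, size) <- blocks) {
    if (id.mapId != preMapId) {
      flush() // the previous run (if any) is complete
      preMapId = id.mapId
      startReduceId = id.reduceId
      mergedBlockSize = 0L
    }
    endReduceId = id.reduceId
    mergedBlockSize += size
  }
  flush() // emit the final run
  merged.toSeq
}
```

As the reply notes, this buys little over the buffer approach beyond trading one allocation for four vars.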
Test build #112148 has finished for PR 26040 at commit
```scala
val res = featureEnabled && fetchMultiPartitions &&
  serializerRelocatable && (!compressed || codecConcatenation)
if (featureEnabled && !res) {
  logDebug("The feature tag of continuous shuffle block fetching is set to true, but " +
```
AQE is off by default, and the continuous fetch is on by default. This means we would always log. I think we should log only if the compressor/serializer is not suitable.
e.g. the code can be

```scala
val shouldBatchFetch = fetchMultiPartitions && context.getLocalProperties...
...
val doBatchFetch = ...
if (shouldBatchFetch && !doBatchFetch) ...
```
Makes sense, done in afd74e5.
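Spelled out as a self-contained sketch (parameter and flag names are illustrative, not the PR's exact signatures):

```scala
// Log only when batch fetch was requested but other conditions rule it out.
def decideBatchFetch(
    fetchMultiPartitions: Boolean,  // reading more than one reduce partition
    featureEnabled: Boolean,        // the user-facing feature tag / local property
    serializerRelocatable: Boolean,
    compressed: Boolean,
    codecConcatenation: Boolean): Boolean = {
  val shouldBatchFetch = featureEnabled && fetchMultiPartitions
  val doBatchFetch = shouldBatchFetch && serializerRelocatable &&
    (!compressed || codecConcatenation)
  if (shouldBatchFetch && !doBatchFetch) {
    // Stand-in for logDebug: this branch only fires when the user opted in.
    println(s"Batch fetch requested but disabled: relocatable=$serializerRelocatable, " +
      s"compressed=$compressed, codecConcatenation=$codecConcatenation")
  }
  doBatchFetch
}
```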
```diff
@@ -35,11 +35,35 @@ private[spark] class BlockStoreShuffleReader[K, C](
     readMetrics: ShuffleReadMetricsReporter,
     serializerManager: SerializerManager = SparkEnv.get.serializerManager,
     blockManager: BlockManager = SparkEnv.get.blockManager,
-    mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
+    mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker,
+    fetchMultiPartitions: Boolean)
```
To reduce the diff, how about we give it a default value of `true`? Existing tests won't enable batch fetch as they don't set the local property.
We can even remove this parameter. The caller side can drop the special local property if it doesn't need to fetch multiple partitions.
Hmm, it's not good to handle the special local property in too many places. How about we read the local property on the caller side, check the number of partitions that need to be fetched, and pass a single `shouldBatchFetch` flag to `BlockStoreShuffleReader`?
Yeah, I'll pick the last approach; that's clearer. I'll also move the property's key into `SortShuffleManager`.
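A sketch of that final shape (class, method, and key names are simplified stand-ins): the caller resolves the local property once and the reader only sees a boolean, with a default of `true`.

```scala
// Simplified stand-ins for the real classes, for illustration only.
class BlockStoreShuffleReader(val shouldBatchFetch: Boolean = true) {
  def read(): Unit = {
    // ...fetch blocks, coalescing continuous ones only if shouldBatchFetch...
  }
}

object SortShuffleManagerSketch {
  // The property key would live here rather than being read in many places.
  val FetchBatchKey = "spark.shuffle.fetch.batch.enabled" // hypothetical key

  def getReader(localProps: java.util.Properties,
                startPartition: Int,
                endPartition: Int): BlockStoreShuffleReader = {
    val shouldBatchFetch =
      localProps.getProperty(FetchBatchKey, "false").toBoolean &&
        endPartition - startPartition > 1
    new BlockStoreShuffleReader(shouldBatchFetch)
  }
}
```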
```diff
@@ -355,6 +355,16 @@ object SQLConf {
     .bytesConf(ByteUnit.BYTE)
     .createWithDefault(64 * 1024 * 1024)

+  val SHUFFLE_FETCH_CONTINUOUS_BLOCKS_IN_BATCH_ENABLED =
+    buildConf("spark.sql.adaptive.shuffle.fetchContinuousBlocksInBatch.enabled")
```
Instead of creating the "shuffle" namespace, how about `spark.sql.adaptive.fetchShuffleBlocksInBatch.enabled`?
Sure, done in afd74e5. There's already a `spark.sql.adaptive.shuffle` namespace.
LGTM except a few code style comments
Test build #112170 has finished for PR 26040 at commit

Test build #112196 has finished for PR 26040 at commit
Test build #112197 has finished for PR 26040 at commit
I'm merging it. We can have a follow-up PR to address the last code style comment. Thanks for the work @yucai and @xuanyuanking!
Thanks for the review and help.
Thanks for solving this! @xuanyuanking @cloud-fan |
Regularize all the shuffle configurations related to adaptive execution

### What changes were proposed in this pull request?
1. Regularize all the shuffle configurations related to adaptive execution.
2. Add a default value for `BlockStoreShuffleReader.shouldBatchFetch`.

### Why are the changes needed?
It's a follow-up PR for #26040. Regularize the existing `spark.sql.adaptive.shuffle` namespace in SQLConf.

### Does this PR introduce any user-facing change?
Rename one released user config `spark.sql.adaptive.minNumPostShufflePartitions` to `spark.sql.adaptive.shuffle.minNumPostShufflePartitions`; the other changed configs are not released yet.

### How was this patch tested?
Existing UT.

Closes #26147 from xuanyuanking/SPARK-9853.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
This PR takes over #19788. After we split the shuffle fetch protocol from `OpenBlock` in #24565, this optimization can be extended in the new shuffle protocol. Credit to @yucai; closes #19788.

What changes were proposed in this pull request?
This PR adds the support for continuous shuffle block fetching in batch:

- Add the config `spark.shuffle.fetchContinuousBlocksInBatch` and implement the decision logic in `BlockStoreShuffleReader`.
- Extend `ExternalBlockHandler` for the external shuffle service side.
- Make `ShuffleBlockResolver.getBlockData` accept getting block data by range.
- Add `ShuffleBlockBatchId` to represent continuous shuffle block ids.
- Extend `FetchShuffleBlocks` and `OneForOneBlockFetcher`.
- Add the config `spark.shuffle.useOldFetchProtocol`.

Why are the changes needed?
In adaptive execution, one reducer may fetch multiple continuous shuffle blocks from one map output file. However, with the original approach, each reducer needs to fetch those 10 reduce blocks one by one, which requires many IOs and hurts performance. This PR supports fetching those continuous shuffle blocks in one IO (in batch). See the example below:
The shuffle blocks are stored as shown below:

The shuffle block ID format is `s"shuffle_$shuffleId_$mapId_$reduceId"`; see BlockId.scala.
In adaptive execution, one reducer may want to read the output for reducers 5 to 14, whose block IDs range from shuffle_0_x_5 to shuffle_0_x_14.
Before this PR, Spark needs 10 disk IOs + 10 network IOs for each output file.
After this PR, Spark only needs 1 disk IO and 1 network IO, which reduces IO dramatically.
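A toy illustration of the saving, assuming the reducer reads map output x for reduce partitions 5 through 14:

```scala
// Before: one block ID per reduce partition -> 10 disk + 10 network IOs.
val singleBlocks = (5 to 14).map(r => s"shuffle_0_x_$r")
println(s"before: ${singleBlocks.size} fetches") // 10
// After: the continuous range collapses into one batched fetch.
println("after: 1 fetch covering reduce IDs 5 to 14")
```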
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Add new UT.
Integration test with setting `spark.sql.adaptive.enabled=true`.
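For reference, a minimal sketch of such an integration-test session (standard SparkSession API; the app name is arbitrary):

```scala
import org.apache.spark.sql.SparkSession

// Enable adaptive execution so reducers may be asked to read
// continuous ranges of map-output partitions.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("continuous-shuffle-fetch-it")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()
```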