
[SPARK-9853][Core] Optimize shuffle fetch of continuous partition IDs #26040

Closed · wants to merge 10 commits

Conversation

@xuanyuanking (Member) commented Oct 7, 2019:

This PR takes over #19788. After we split the shuffle fetch protocol from OpenBlock in #24565, this optimization can be extended in the new shuffle protocol. Credit to @yucai, closes #19788.

What changes were proposed in this pull request?

This PR adds the support for continuous shuffle block fetching in batch:

  • Shuffle client changes:
    • Add a new feature tag, spark.shuffle.fetchContinuousBlocksInBatch, and implement the decision logic in BlockStoreShuffleReader.
    • Merge the continuous shuffle block ids in batch if needed in ShuffleBlockFetcherIterator.
  • Shuffle server changes:
    • Add support in ExternalBlockHandler for the external shuffle service side.
    • Make ShuffleBlockResolver.getBlockData accept getting block data by range.
  • Protocol changes:
    • Add a new block ID type, ShuffleBlockBatchId, and extend the newly added shuffle fetch protocol so that a continuous range of reduce partitions can be requested as one batch.

Why are the changes needed?

In adaptive execution, one reducer may fetch multiple continuous shuffle blocks from one map output file. However, with the original approach, each reducer needs to fetch those blocks one by one, which requires many IOs and hurts performance. This PR adds support for fetching continuous shuffle blocks in one IO (a batch fetch). See the example below:

The shuffle blocks are stored as below:
[image: layout of shuffle blocks in a map output file]
The ShuffleBlockId name format is s"shuffle_$shuffleId_$mapId_$reduceId" (see BlockId.scala).

In adaptive execution, one reducer may want to read the output for reducers 5 to 14, whose block IDs range from shuffle_0_x_5 to shuffle_0_x_14.
Before this PR, Spark needs 10 disk IOs + 10 network IOs for each output file.
After this PR, Spark only needs 1 disk IO and 1 network IO, which reduces IO dramatically.
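
To make the arithmetic concrete, here is a minimal Scala sketch (illustrative only: it builds the block-name strings by hand rather than using the real BlockId classes, and mapId 3 stands in for the "x" above):

```scala
object BatchFetchExample extends App {
  val shuffleId = 0
  val mapId = 3

  // Before this PR: 10 separate block IDs, fetched one by one.
  val singleBlockIds = (5 to 14).map(r => s"shuffle_${shuffleId}_${mapId}_$r")
  singleBlockIds.foreach(println) // shuffle_0_3_5 ... shuffle_0_3_14

  // After this PR: one batch ID covering the half-open range [5, 15).
  println(s"shuffle_${shuffleId}_${mapId}_5_15")
}
```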

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added new unit tests.
Integration tested with spark.sql.adaptive.enabled=true.

@xuanyuanking (Member, Author):

The difference between this PR and the original one is mainly caused by the following proposals from my side:

  • Instead of treating a merged batch fetch as several blocks, this approach treats the combined batch ID as a single block. This not only keeps the FetchResult interfaces unchanged everywhere, but also removes the duplicated continuous-block-ID merging code in BlockManager and ExternalShuffleBlockHandler.
  • Extend the newly added shuffle fetch protocol by reusing the reduceIds field for ShuffleBlockBatchId; it keeps both the start and end reduce ID for the half-open range [startReduceId, endReduceId) (sketched below).
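
To illustrate the second point, a rough sketch of the range encoding (the helper name and signature are assumptions for illustration, not the actual protocol classes):

```scala
// For a batch fetch, the per-map reduceIds array carries only the two
// endpoints of the half-open range instead of listing every reduce ID.
def encodeReduceIds(batchFetch: Boolean, reduceIds: Seq[Int]): Array[Int] =
  if (batchFetch) Array(reduceIds.head, reduceIds.last + 1) // [start, end)
  else reduceIds.toArray

// encodeReduceIds(batchFetch = true, 5 to 14)  => Array(5, 15)
// encodeReduceIds(batchFetch = false, 5 to 14) => Array(5, 6, ..., 14)
```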

@xuanyuanking (Member, Author):

cc @yucai @cloud-fan @jiangxb1987

@SparkQA commented Oct 7, 2019:

Test build #111831 has finished for PR 26040 at commit 413120f.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ShuffleBlockBatchId(

@cloud-fan (Contributor):

retest this please

@SparkQA commented Oct 7, 2019:

Test build #111835 has finished for PR 26040 at commit 413120f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ShuffleBlockBatchId(

@SparkQA commented Oct 8, 2019:

Test build #111891 has finished for PR 26040 at commit 407b1e0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yucai (Contributor) commented Oct 9, 2019:

@xuanyuanking @cloud-fan thanks for taking care of this!

```scala
    startReduceId: Int,
    endReduceId: Int) extends BlockId {
  override def name: String = {
    "shuffle_" + shuffleId + "_" + mapId + "_" + startReduceId + "_" + endReduceId
```
Contributor:
Shall we use "_" + startReduceId + "-" + endReduceId instead? startReduceId and endReduceId belong to the same semantic group, and this also gives more room to extend ShuffleBlockBatchId in the future.

Member Author:
The format will also influence the protocol side. I suggest making that change when it's needed, in the same patch as the extension.

```scala
if (curBlocks.isEmpty) {
  curBlocks += info
} else {
  if (curBlockId.mapId != curBlocks.head.blockId.asInstanceOf[ShuffleBlockId].mapId) {
```
Contributor:
How about we keep track using preMapId, startReduceId, endReduceId, and mergedBlockSize, and avoid curBlocks: ArrayBuffer[FetchBlockInfo], since we don't need all the info in curBlocks?

Member Author:
I don't think it makes much difference, except using more vars.
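
For readers following the thread, a self-contained sketch of the merging under discussion (heavily simplified: the real ShuffleBlockFetcherIterator works with FetchBlockInfo and also tracks block sizes):

```scala
// Collapse runs of consecutive reduce IDs from the same map output into
// (mapId, startReduceId, endReduceId) batches, with endReduceId exclusive.
case class Block(mapId: Int, reduceId: Int)

def mergeContinuousBlocks(blocks: Seq[Block]): Seq[(Int, Int, Int)] =
  blocks.foldLeft(List.empty[(Int, Int, Int)]) {
    // Extend the current batch when the map matches and the reduce ID is next.
    case ((mapId, start, end) :: rest, b) if b.mapId == mapId && b.reduceId == end =>
      (mapId, start, end + 1) :: rest
    // Otherwise open a new batch covering just this block.
    case (acc, b) =>
      (b.mapId, b.reduceId, b.reduceId + 1) :: acc
  }.reverse

// mergeContinuousBlocks(Seq(Block(0, 5), Block(0, 6), Block(1, 5)))
//   => List((0, 5, 7), (1, 5, 6))
```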

@SparkQA commented Oct 16, 2019:

Test build #112148 has finished for PR 26040 at commit d9ea5af.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
val res = featureEnabled && fetchMultiPartitions &&
  serializerRelocatable && (!compressed || codecConcatenation)
if (featureEnabled && !res) {
  logDebug("The feature tag of continuous shuffle block fetching is set to true, but " +
```
@cloud-fan (Contributor) commented Oct 16, 2019:
AQE is off by default, and the continuous fetch is on by default. This means we always log. I think we should log only if the compressor/serializer is not suitable.

Contributor:
e.g. the code can be

```scala
val shouldBatchFetch = fetchMultiPartitions && context.getLocalProperties...
...
val doBatchFetch = ...
if (shouldBatchFetch && !doBatchFetch) ...
```

Member Author:
Makes sense, done in afd74e5.
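
For reference, the reshaped check might look roughly like this (a sketch following the suggestion above; the names mirror the snippet, and println stands in for logDebug):

```scala
def doBatchFetch(
    featureEnabled: Boolean,
    fetchMultiPartitions: Boolean,
    serializerRelocatable: Boolean,
    compressed: Boolean,
    codecConcatenation: Boolean): Boolean = {
  // What the user asked for vs. what the serializer/codec actually allows.
  val shouldBatchFetch = featureEnabled && fetchMultiPartitions
  val canBatchFetch = serializerRelocatable && (!compressed || codecConcatenation)
  if (shouldBatchFetch && !canBatchFetch) {
    println("Continuous shuffle block fetching is enabled, but the " +
      "serializer or codec does not support it, so it is disabled.")
  }
  shouldBatchFetch && canBatchFetch
}
```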

```diff
@@ -35,11 +35,35 @@ private[spark] class BlockStoreShuffleReader[K, C](
     readMetrics: ShuffleReadMetricsReporter,
     serializerManager: SerializerManager = SparkEnv.get.serializerManager,
     blockManager: BlockManager = SparkEnv.get.blockManager,
-    mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
+    mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker,
+    fetchMultiPartitions: Boolean)
```
Contributor:
To reduce the diff, how about we give it a default value of true? Existing tests won't enable batch fetch, since they don't set the local property.

Contributor:
We can even remove this parameter. The caller side can drop the special local property if it doesn't need to fetch multiple partitions.

Contributor:
Hmm, it's not good to handle the special local property in too many places. How about we read the local property on the caller side, check the number of partitions that need to be fetched, and pass a single shouldBatchFetch flag to BlockStoreShuffleReader?

Member Author:
Yeah, I'll pick the last approach; that's clearer. Also moved the property's key into SortShuffleManager.
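
A minimal sketch of that direction (the object and signature are illustrative, not the exact SortShuffleManager code; the property key reuses the feature tag named in the PR description):

```scala
object BatchFetchFlag {
  // Feature tag from the PR description, used here as the local property key.
  val FETCH_CONTINUOUS_BLOCKS_IN_BATCH = "spark.shuffle.fetchContinuousBlocksInBatch"

  // The caller reads the local property once, checks how many partitions are
  // requested, and hands a single flag to BlockStoreShuffleReader.
  def shouldBatchFetch(
      localPropertyValue: String,
      startPartition: Int,
      endPartition: Int): Boolean = {
    // parseBoolean returns false for null, i.e. when the property is unset.
    java.lang.Boolean.parseBoolean(localPropertyValue) &&
      endPartition - startPartition > 1
  }
}
```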

```diff
@@ -355,6 +355,16 @@ object SQLConf {
     .bytesConf(ByteUnit.BYTE)
     .createWithDefault(64 * 1024 * 1024)
 
+  val SHUFFLE_FETCH_CONTINUOUS_BLOCKS_IN_BATCH_ENABLED =
+    buildConf("spark.sql.adaptive.shuffle.fetchContinuousBlocksInBatch.enabled")
```
Contributor:
Instead of creating the "shuffle" namespace, how about spark.sql.adaptive.fetchShuffleBlocksInBatch.enabled?

Member Author:
Sure, done in afd74e5. There's already a spark.sql.adaptive.shuffle namespace.
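
For context, a SQLConf entry of this shape would be defined roughly as follows (a sketch inside object SQLConf; the doc text is invented here, and the default of true matches the "on by default" behavior discussed above):

```scala
val FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED =
  buildConf("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled")
    .doc("Whether to fetch continuous shuffle blocks in batch.")
    .booleanConf
    .createWithDefault(true)
```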

@cloud-fan (Contributor) left a comment:
LGTM except a few code style comments.

@SparkQA commented Oct 16, 2019:

Test build #112170 has finished for PR 26040 at commit fa7c272.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 17, 2019:

Test build #112196 has finished for PR 26040 at commit afd74e5.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 17, 2019:

Test build #112197 has finished for PR 26040 at commit 8f855a7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor):

I'm merging it. We can have a follow-up PR to address the last code style comment. Thanks for the work, @yucai and @xuanyuanking!

@cloud-fan closed this in 239ee3f on Oct 17, 2019.
@xuanyuanking (Member, Author):

Thanks for the review and help.

@yucai (Contributor) commented Oct 17, 2019:

Thanks for solving this! @xuanyuanking @cloud-fan

cloud-fan pushed a commit that referenced this pull request on Oct 18, 2019:
Regularize all the shuffle configurations related to adaptive execution

### What changes were proposed in this pull request?
1. Regularize all the shuffle configurations related to adaptive execution.
2. Add default value for `BlockStoreShuffleReader.shouldBatchFetch`.

### Why are the changes needed?
It's a follow-up PR for #26040.
Regularize the existing `spark.sql.adaptive.shuffle` namespace in SQLConf.

### Does this PR introduce any user-facing change?
Rename one released user config, spark.sql.adaptive.minNumPostShufflePartitions, to spark.sql.adaptive.shuffle.minNumPostShufflePartitions; the other changed configs have not been released yet.

### How was this patch tested?
Existing UT.

Closes #26147 from xuanyuanking/SPARK-9853.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>