
[SPARK-23249] [SQL] Improved block merging logic for partitions #20372

Conversation

@glentakahashi commented Jan 23, 2018

What changes were proposed in this pull request?

Change DataSourceScanExec so that, when grouping blocks together into partitions, it also checks the end of the sorted list of splits to fill out partitions more efficiently.
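
For illustration, here is a minimal, self-contained sketch of the packing idea (a simplified model written for this description, not the actual DataSourceScanExec code): splits are sorted by size in decreasing order, and after each large file is placed, the partition is topped up with the smallest unplaced files from the tail of the sorted list.

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch only: `sizes` stands in for the lengths of the sorted splits;
// maxSplitBytes and openCostInBytes mirror the quantities the real code
// uses, but this is not the PR's implementation.
def packPartitions(
    sizes: Seq[Long],
    maxSplitBytes: Long,
    openCostInBytes: Long): Seq[Seq[Long]] = {
  val sorted = sizes.sortBy(-_).toArray // largest first
  val partitions = ArrayBuffer.empty[Seq[Long]]
  var lo = 0                 // index of the next large (head) file
  var hi = sorted.length - 1 // index of the next small (tail) file
  while (lo <= hi) {
    // Always place the head file, even if it alone exceeds maxSplitBytes,
    // so every iteration makes progress.
    val current = ArrayBuffer(sorted(lo))
    var currentSize = sorted(lo) + openCostInBytes
    lo += 1
    // Fill the remaining room with the smallest unplaced files.
    while (lo <= hi && currentSize + sorted(hi) + openCostInBytes <= maxSplitBytes) {
      currentSize += sorted(hi) + openCostInBytes
      current += sorted(hi)
      hi -= 1
    }
    partitions += current.toSeq
  }
  partitions.toSeq
}
```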

How was this patch tested?

Updated an old test to reflect the new logic, which causes the number of partitions to drop from 4 to 3.
Also, a pre-existing test covers large non-splittable files: https://github.com/glentakahashi/spark/blob/c575977a5952bf50b605be8079c9be1e30f3bd36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala#L346

Rationale

The current bin-packing method, next-fit descending (NFD), is sub-optimal in many cases: it produces extra partitions, an uneven distribution of block counts across partitions, and an uneven distribution of partition sizes.

As an example, 128 files of sizes 1MB, 2MB, ..., 127MB, 128MB result in 82 partitions with the current algorithm but only 64 with this one. In the same example, the maximum number of blocks per partition is 13 under NFD, while under this algorithm it is 2.

More generally, in a simulation of 1000 runs with a 128MB block size and between 1 and 1000 files per run, with normally distributed file sizes between 1MB and 500MB, partition counts drop by approximately 5% and the standard deviation of blocks per partition drops substantially.

Like NFD, this algorithm runs in O(n) time, and in every case it produces a packing at least as good as NFD's.

Overall, the more even distribution of blocks across partitions, and the resulting reduction in partition counts, should yield a small but meaningful performance improvement across the board. A rough simulation sketch follows.
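
As a rough way to see the effect described above, the earlier sketch can be compared against a minimal next-fit-descending baseline (again a simplified model; exact counts depend on how Spark applies the open cost, so the printed numbers are illustrative rather than the figures reported above):

```scala
// Minimal NFD baseline for comparison with packPartitions above
// (reuses the ArrayBuffer import from the earlier sketch).
def packNFD(sizes: Seq[Long], maxSplitBytes: Long, openCost: Long): Seq[Seq[Long]] = {
  val partitions = ArrayBuffer.empty[Seq[Long]]
  var current = ArrayBuffer.empty[Long]
  var currentSize = 0L
  sizes.sortBy(-_).foreach { size =>
    // Close the current partition when the next file would overflow it.
    if (current.nonEmpty && currentSize + size + openCost > maxSplitBytes) {
      partitions += current.toSeq
      current = ArrayBuffer.empty[Long]
      currentSize = 0L
    }
    current += size
    currentSize += size + openCost
  }
  if (current.nonEmpty) partitions += current.toSeq
  partitions.toSeq
}

val mb = 1024L * 1024L
val files = (1L to 128L).map(_ * mb) // 1MB, 2MB, ..., 128MB
val nfd = packNFD(files, 128 * mb, 0L)
val merged = packPartitions(files, 128 * mb, 0L)
println(s"NFD:    ${nfd.size} partitions, max blocks/partition ${nfd.map(_.size).max}")
println(s"merged: ${merged.size} partitions, max blocks/partition ${merged.map(_.size).max}")
```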

@ash211 (Contributor) left a comment

Technique seems good, but misses a corner case (large files) and some of the comments are now stale

@@ -142,15 +142,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
checkScan(table.select('c1)) { partitions =>
// Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
Contributor

comment is stale now

assert(partitions(1).files.size == 2, "when checking partition 2")
assert(partitions(2).files.size == 2, "when checking partition 3")
assert(partitions(3).files.size == 1, "when checking partition 4")

// First partition reads (file1)
Contributor

comment is stale now

closePartition()
}
// Add the given file to the current partition.
currentSize += file.length + openCostInBytes
Contributor

this handles files that are larger than the maxSplitBytes, which I think isn't done in the new algorithm. Need to make sure those don't form an infinite loop
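
For what it's worth, in the hypothetical sketch from the PR description (a model, not Spark's code) the head file is placed unconditionally before the tail-fill loop, so an oversized file simply becomes its own partition and the loop still terminates:

```scala
// Using the hypothetical packPartitions sketch from the description:
// a 300MB file exceeds the 128MB limit but still forms its own partition,
// because only the tail-fill step is conditional on fitting.
val parts = packPartitions(
  sizes = Seq(300L * 1024 * 1024, 10L * 1024 * 1024),
  maxSplitBytes = 128L * 1024 * 1024,
  openCostInBytes = 0L)
// parts == Seq(Seq(314572800L), Seq(10485760L))
```

The real code needs to preserve the same invariant: every pass of the while loop must consume at least one split.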

Contributor

should add a test for this too

Author

This test still passes (https://github.com/glentakahashi/spark/blob/c575977a5952bf50b605be8079c9be1e30f3bd36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala#L101), and the splitFiles should already have been split by filesMaxPartitionBytes.

I originally had similar handling in place, but I *think* it's redundant.

Author

Oh, only if the file isSplittable, so yeah, this will fail for non-splittable files. Will update the code.

@ash211 (Contributor) Jan 24, 2018

saw you added a commit to handle the large non-splittable files case -- can you please add a test for that also?

ef04de9

want to make sure this while loop never becomes an infinite loop!

@ash211 (Contributor) commented Jan 24, 2018

Jenkins, this is ok to test

@SparkQA commented Jan 24, 2018

Test build #86567 has finished for PR 20372 at commit ef04de9.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ash211 (Contributor) commented Jan 24, 2018

Please fix the Scala style checks:

Running Scala style checks
========================================================================
Scalastyle checks failed at following occurrences:
[error] /home/jenkins/workspace/SparkPullRequestBuilder/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala:459: File line length exceeds 100 characters
[error] /home/jenkins/workspace/SparkPullRequestBuilder/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala:463: File line length exceeds 100 characters
[error] Total time: 14 s, completed Jan 23, 2018 10:44:36 PM
[error] running /home/jenkins/workspace/SparkPullRequestBuilder/dev/lint-scala ; received return code 1

and verify locally with ./dev/lint-scala

@SparkQA commented Jan 24, 2018

Test build #86591 has finished for PR 20372 at commit 57722cf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ash211 (Contributor) commented Jan 25, 2018

Tagging folks who have touched this code recently: @vgankidi @ericl @davies

This seems to provide a more compact packing in every scenario, which should improve execution times. One risk is that individual partitions are no longer always contiguous ranges of files in order, but rather sometimes they have a gap. In the test this is the (file1, file6) partition. If something depends on this past behavior it could now break, though I don't think anything should be requiring this partition ordering.

@vgankidi

I agree with @ash211. Applications shouldn't rely on the order of the files within a partition.
This optimization looks good to me.

@felixcheung (Member)

Please see https://spark.apache.org/contributing.html. Could you open a JIRA and update this PR?

@glentakahashi glentakahashi changed the title Improved block merging logic for partitions [SPARK-23249] Improved block merging logic for partitions Jan 27, 2018
@glentakahashi (Author)

@felixcheung (Member)

@cloud-fan @gatorsmile

@cloud-fan (Contributor)

LGTM, also cc @hvanhovell @marmbrus

@cloud-fan (Contributor)

please update your title to [SPARK-23249][SQL] ...

@glentakahashi glentakahashi changed the title [SPARK-23249] Improved block merging logic for partitions [SPARK-23249] [SQL] Improved block merging logic for partitions Jan 29, 2018
@glentakahashi (Author)

What are the remaining steps to get this merged? Just checking that I don't need to do anything else from my end.

@cloud-fan (Contributor)

thanks, merging to master/2.3!

asfgit pushed a commit that referenced this pull request Jan 31, 2018

Author: Glen Takahashi <[email protected]>

Closes #20372 from glentakahashi/feature/improved-block-merging.

(cherry picked from commit 8c21170)
Signed-off-by: Wenchen Fan <[email protected]>
@asfgit asfgit closed this in 8c21170 Jan 31, 2018
@glentakahashi glentakahashi deleted the feature/improved-block-merging branch January 31, 2018 18:03
@gatorsmile (Member) commented Feb 14, 2018

We saw a performance regression in Spark 2.3 related to this change. Let me revert it now; please resubmit the PR with more reviews and performance testing.

@gatorsmile (Member)

Reverted from 2.3 and master.

@glentakahashi (Author)

@gatorsmile can you link the ticket about the perf regression? I imagine you would be seeing perf regressions in cases where partition counts are less than total cluster capacity, as this has the chance to reduce parallelism there.

Also, is there a specific suite of perf-testing benchmarks that I should be running here that are used in testing releases?

@gatorsmile (Member)

This PR was merged into RC3 of Spark 2.3. For fixes like this, we should not merge them into Spark 2.3. The performance regression was observed in RC3 compared with RC2. We did not investigate further. For safety, we have to revert it from Spark 2.3 and master.

Maybe you can submit a new PR and make it configurable? Does this sound good to you?

@glentakahashi (Author)

No worries. Can you shed some more light on the performance regressions? Are the benchmark code/results public for me to peruse? If not, could you post a high-level summary? I'd love to know, for our own benefit, which cases/metrics you see regressions on.

Regardless, I'll submit a new PR to make it configurable.
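
For illustration, gating the new packing behind a flag might look roughly like this (the config name spark.sql.files.useAlternativeFilePacking is invented here for illustration and is not a real Spark config; packPartitions/packNFD are the sketches from the description, and fileSizes, maxSplitBytes, and openCostInBytes are assumed to be in scope):

```scala
// Hypothetical config gate; flag name invented for illustration only.
val useNewPacking = spark.conf
  .get("spark.sql.files.useAlternativeFilePacking", "false").toBoolean
val partitions =
  if (useNewPacking) packPartitions(fileSizes, maxSplitBytes, openCostInBytes)
  else packNFD(fileSizes, maxSplitBytes, openCostInBytes)
```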

@cloud-fan (Contributor)

It sounds like we fixed a "bug" and made the actual partition size closer to the expected one, but caused another "bug". Two speculations:

  1. The expected partition size doesn't maximize read performance.
  2. The open file cost is wrongly estimated (the relevant configs are shown below).
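
For reference on the second point, the open cost and target partition size are both existing Spark SQL configs, so the estimate can be tuned without code changes (values shown are the defaults):

```scala
// Existing Spark SQL knobs relevant to this packing, with default values.
spark.conf.set("spark.sql.files.maxPartitionBytes", 134217728L) // 128MB
spark.conf.set("spark.sql.files.openCostInBytes", 4194304L)     // 4MB
```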
