[SPARK-23249] [SQL] Improved block merging logic for partitions #20372
Conversation
Technique seems good, but misses a corner case (large files) and some of the comments are now stale
@@ -142,15 +142,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
    SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
      checkScan(table.select('c1)) { partitions =>
        // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
comment is stale now
assert(partitions(1).files.size == 2, "when checking partition 2")
assert(partitions(2).files.size == 2, "when checking partition 3")
assert(partitions(3).files.size == 1, "when checking partition 4")

// First partition reads (file1)
comment is stale now
  closePartition()
}
// Add the given file to the current partition.
currentSize += file.length + openCostInBytes
this handles files that are larger than the maxSplitBytes, which I think isn't done in the new algorithm. Need to make sure those don't form an infinite loop
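For context, here is a minimal sketch of the packing loop under discussion, reconstructed around the fragments quoted above (`closePartition`, `openCostInBytes`); the real code in DataSourceScanExec differs in detail:

```scala
// Sketch of the next-fit packing loop being discussed (simplified).
case class PartitionedFile(path: String, length: Long) // simplified stand-in

def packNextFit(
    splitFiles: Seq[PartitionedFile], // assumed sorted by length, descending
    maxSplitBytes: Long,
    openCostInBytes: Long): Seq[Seq[PartitionedFile]] = {
  val partitions = scala.collection.mutable.ArrayBuffer.empty[Seq[PartitionedFile]]
  val currentFiles = scala.collection.mutable.ArrayBuffer.empty[PartitionedFile]
  var currentSize = 0L

  // Close the in-progress partition, if any.
  def closePartition(): Unit = {
    if (currentFiles.nonEmpty) partitions += currentFiles.toList
    currentFiles.clear()
    currentSize = 0
  }

  splitFiles.foreach { file =>
    if (currentSize + file.length > maxSplitBytes) {
      closePartition()
    }
    // Add the given file to the current partition. A file larger than
    // maxSplitBytes is still consumed here, so it simply becomes a
    // partition of its own rather than looping forever.
    currentSize += file.length + openCostInBytes
    currentFiles += file
  }
  closePartition()
  partitions.toList
}
```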
should add a test for this too
This test still passes: https://github.com/glentakahashi/spark/blob/c575977a5952bf50b605be8079c9be1e30f3bd36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala#L101, and the splitFiles should already have been split by filesMaxPartitionBytes.
I originally had similar functionality in, but I *think* it's redundant.
Oh, only if the file isSplittable, so yeah, this will fail for non-splittable files. Will update the code.
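To illustrate the distinction, here is a hedged sketch of how splits are typically produced, with illustrative names rather than the exact Spark internals: only splittable files are chopped into maxSplitBytes-sized pieces, so a non-splittable file comes through as a single split that can exceed maxSplitBytes.

```scala
// Illustrative only: a non-splittable file becomes one oversize split,
// while a splittable file is chopped into pieces of at most maxSplitBytes.
case class FileSplit(path: String, offset: Long, length: Long)

def toSplits(path: String, fileLen: Long, isSplitable: Boolean,
             maxSplitBytes: Long): Seq[FileSplit] = {
  if (isSplitable) {
    (0L until fileLen by maxSplitBytes).map { offset =>
      FileSplit(path, offset, math.min(maxSplitBytes, fileLen - offset))
    }
  } else {
    Seq(FileSplit(path, 0L, fileLen)) // oversize split stays whole
  }
}
```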
Saw you added a commit to handle the large non-splittable files case -- can you please add a test for that also?
Want to make sure this while loop never becomes an infinite loop!
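A sketch of what such a test might look like, following the checkScan pattern quoted in the diff above; the createTable helper and the config values are assumptions about FileSourceStrategySuite, and actually forcing a non-splittable format is elided here:

```scala
test("large non-splittable file does not loop while forming partitions") {
  withSQLConf(
      SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10",
      SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
    // One file far larger than maxSplitBytes; with a non-splittable format
    // it must end up alone in a single partition instead of spinning the
    // packing loop forever.
    val table = createTable(files = Seq("file1" -> 100)) // assumed helper
    checkScan(table.select('c1)) { partitions =>
      assert(partitions.size == 1, "expected one partition")
      assert(partitions.head.files.size == 1, "expected the file kept whole")
    }
  }
}
```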
Jenkins, this is ok to test
Test build #86567 has finished for PR 20372 at commit
Please fix the Scala style checks and verify locally.
The large non-splittable files case is actually already tested by https://github.com/glentakahashi/spark/blob/c575977a5952bf50b605be8079c9be1e30f3bd36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala#L346
Test build #86591 has finished for PR 20372 at commit
Tagging folks who have touched this code recently: @vgankidi @ericl @davies
This seems to provide a more compact packing in every scenario, which should improve execution times. One risk is that individual partitions are no longer always contiguous ranges of files in order; sometimes they have a gap.
I agree with @ash211. Applications shouldn't rely on the order of the files within a partition.
please see https://spark.apache.org/contributing.html
LGTM, also cc @hvanhovell @marmbrus
please update your title to
What are the remaining steps to get this merged? Just checking that I don't need to do anything else from my end.
thanks, merging to master/2.3!
Author: Glen Takahashi <[email protected]>
Closes #20372 from glentakahashi/feature/improved-block-merging.
(cherry picked from commit 8c21170)
Signed-off-by: Wenchen Fan <[email protected]>
We saw a performance regression in Spark 2.3 caused by this change. Let me revert it now; please resubmit the PR with more reviews and performance testing.
Reverted from 2.3 and master.
@gatorsmile can you link the ticket about the perf regression? I imagine you would be seeing perf regressions in cases where partition counts are less than total cluster capacity, as this has the chance to reduce parallelism there. Also, is there a specific suite of perf-testing benchmarks, used in testing releases, that I should be running here?
This PR was merged into RC3 of Spark 2.3. We should not merge such fixes into Spark 2.3 at this stage. The performance regression was witnessed in RC3 compared with RC2; we did not investigate further. For safety, we have to revert it from Spark 2.3 and master. Maybe you can submit a new PR and make it configurable? Does this sound good to you?
No worries. Can you shed some more light on the performance regressions? Are the benchmark code and results public for me to peruse? If not, could you post a high-level summary? I'd love to know, for our own benefit, which cases and metrics show regressions. Regardless, I'll submit a new PR to make it configurable.
It sounds like we fixed one "bug" and made the actual partition size closer to the expected one, but caused another "bug". Two speculations:
What changes were proposed in this pull request?
Change DataSourceScanExec so that when grouping blocks together into partitions, it also checks the end of the sorted list of splits to more efficiently fill out partitions.
How was this patch tested?
Updated the old test to reflect the new logic, which causes the number of partitions to drop from 4 to 3.
Also, an existing test already covers large non-splittable files at https://github.com/glentakahashi/spark/blob/c575977a5952bf50b605be8079c9be1e30f3bd36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala#L346
Rationale
The current bin-packing method for blocks into partitions, next-fit decreasing (NFD), is sub-optimal in many cases and results in extra partitions, an uneven distribution of block counts across partitions, and an uneven distribution of partition sizes.
As an example, 128 files of 1 MB, 2 MB, ..., 127 MB, 128 MB result in 82 partitions with the current algorithm, but only 64 with this one. In the same example, the maximum number of blocks per partition is 13 under NFD but 2 under this algorithm.
More generally, in a simulation of 1000 runs using a 128 MB block size and between 1 and 1000 files with sizes normally distributed between 1 MB and 500 MB, this algorithm shows roughly a 5% reduction in partition counts and a large reduction in the standard deviation of blocks per partition.
This algorithm also runs in O(n) time, as NFD does, and in every case produces strictly better results than NFD.
Overall, the more even distribution of blocks across partitions, and the resulting reduction in partition counts, should yield a small but significant performance increase across the board.
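As a rough, self-contained illustration of the difference between the two strategies (a sketch, not the actual DataSourceScanExec code: it ignores openCostInBytes, so the exact counts differ from the 82/64 figures above):

```scala
// Compare next-fit on descending sizes (current) with a two-ended variant
// that also pulls small splits from the tail of the sorted list (proposed).
object PackingDemo {
  val maxSplitBytes: Long = 128L << 20 // 128 MB

  // Current approach: next-fit over splits sorted largest-first.
  def nextFitDecreasing(sizes: Seq[Long]): Int = {
    var partitions = 0
    var current = 0L
    sizes.sorted(Ordering[Long].reverse).foreach { s =>
      // Close the current partition when the next split no longer fits;
      // an oversize split still gets a partition of its own.
      if (current + s > maxSplitBytes && current > 0) {
        partitions += 1
        current = 0
      }
      current += s
    }
    if (current > 0) partitions += 1
    partitions
  }

  // Proposed approach (sketch): after taking the next large split, top the
  // partition up with the smallest remaining splits from the tail.
  def twoEnded(sizes: Seq[Long]): Int = {
    val sorted = sizes.sorted(Ordering[Long].reverse).toArray
    var lo = 0
    var hi = sorted.length - 1
    var partitions = 0
    while (lo <= hi) {
      var current = sorted(lo)
      lo += 1
      while (lo <= hi && current + sorted(hi) <= maxSplitBytes) {
        current += sorted(hi)
        hi -= 1
      }
      partitions += 1
    }
    partitions
  }

  def main(args: Array[String]): Unit = {
    val sizes = (1L to 128L).map(_ << 20) // 1 MB, 2 MB, ..., 128 MB
    println(s"next-fit decreasing: ${nextFitDecreasing(sizes)} partitions") // 96
    println(s"two-ended packing:   ${twoEnded(sizes)} partitions")          // 65
  }
}
```

Under this simplified cost model the demo prints 96 vs. 65 partitions; the 82 vs. 64 figures quoted above come from the real code path with its open-cost accounting, but the shape of the improvement is the same.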