[SPARK-23249][SQL] Improved block merging logic for partitions
## What changes were proposed in this pull request?

Changed DataSourceScanExec so that, when grouping blocks together into partitions, it also checks the end of the sorted list of splits to fill out partitions more efficiently.
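The new merging loop can be sketched as a small stand-alone Python model (a sketch of the Scala change, not the Spark code itself; `sizes` stands in for the lengths of the sorted splits, and `open_cost` is folded into the running size the same way `openCostInBytes` is):

```python
def merge_splits(sizes, max_split_bytes, open_cost):
    """Group splits into partitions largest-first, then top each
    partition up with the smallest remaining splits from the back."""
    sizes = sorted(sizes, reverse=True)
    partitions = []
    front, back = 0, len(sizes) - 1
    while front <= back:
        # Seed the partition with the largest remaining split.
        current = [sizes[front]]
        current_size = sizes[front] + open_cost
        front += 1
        # Keep taking from the front while the next split still fits.
        while front <= back and current_size + sizes[front] <= max_split_bytes:
            current.append(sizes[front])
            current_size += sizes[front] + open_cost
            front += 1
        # Then fill the remaining room from the back of the list.
        while back > front and current_size + sizes[back] <= max_split_bytes:
            current.append(sizes[back])
            current_size += sizes[back] + open_cost
            back -= 1
        partitions.append(current)
    return partitions

print(merge_splits([8, 7, 6, 5], max_split_bytes=13, open_cost=0))
# → [[8, 5], [7, 6]]
```

With a pure front-only pass the same input would need three partitions ([8], [7, 6], [5]); topping up from the back pairs the largest split with the smallest one that still fits.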

## How was this patch tested?

Updated the existing test to reflect the new logic, which causes the number of partitions in that scenario to drop from 4 to 3.
An existing test already covers large non-splittable files: https://github.com/glentakahashi/spark/blob/c575977a5952bf50b605be8079c9be1e30f3bd36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala#L346

## Rationale

The current bin-packing method, Next Fit Decreasing (NFD), is sub-optimal in many cases: it produces extra partitions, an uneven distribution of block counts across partitions, and an uneven distribution of partition sizes.

As an example, 128 files of 1MB, 2MB, ..., 127MB, 128MB result in 82 partitions with the current algorithm but only 64 with this one. In the same example, the maximum number of blocks per partition is 13 under NFD but only 2 under this algorithm.
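This comparison can be reproduced with a small Python model of both strategies (a sketch, not the Spark code; it ignores openCostInBytes for simplicity, so the absolute counts come out slightly different from the numbers quoted above, but the relative behaviour is the same):

```python
def next_fit_decreasing(sizes, max_split, open_cost=0):
    """Current behaviour: walk the descending list once, closing the
    partition whenever the next split no longer fits."""
    partitions, current, size = [], [], 0
    for s in sorted(sizes, reverse=True):
        if size + s > max_split and current:
            partitions.append(current)
            current, size = [], 0
        current.append(s)
        size += s + open_cost
    if current:
        partitions.append(current)
    return partitions

def front_back_fill(sizes, max_split, open_cost=0):
    """Proposed behaviour: additionally top each partition up with the
    smallest remaining splits from the end of the sorted list."""
    sizes = sorted(sizes, reverse=True)
    partitions, front, back = [], 0, len(sizes) - 1
    while front <= back:
        current, size = [sizes[front]], sizes[front] + open_cost
        front += 1
        while front <= back and size + sizes[front] <= max_split:
            current.append(sizes[front])
            size += sizes[front] + open_cost
            front += 1
        while back > front and size + sizes[back] <= max_split:
            current.append(sizes[back])
            size += sizes[back] + open_cost
            back -= 1
        partitions.append(current)
    return partitions

sizes = list(range(1, 129))        # 1MB, 2MB, ..., 128MB
nfd = next_fit_decreasing(sizes, 128)
fbf = front_back_fill(sizes, 128)
print(len(nfd), len(fbf))                      # fewer partitions with the new logic
print(max(map(len, nfd)), max(map(len, fbf)))  # at most 2 blocks per partition
```

The new strategy pairs each large split with the smallest split that still fits, so no partition ever holds more than two blocks in this example, while NFD piles many small tail files into the same partition.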

More generally, in a simulation of 1000 runs using a 128MB block size, with 1-1000 files per run and normally distributed file sizes between 1MB and 500MB, this algorithm shows an approximately 5% reduction in partition counts and a large reduction in the standard deviation of blocks per partition.
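A scaled-down version of that simulation is easy to re-run. The sketch below makes several assumptions not fixed by the description: a 100-trial count, a normal distribution with mean 250 and stddev 120 clipped to 1-500MB, a 4MB open cost, and a `split_files` helper that chops oversized files into block-sized pieces the way Spark does before merging:

```python
import random
import statistics

MAX_SPLIT = 128   # MB, the block size used in the simulation
OPEN_COST = 4     # MB, assumed per-file open cost

def next_fit_decreasing(sizes):
    partitions, current, size = [], [], 0
    for s in sorted(sizes, reverse=True):
        if size + s > MAX_SPLIT and current:
            partitions.append(current)
            current, size = [], 0
        current.append(s)
        size += s + OPEN_COST
    if current:
        partitions.append(current)
    return partitions

def front_back_fill(sizes):
    sizes = sorted(sizes, reverse=True)
    partitions, front, back = [], 0, len(sizes) - 1
    while front <= back:
        current, size = [sizes[front]], sizes[front] + OPEN_COST
        front += 1
        while front <= back and size + sizes[front] <= MAX_SPLIT:
            current.append(sizes[front])
            size += sizes[front] + OPEN_COST
            front += 1
        while back > front and size + sizes[back] <= MAX_SPLIT:
            current.append(sizes[back])
            size += sizes[back] + OPEN_COST
            back -= 1
        partitions.append(current)
    return partitions

def split_files(file_sizes):
    # Mimic Spark chopping splittable files into block-sized splits.
    splits = []
    for s in file_sizes:
        while s > MAX_SPLIT:
            splits.append(MAX_SPLIT)
            s -= MAX_SPLIT
        splits.append(s)
    return splits

random.seed(42)
nfd_total = fbf_total = 0
nfd_blocks, fbf_blocks = [], []
for _ in range(100):
    n = random.randint(1, 1000)
    files = [min(500, max(1, random.gauss(250, 120))) for _ in range(n)]
    splits = split_files(files)
    nfd, fbf = next_fit_decreasing(splits), front_back_fill(splits)
    nfd_total += len(nfd)
    fbf_total += len(fbf)
    nfd_blocks += [len(p) for p in nfd]
    fbf_blocks += [len(p) for p in fbf]

print("partitions:", nfd_total, "->", fbf_total)
print("stddev of blocks/partition:",
      statistics.pstdev(nfd_blocks), "->", statistics.pstdev(fbf_blocks))
```

Because the new algorithm's front-only progress matches NFD's and back-filling only shrinks the remaining list, it can never produce more partitions than NFD on any single run.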

This algorithm also runs in O(n) time, as NFD does, and in every case produces results at least as good as NFD's.

Overall, the more even distribution of blocks across partitions, and the resulting reduction in partition counts, should yield a small but significant performance increase across the board.

Author: Glen Takahashi <[email protected]>

Closes #20372 from glentakahashi/feature/improved-block-merging.

(cherry picked from commit 8c21170)
Signed-off-by: Wenchen Fan <[email protected]>
Glen Takahashi authored and cloud-fan committed Jan 31, 2018
1 parent 33f17b2 commit f5f21e8
Showing 2 changed files with 27 additions and 17 deletions.
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

```diff
@@ -445,16 +445,29 @@ case class FileSourceScanExec(
       currentSize = 0
     }
 
-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
-        closePartition()
+    def addFile(file: PartitionedFile): Unit = {
+        currentFiles += file
+        currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      addFile(splitFiles(frontIndex))
+      frontIndex += 1
+      while (frontIndex <= backIndex &&
+          currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex &&
+          currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
       }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
-      currentFiles += file
+      closePartition()
     }
-    closePartition()
 
     new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
   }
```
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala

```diff
@@ -141,16 +141,17 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
     withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
       SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
-        // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
-        assert(partitions.size == 4, "when checking partitions")
-        assert(partitions(0).files.size == 1, "when checking partition 1")
+        // Files should be laid out [(file1, file6), (file2, file3), (file4, file5)]
+        assert(partitions.size == 3, "when checking partitions")
+        assert(partitions(0).files.size == 2, "when checking partition 1")
         assert(partitions(1).files.size == 2, "when checking partition 2")
         assert(partitions(2).files.size == 2, "when checking partition 3")
-        assert(partitions(3).files.size == 1, "when checking partition 4")
 
-        // First partition reads (file1)
+        // First partition reads (file1, file6)
         assert(partitions(0).files(0).start == 0)
         assert(partitions(0).files(0).length == 2)
+        assert(partitions(0).files(1).start == 0)
+        assert(partitions(0).files(1).length == 1)
 
         // Second partition reads (file2, file3)
         assert(partitions(1).files(0).start == 0)
@@ -163,10 +164,6 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
         assert(partitions(2).files(0).length == 1)
         assert(partitions(2).files(1).start == 0)
         assert(partitions(2).files(1).length == 1)
-
-        // Final partition reads (file6)
-        assert(partitions(3).files(0).start == 0)
-        assert(partitions(3).files(0).length == 1)
       }
 
       checkPartitionSchema(StructType(Nil))
```
