From c575977a5952bf50b605be8079c9be1e30f3bd36 Mon Sep 17 00:00:00 2001 From: Glen Takahashi Date: Tue, 23 Jan 2018 18:22:34 -0500 Subject: [PATCH 1/4] Improved block merging logic for partitions --- .../sql/execution/DataSourceScanExec.scala | 25 +++++++++++++------ .../datasources/FileSourceStrategySuite.scala | 11 +++----- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 7c7d79c2bbd7c..3d58218eb44a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -445,16 +445,25 @@ case class FileSourceScanExec( currentSize = 0 } - // Assign files to partitions using "Next Fit Decreasing" - splitFiles.foreach { file => - if (currentSize + file.length > maxSplitBytes) { + def addFile(file: PartitionedFile): Unit = { + currentFiles += file + currentSize += file.length + openCostInBytes + } + + var frontIndex = 0 + var backIndex = splitFiles.length - 1 + + while (frontIndex <= backIndex) { + while (frontIndex <= backIndex && currentSize + splitFiles(frontIndex).length <= maxSplitBytes) { + addFile(splitFiles(frontIndex)) + frontIndex += 1 + } + while (backIndex > frontIndex && currentSize + splitFiles(backIndex).length <= maxSplitBytes) { + addFile(splitFiles(backIndex)) + backIndex -= 1 + } closePartition() - } - // Add the given file to the current partition. - currentSize += file.length + openCostInBytes - currentFiles += file } - closePartition() new FileScanRDD(fsRelation.sparkSession, readFile, partitions) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index c1d61b843d899..4c9551060e0d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -142,15 +142,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") { checkScan(table.select('c1)) { partitions => // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)] - assert(partitions.size == 4, "when checking partitions") - assert(partitions(0).files.size == 1, "when checking partition 1") + assert(partitions.size == 3, "when checking partitions") + assert(partitions(0).files.size == 2, "when checking partition 1") assert(partitions(1).files.size == 2, "when checking partition 2") assert(partitions(2).files.size == 2, "when checking partition 3") - assert(partitions(3).files.size == 1, "when checking partition 4") // First partition reads (file1) assert(partitions(0).files(0).start == 0) assert(partitions(0).files(0).length == 2) + assert(partitions(0).files(1).start == 0) + assert(partitions(0).files(1).length == 1) // Second partition reads (file2, file3) assert(partitions(1).files(0).start == 0) @@ -163,10 +164,6 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi assert(partitions(2).files(0).length == 1) assert(partitions(2).files(1).start == 0) assert(partitions(2).files(1).length == 1) - - // Final partition reads (file6) - assert(partitions(3).files(0).start == 0) - assert(partitions(3).files(0).length == 1) } checkPartitionSchema(StructType(Nil)) From af6bba6205330c4f803c964f4a476ffd074b30be Mon Sep 17 00:00:00 2001 From: Glen Takahashi Date: Tue, 23 Jan 2018 19:51:35 -0500 Subject: [PATCH 2/4] Update test comments --- .../sql/execution/datasources/FileSourceStrategySuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 4c9551060e0d5..bfccc9335b361 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -141,13 +141,13 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4", SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") { checkScan(table.select('c1)) { partitions => - // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)] + // Files should be laid out [(file1, file6), (file2, file3), (file4, file5)] assert(partitions.size == 3, "when checking partitions") assert(partitions(0).files.size == 2, "when checking partition 1") assert(partitions(1).files.size == 2, "when checking partition 2") assert(partitions(2).files.size == 2, "when checking partition 3") - // First partition reads (file1) + // First partition reads (file1, file6) assert(partitions(0).files(0).start == 0) assert(partitions(0).files(0).length == 2) assert(partitions(0).files(1).start == 0) From ef04de9766584b0a8ab13f290c9850e111144570 Mon Sep 17 00:00:00 2001 From: Glen Takahashi Date: Tue, 23 Jan 2018 19:56:41 -0500 Subject: [PATCH 3/4] always add current file in case it's not splittable and > maxSplitBytes --- .../org/apache/spark/sql/execution/DataSourceScanExec.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 3d58218eb44a5..6007168d28820 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -454,6 +454,8 @@ case class FileSourceScanExec( var backIndex = splitFiles.length - 1 while (frontIndex <= backIndex) { + addFile(splitFiles(frontIndex)) + frontIndex += 1 while (frontIndex <= backIndex && currentSize + splitFiles(frontIndex).length <= maxSplitBytes) { addFile(splitFiles(frontIndex)) frontIndex += 1 From 57722cfaa035dc63da21c6bd442d995b8a0bcf0a Mon Sep 17 00:00:00 2001 From: Glen Takahashi Date: Wed, 24 Jan 2018 10:39:23 -0500 Subject: [PATCH 4/4] fix style --- .../sql/execution/DataSourceScanExec.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 6007168d28820..aaceb5f157577 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -454,17 +454,19 @@ case class FileSourceScanExec( var backIndex = splitFiles.length - 1 while (frontIndex <= backIndex) { + addFile(splitFiles(frontIndex)) + frontIndex += 1 + while (frontIndex <= backIndex && + currentSize + splitFiles(frontIndex).length <= maxSplitBytes) { addFile(splitFiles(frontIndex)) frontIndex += 1 - while (frontIndex <= backIndex && currentSize + splitFiles(frontIndex).length <= maxSplitBytes) { - addFile(splitFiles(frontIndex)) - frontIndex += 1 - } - while (backIndex > frontIndex && currentSize + splitFiles(backIndex).length <= maxSplitBytes) { - addFile(splitFiles(backIndex)) - backIndex -= 1 - } - closePartition() + } + while (backIndex > frontIndex && + currentSize + splitFiles(backIndex).length <= maxSplitBytes) { + addFile(splitFiles(backIndex)) + backIndex -= 1 + } + closePartition() } new FileScanRDD(fsRelation.sparkSession, readFile, partitions)