Support for reading multiple sorted files per bucket #731

Merged (17 commits, Feb 19, 2021)
@@ -1514,6 +1514,15 @@ object SQLConf {
.stringConf
.createWithDefault("")

val BUCKET_SORTED_SCAN_ENABLED =
buildConf("spark.sql.sources.bucketing.sortedScan.enabled")
.doc("When true, the bucketed table scan will respect table sort columns, " +
"and read multiple sorted files per bucket in a sort-merge way to preserve ordering. " +
"Note: tasks might have more memory footprint and OOM with vectorized reader, " +
"because multiple rows or columnar batches from different files will be read at same time.")
.booleanConf
.createWithDefault(false)

object PartitionOverwriteMode extends Enumeration {
val STATIC, DYNAMIC = Value
}
@@ -2192,6 +2201,8 @@ class SQLConf extends Serializable with Logging {
def setCommandRejectsSparkCoreConfs: Boolean =
getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS)

def bucketSortedScanEnabled: Boolean = getConf(SQLConf.BUCKET_SORTED_SCAN_ENABLED)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
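To make the new knob concrete, a minimal usage sketch (not part of this diff): it assumes a spark-shell style SparkSession named spark, the table name and columns are made up, and only the existing DataFrameWriter bucketBy/sortBy API plus the new configuration key are used.

// Hypothetical example: write a bucketed and sorted table, then enable the new
// flag so a scan of that table reports its sort order even when a bucket has
// been written as several files.
spark.range(0, 1000000)
  .selectExpr("id AS key", "id % 7 AS value")
  .write
  .bucketBy(8, "key")
  .sortBy("key")
  .saveAsTable("bucketed_sorted_tbl")

spark.conf.set("spark.sql.sources.bucketing.sortedScan.enabled", "true")

// With the flag on, a sort-merge join on "key" should be able to skip the extra
// Sort on the scan side; with the default (false), the sort order is only
// reported when each bucket holds at most one file.
val joined = spark.table("bucketed_sorted_tbl")
  .join(spark.table("bucketed_sorted_tbl"), "key")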
@@ -17,17 +17,17 @@

package org.apache.spark.sql.execution

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.mutable.HashMap

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
import org.apache.hadoop.fs.Path

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
@@ -121,6 +121,14 @@ case class RowDataSourceScanExec(
tableIdentifier = None)
}

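// How a FileSourceScanExec reads bucketed files: RegularMode scans files as-is,
// while SortedBucketMode merges the locally sorted files of each bucket with the
// given row ordering so the overall output stays sorted.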
sealed abstract class ScanMode

case object RegularMode extends ScanMode

case class SortedBucketMode(sortOrdering: Ordering[InternalRow]) extends ScanMode {
override def toString: String = "SortedBucketMode"
}

/**
* Physical plan node for scanning data from HadoopFsRelations.
*
@@ -148,6 +156,14 @@ case class FileSourceScanExec(
relation.fileFormat.supportBatch(relation.sparkSession, schema)
}

private lazy val scanMode: ScanMode =
if (conf.bucketSortedScanEnabled && outputOrdering.nonEmpty && !singleFilePartitions) {
val sortOrdering = new LazilyGeneratedOrdering(outputOrdering, output)
SortedBucketMode(sortOrdering)
} else {
RegularMode
}

private lazy val needsUnsafeRowConversion: Boolean = {
if (relation.fileFormat.isInstanceOf[ParquetSource]) {
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
@@ -185,6 +201,13 @@ case class FileSourceScanExec(
ret
}

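  // True when every bucket among the selected partitions contains at most one
  // file, in which case each RDD partition is already sorted without merging.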
private lazy val singleFilePartitions: Boolean = {
val files = selectedPartitions.flatMap(partition => partition.files)
val bucketToFilesGrouping =
files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
bucketToFilesGrouping.forall(p => p._2.length <= 1)
}

override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
relation.bucketSpec
@@ -225,15 +248,12 @@ case class FileSourceScanExec(
// In case of bucketing, its possible to have multiple files belonging to the
// same bucket in a given relation. Each of these files are locally sorted
// but those files combined together are not globally sorted. Given that,
// the RDD partition will not be sorted even if the relation has sort columns set
// Current solution is to check if all the buckets have a single file in it

val files = selectedPartitions.flatMap(partition => partition.files)
val bucketToFilesGrouping =
files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)

if (singleFilePartitions) {
// the RDD partition will not be sorted even if the relation has sort columns set.
//
// 1. With configuration "spark.sql.sources.bucketing.sortedScan.enabled" being enabled,
// output ordering is preserved by reading those sorted files in sort-merge way.
// 2. If not, output ordering is preserved if each bucket has no more than one file.
if (conf.bucketSortedScanEnabled || singleFilePartitions) {
// TODO Currently Spark does not support writing columns sorting in descending order
// so using Ascending order. This can be fixed in future
sortColumns.map(attribute => SortOrder(attribute, Ascending))
@@ -269,7 +289,8 @@ case class FileSourceScanExec(
"PartitionFilters" -> seqToString(partitionFilters),
"PushedFilters" -> seqToString(pushedDownFilters),
"DataFilters" -> seqToString(dataFilters),
"Location" -> locationDesc)
"Location" -> locationDesc,
"ScanMode" -> scanMode.toString)
val withOptPartitionCount =
relation.partitionSchemaOption.map { _ =>
metadata + ("PartitionCount" -> selectedPartitions.size.toString)
@@ -408,7 +429,12 @@ case class FileSourceScanExec(
FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Nil))
}

new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
scanMode match {
case SortedBucketMode(sortOrdering) =>
new FileSortedMergeScanRDD(fsRelation.sparkSession, readFile, filePartitions, sortOrdering)
case RegularMode =>
new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
}
}

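The new FileSortedMergeScanRDD is referenced above but its implementation is not part of this hunk. As a rough illustration of the sort-merge reading described in the comment block, here is a self-contained sketch of a k-way merge over locally sorted iterators; mergeSorted is a hypothetical helper, and plain Scala values stand in for InternalRow and the LazilyGeneratedOrdering carried by SortedBucketMode.

import scala.collection.mutable

// Illustrative only: combine several locally sorted iterators into one globally
// sorted iterator, the way a sorted bucket scan would combine the sorted files
// belonging to a single bucket.
def mergeSorted[T](iters: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
  // scala.collection.mutable.PriorityQueue is a max-heap, so reverse the
  // ordering to pop the smallest pending element first.
  val heap = mutable.PriorityQueue.empty[(T, Iterator[T])](
    Ordering.by[(T, Iterator[T]), T](_._1).reverse)
  iters.foreach { it => if (it.hasNext) heap.enqueue((it.next(), it)) }
  new Iterator[T] {
    override def hasNext: Boolean = heap.nonEmpty
    override def next(): T = {
      val (value, it) = heap.dequeue()
      // Refill the heap from the iterator that produced the smallest element.
      if (it.hasNext) heap.enqueue((it.next(), it))
      value
    }
  }
}

// Three sorted "files" merged into one sorted stream:
// mergeSorted(Seq(Iterator(1, 4, 7), Iterator(2, 5, 8), Iterator(3, 6, 9))).toList
// => List(1, 2, 3, 4, 5, 6, 7, 8, 9)

Holding one pending row (or columnar batch) per open file is also where the extra memory footprint mentioned in the configuration's doc string comes from.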