[SPARK-19256][SQL] Hive bucketing support #19001
Changes from all commits
6ebd852
70feeed
669069c
7460770
3c367a0
d37eb8b
All hunks below are from FileFormatWriter.scala, as the `object FileFormatWriter extends Logging` context in each hunk header shows.
@@ -34,12 +34,11 @@ import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, _}
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
-import org.apache.spark.sql.execution.{SortExec, SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -109,7 +108,7 @@ object FileFormatWriter extends Logging {
       outputSpec: OutputSpec,
       hadoopConf: Configuration,
       partitionColumns: Seq[Attribute],
-      bucketSpec: Option[BucketSpec],
+      bucketIdExpression: Option[Expression],
       statsTrackers: Seq[WriteJobStatsTracker],
       options: Map[String, String])
     : Set[String] = {
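For context, the bucketed output that flows into this `write` entry point typically originates from `DataFrameWriter.bucketBy`/`sortBy`. A minimal usage sketch (table and column names are illustrative, not taken from the PR):

```scala
import org.apache.spark.sql.SparkSession

object BucketedWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("bucketed-write-example").getOrCreate()

    spark.range(0, 1000)
      .selectExpr("id AS key", "id % 10 AS value")
      .write
      .bucketBy(8, "key")   // the bucket spec that eventually becomes a bucket id expression
      .sortBy("value")      // per-bucket sort columns
      .saveAsTable("bucketed_example")

    spark.stop()
  }
}
```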
@@ -122,17 +121,6 @@ object FileFormatWriter extends Logging {
     val partitionSet = AttributeSet(partitionColumns)
     val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains)

-    val bucketIdExpression = bucketSpec.map { spec =>
-      val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
-      // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
-      // guarantee the data distribution is same between shuffle and bucketed data source, which
-      // enables us to only shuffle one side when join a bucketed table and a normal one.
-      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
-    }
-    val sortColumns = bucketSpec.toSeq.flatMap {
-      spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get)
-    }
-
     val caseInsensitiveOptions = CaseInsensitiveMap(options)

     // Note: prepareWrite has side effect. It sets "job".
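Since `write` now takes the bucket id expression directly, the derivation deleted above has to happen on the caller side. A minimal sketch of such a helper, reusing exactly the removed logic (the object and method names are hypothetical):

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning

object BucketIdExpressionHelper {
  def toBucketIdExpression(
      bucketSpec: Option[BucketSpec],
      dataColumns: Seq[Attribute]): Option[Expression] = {
    bucketSpec.map { spec =>
      val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
      // `HashPartitioning.partitionIdExpression` keeps the on-disk bucket layout consistent
      // with a shuffle over the same columns, so a join against a bucketed table can avoid
      // shuffling the bucketed side.
      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
    }
  }
}
```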
@@ -156,40 +144,14 @@ object FileFormatWriter extends Logging {
       statsTrackers = statsTrackers
     )

-    // We should first sort by partition columns, then bucket id, and finally sorting columns.
-    val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
-    // the sort order doesn't matter
-    val actualOrdering = plan.outputOrdering.map(_.child)
-    val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
-      false
-    } else {
-      requiredOrdering.zip(actualOrdering).forall {
-        case (requiredOrder, childOutputOrder) =>
-          requiredOrder.semanticEquals(childOutputOrder)
-      }
-    }
-
     SQLExecution.checkSQLExecutionId(sparkSession)

     // This call shouldn't be put into the `try` block below because it only initializes and
     // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
     committer.setupJob(job)

     try {
-      val rdd = if (orderingMatched) {
-        plan.execute()
-      } else {
-        // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
-        // the physical plan may have different attribute ids due to optimizer removing some
-        // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
-        val orderingExpr = requiredOrdering
-          .map(SortOrder(_, Ascending))
-          .map(BindReferences.bindReference(_, outputSpec.outputColumns))
-        SortExec(
-          orderingExpr,
-          global = false,
-          child = plan).execute()
-      }
+      val rdd = plan.execute()
       val ret = new Array[WriteTaskResult](rdd.partitions.length)
       sparkSession.sparkContext.runJob(
         rdd,

Review comment (on the removed `val requiredOrdering` line): Can we send an individual PR to do this? i.e. do the sorting via …

Review comment (on the removed `SortExec(` call): Removing `SortExec` here and adding it in the `EnsureRequirements` strategy will impact many other `DataWritingCommand`s that depend on `FileFormatWriter`, such as `CreateDataSourceTableAsSelectCommand`. Fixing that requires changes in those `DataWritingCommand` implementations so that they export `requiredDistribution` and `requiredOrdering`.
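The review discussion above points at doing the sort through the planner instead of inside `FileFormatWriter`. A sketch of that direction (not code from this PR; the node name `WriteNodeExec` is hypothetical): a physical write node declares the ordering it needs, and `EnsureRequirements` then inserts the `SortExec` that used to be added by hand here.

```scala
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, SortOrder}
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical physical write node with a single child plan.
abstract class WriteNodeExec(
    partitionColumns: Seq[Attribute],
    bucketIdExpression: Option[Expression],
    sortColumns: Seq[Attribute]) extends SparkPlan {

  // The same ordering the removed code built by hand: partition columns first,
  // then the bucket id, then the per-bucket sort columns (one entry per child).
  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
    Seq((partitionColumns ++ bucketIdExpression ++ sortColumns).map(SortOrder(_, Ascending)))
}
```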
@@ -202,7 +164,7 @@ object FileFormatWriter extends Logging {
             committer,
             iterator = iter)
         },
-        0 until rdd.partitions.length,
+        rdd.partitions.indices,
         (index, res: WriteTaskResult) => {
           committer.onTaskCommit(res.commitMsg)
           ret(index) = res
@@ -521,18 +483,18 @@ object FileFormatWriter extends Logging {
       var recordsInFile: Long = 0L
       var fileCounter = 0
       val updatedPartitions = mutable.Set[String]()
-      var currentPartionValues: Option[UnsafeRow] = None
+      var currentPartitionValues: Option[UnsafeRow] = None
       var currentBucketId: Option[Int] = None

       for (row <- iter) {
         val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(row)) else None
         val nextBucketId = if (isBucketed) Some(getBucketId(row)) else None

-        if (currentPartionValues != nextPartitionValues || currentBucketId != nextBucketId) {
+        if (currentPartitionValues != nextPartitionValues || currentBucketId != nextBucketId) {
           // See a new partition or bucket - write to a new partition dir (or a new bucket file).
-          if (isPartitioned && currentPartionValues != nextPartitionValues) {
-            currentPartionValues = Some(nextPartitionValues.get.copy())
-            statsTrackers.foreach(_.newPartition(currentPartionValues.get))
+          if (isPartitioned && currentPartitionValues != nextPartitionValues) {
+            currentPartitionValues = Some(nextPartitionValues.get.copy())
+            statsTrackers.foreach(_.newPartition(currentPartitionValues.get))
           }
           if (isBucketed) {
             currentBucketId = nextBucketId
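The loop above switches writers whenever `getBucketId(row)` changes. A sketch of how that per-row bucket id can be computed from the bucket id expression (the helper and its parameters are assumptions for illustration, not shown in these hunks):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnsafeProjection}

object BucketIdReader {
  // Returns a function that evaluates the bucket id expression against each row
  // and reads the resulting integer bucket id.
  def makeGetBucketId(
      bucketIdExpression: Expression,
      allColumns: Seq[Attribute]): InternalRow => Int = {
    val proj = UnsafeProjection.create(Seq(bucketIdExpression), allColumns)
    row => proj(row).getInt(0)
  }
}
```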
@@ -543,7 +505,7 @@ object FileFormatWriter extends Logging {
             fileCounter = 0

             releaseResources()
-            newOutputWriter(currentPartionValues, currentBucketId, fileCounter, updatedPartitions)
+            newOutputWriter(currentPartitionValues, currentBucketId, fileCounter, updatedPartitions)
           } else if (desc.maxRecordsPerFile > 0 &&
               recordsInFile >= desc.maxRecordsPerFile) {
             // Exceeded the threshold in terms of the number of records per file.
@@ -554,7 +516,7 @@ object FileFormatWriter extends Logging {
               s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER")

             releaseResources()
-            newOutputWriter(currentPartionValues, currentBucketId, fileCounter, updatedPartitions)
+            newOutputWriter(currentPartitionValues, currentBucketId, fileCounter, updatedPartitions)
           }
           val outputRow = getOutputRow(row)
           currentWriter.write(outputRow)
Review comment: `ExecutedCommandExec` doesn't call it.