From 8c4f7949df6cc944f4be96d296a07cedceb18ab3 Mon Sep 17 00:00:00 2001 From: jinxing Date: Sat, 12 Aug 2017 21:05:25 +0800 Subject: [PATCH 1/2] Support writing data into hive bucket table. --- .../io/HadoopMapReduceCommitProtocol.scala | 33 +++++++++- .../catalog/ExternalCatalogUtils.scala | 4 +- .../sql/catalyst/catalog/interface.scala | 11 +++- .../plans/physical/partitioning.scala | 22 +++++-- .../sql/catalyst/DistributionSuite.scala | 25 ++++++++ .../sql/catalyst/PartitioningSuite.scala | 8 +++ .../sql/catalyst/trees/TreeNodeSuite.scala | 5 +- .../spark/sql/execution/SparkSqlParser.scala | 2 +- .../sql/execution/command/commands.scala | 8 +++ .../datasources/FileFormatWriter.scala | 60 ++++++++++++++++++- .../exchange/EnsureRequirements.scala | 3 +- .../execution/exchange/ShuffleExchange.scala | 2 +- .../joins/ReorderJoinPredicates.scala | 4 +- .../spark/sql/sources/BucketedReadSuite.scala | 2 +- .../spark/sql/hive/HiveExternalCatalog.scala | 24 ++++++-- .../hive/execution/InsertIntoHiveTable.scala | 58 +++++++++--------- .../sql/hive/InsertIntoHiveTableSuite.scala | 49 ++++++++------- 17 files changed, 238 insertions(+), 82 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 22e26799138ba..829d98c097d1d 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -57,6 +57,8 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) */ private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId) + private var fileNameWithPartitionId: Boolean = true + protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { val format = context.getOutputFormatClass.newInstance() // If OutputFormat is Configurable, we should set conf to it. @@ -103,7 +105,11 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) // Note that %05d does not truncate the split number, so if we have more than 100000 tasks, // the file name is fine and won't overflow. 
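// Illustrative sketch (not part of the patch): what the two naming schemes below produce for a
// hypothetical task. The split, jobId and ext values are examples only; ext carries the bucket
// id plus the writer's file extension.
object FileNameSketch {
  val exampleSplit = 3
  val exampleJobId = "20170812210525"
  val exampleExt = "_00002.c000"
  val withTaskId = f"part-$exampleSplit%05d-$exampleJobId$exampleExt"  // part-00003-20170812210525_00002.c000
  val withoutTaskId = f"part-$exampleJobId$exampleExt"                 // part-20170812210525_00002.c000
}
// Dropping the task id means the bucket id embedded in ext determines the file name, which is
// the one-file-per-bucket layout this patch aims at for Hive bucketed tables.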
val split = taskContext.getTaskAttemptID.getTaskID.getId - f"part-$split%05d-$jobId$ext" + if (fileNameWithPartitionId) { + f"part-$split%05d-$jobId$ext" + } else { + f"part-$jobId$ext" + } } override def setupJob(jobContext: JobContext): Unit = { @@ -118,6 +124,8 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) jobContext.getConfiguration.set("mapreduce.task.attempt.id", taskAttemptId.toString) jobContext.getConfiguration.setBoolean("mapreduce.task.ismap", true) jobContext.getConfiguration.setInt("mapreduce.task.partition", 0) + fileNameWithPartitionId = + jobContext.getConfiguration.getBoolean("spark.sql.bucket.fileNameWithPartitionId", true) val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId) committer = setupCommitter(taskAttemptContext) @@ -126,7 +134,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = { committer.commitJob(jobContext) - val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]]) + val filesToMove = taskCommits.map(_.obj.asInstanceOf[HadoopMRTaskCommitStatus].absPathFiles) .foldLeft(Map[String, String]())(_ ++ _) logDebug(s"Committing files staged for absolute locations $filesToMove") val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration) @@ -152,7 +160,24 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) val attemptId = taskContext.getTaskAttemptID SparkHadoopMapRedUtil.commitTask( committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId) - new TaskCommitMessage(addedAbsPathFiles.toMap) + val p1 = committer match { + case fileOutputCommitter: FileOutputCommitter => + val committedPath = fileOutputCommitter.getCommittedTaskPath(taskContext) + if (committedPath != null) { + committedPath.toString + } else { + null + } + case _ => path + } + val p2 = if (path == null) { + null + } else { + absPathStagingDir.toString + } + + new TaskCommitMessage( + HadoopMRTaskCommitStatus(addedAbsPathFiles.toMap, Seq(p1, p2).filter(_ != null))) } override def abortTask(taskContext: TaskAttemptContext): Unit = { @@ -164,3 +189,5 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) } } } + +case class HadoopMRTaskCommitStatus(absPathFiles: Map[String, String], commitPaths: Seq[String]) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala index 50f32e81d997d..fa164be1116a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala @@ -202,14 +202,14 @@ object CatalogUtils { tableCols: Seq[String], bucketSpec: BucketSpec, resolver: Resolver): BucketSpec = { - val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec + val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames, _) = bucketSpec val normalizedBucketCols = bucketColumnNames.map { colName => normalizeColumnName(tableName, tableCols, colName, "bucket", resolver) } val normalizedSortCols = sortColumnNames.map { colName => normalizeColumnName(tableName, tableCols, colName, "sort", resolver) } - BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols) + BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols, bucketSpec.isHiveBucket) } /** diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 5a8c4e7610fff..643b41f2bb01f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -155,11 +155,13 @@ case class CatalogTablePartition( * @param numBuckets number of buckets. * @param bucketColumnNames the names of the columns that used to generate the bucket id. * @param sortColumnNames the names of the columns that used to sort data in each bucket. + * @param isHiveBucket if the spec is for Hive bucket table. */ case class BucketSpec( numBuckets: Int, bucketColumnNames: Seq[String], - sortColumnNames: Seq[String]) { + sortColumnNames: Seq[String], + isHiveBucket: Boolean = false) { if (numBuckets <= 0 || numBuckets >= 100000) { throw new AnalysisException( s"Number of buckets should be greater than 0 but less than 100000. Got `$numBuckets`") @@ -172,7 +174,12 @@ case class BucketSpec( } else { "" } - s"$numBuckets buckets, $bucketString$sortString" + val isHiveBucketString = if (isHiveBucket) { + ", it is hive bucket." + } else { + ", it is not hive bucket." + } + s"$numBuckets buckets, $bucketString$sortString$isHiveBucketString" } def toLinkedHashMap: mutable.LinkedHashMap[String, String] = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 51d78dd1233fe..be9ed6b0514f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -49,7 +49,8 @@ case object AllTuples extends Distribution * can mean such tuples are either co-located in the same partition or they will be contiguous * within a single partition. */ -case class ClusteredDistribution(clustering: Seq[Expression]) extends Distribution { +case class ClusteredDistribution(clustering: Seq[Expression], clustersOpt: Option[Int] = None, + useHiveHash: Boolean = false) extends Distribution { require( clustering != Nil, "The clustering expressions of a ClusteredDistribution should not be Nil. " + @@ -234,7 +235,8 @@ case object SinglePartition extends Partitioning { * of `expressions`. All rows where `expressions` evaluate to the same values are guaranteed to be * in the same partition. 
*/ -case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) +case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int, + useHiveHash: Boolean = false) extends Expression with Partitioning with Unevaluable { override def children: Seq[Expression] = expressions @@ -243,7 +245,8 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) override def satisfies(required: Distribution): Boolean = required match { case UnspecifiedDistribution => true - case ClusteredDistribution(requiredClustering) => + case ClusteredDistribution(requiredClustering, clustersOpt, clusteredByHiveHash) + if (clustersOpt.forall(_ == numPartitions) && clusteredByHiveHash == useHiveHash) => expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) case _ => false } @@ -260,9 +263,15 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) /** * Returns an expression that will produce a valid partition ID(i.e. non-negative and is less - * than numPartitions) based on hashing expressions. + * than numPartitions) based on hashing expressions. `HiveHash` will be returned when + * `useHiveHash` is true. This is for compatibility when insert data into Hive bucket table. */ - def partitionIdExpression: Expression = Pmod(new Murmur3Hash(expressions), Literal(numPartitions)) + def partitionIdExpression: Expression = + if (useHiveHash) { + Pmod(new HiveHash(expressions), Literal(numPartitions)) + } else { + Pmod(new Murmur3Hash(expressions), Literal(numPartitions)) + } } /** @@ -289,7 +298,8 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) case OrderedDistribution(requiredOrdering) => val minSize = Seq(requiredOrdering.size, ordering.size).min requiredOrdering.take(minSize) == ordering.take(minSize) - case ClusteredDistribution(requiredClustering) => + case ClusteredDistribution(requiredClustering, clustersOpt, _) + if clustersOpt.forall(_ == numPartitions) => ordering.map(_.child).forall(x => requiredClustering.exists(_.semanticEquals(x))) case _ => false } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala index b47b8adfe5d55..08cb62c43c4f9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala @@ -79,6 +79,31 @@ class DistributionSuite extends SparkFunSuite { ClusteredDistribution(Seq('d, 'e)), false) + checkSatisfied( + HashPartitioning(Seq('a, 'b), 10), + ClusteredDistribution(Seq('a, 'b), Some(10)), + true) + + checkSatisfied( + HashPartitioning(Seq('a, 'b), 10), + ClusteredDistribution(Seq('a, 'b), Some(5)), + false) + + checkSatisfied( + HashPartitioning(Seq('a, 'b), 10, useHiveHash = true), + ClusteredDistribution(Seq('a, 'b), Some(10), useHiveHash = true), + true) + + checkSatisfied( + HashPartitioning(Seq('a, 'b), 10, useHiveHash = false), + ClusteredDistribution(Seq('a, 'b), Some(10), useHiveHash = true), + false) + + checkSatisfied( + HashPartitioning(Seq('a, 'b), 10, useHiveHash = true), + ClusteredDistribution(Seq('a, 'b), Some(10), useHiveHash = false), + false) + checkSatisfied( HashPartitioning(Seq('a, 'b, 'c), 10), AllTuples, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/PartitioningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/PartitioningSuite.scala index 
5b802ccc637dd..b8db606009385 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/PartitioningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/PartitioningSuite.scala @@ -52,4 +52,12 @@ class PartitioningSuite extends SparkFunSuite { assert(partitioningA.guarantees(partitioningA)) assert(partitioningA.compatibleWith(partitioningA)) } + + test("HashPartitioning compatibility should be sensitive to whether Hive hash is used.") { + val expressions = Seq(Literal(2), Literal(3)) + val partitioningA = HashPartitioning(expressions, 100, useHiveHash = false) + val partitioningB = HashPartitioning(expressions, 100, useHiveHash = true) + assert(!partitioningA.compatibleWith(partitioningB)) + assert(!partitioningA.guarantees(partitioningB)) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala index 84d0ba7bef642..345b6d5632905 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala @@ -427,12 +427,13 @@ class TreeNodeSuite extends SparkFunSuite { // Converts BucketSpec to JSON assertJSON( - BucketSpec(1, Seq("bucket"), Seq("sort")), + BucketSpec(1, Seq("bucket"), Seq("sort"), true), JObject( "product-class" -> classOf[BucketSpec].getName, "numBuckets" -> 1, "bucketColumnNames" -> "[bucket]", - "sortColumnNames" -> "[sort]")) + "sortColumnNames" -> "[sort]", + "isHiveBucket" -> true)) // Converts WindowFrame to JSON assertJSON( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index d4414b6f78ca2..1dee181da128d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1093,7 +1093,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil) val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty) val selectQuery = Option(ctx.query).map(plan) - val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec) + val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec).map(_.copy(isHiveBucket = true)) // Note: Hive requires partition columns to be distinct from the schema, so we need // to include the partition columns here explicitly diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 7cd4baef89e75..801571e448138 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.{logical, QueryPlan} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.physical.Distribution import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.debug._ import org.apache.spark.sql.execution.metric.SQLMetric @@ -50,6 +51,8 @@ trait 
RunnableCommand extends logical.Command { def run(sparkSession: SparkSession): Seq[Row] = { throw new NotImplementedError } + + def requiredDestribution: Option[Seq[Distribution]] = None } /** @@ -97,6 +100,11 @@ case class ExecutedCommandExec(cmd: RunnableCommand, children: Seq[SparkPlan]) e protected override def doExecute(): RDD[InternalRow] = { sqlContext.sparkContext.parallelize(sideEffectResult, 1) } + + /** Specifies any partition requirements on the input data for this operator. */ + override def requiredChildDistribution: Seq[Distribution] = { + cmd.requiredDestribution.getOrElse(super.requiredChildDistribution) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 514969715091a..3d121c9592fc8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -22,14 +22,14 @@ import java.util.{Date, UUID} import scala.collection.mutable import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark._ import org.apache.spark.internal.Logging -import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMRTaskCommitStatus, SparkHadoopWriterUtils} import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.sql.SparkSession @@ -66,6 +66,7 @@ object FileFormatWriter extends Logging { val allColumns: Seq[Attribute], val dataColumns: Seq[Attribute], val partitionColumns: Seq[Attribute], + val numBuckets: Option[Int], val bucketIdExpression: Option[Expression], val path: String, val customPartitionLocations: Map[TablePartitionSpec, String], @@ -126,7 +127,7 @@ object FileFormatWriter extends Logging { // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can // guarantee the data distribution is same between shuffle and bucketed data source, which // enables us to only shuffle one side when join a bucketed table and a normal one. 
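// Illustrative sketch (not part of the patch): the two bucket id expressions, side by side, for
// a single int bucket column bound at ordinal 0 and numBuckets = 4; the ordinal and bucket count
// are examples only.
import org.apache.spark.sql.catalyst.expressions.{BoundReference, HiveHash, Literal, Murmur3Hash, Pmod}
import org.apache.spark.sql.types.IntegerType

val bucketCol = BoundReference(0, IntegerType, nullable = false)
val hiveBucketId = Pmod(new HiveHash(Seq(bucketCol)), Literal(4))      // used when isHiveBucket is true
val nativeBucketId = Pmod(new Murmur3Hash(Seq(bucketCol)), Literal(4)) // Spark's default bucketing
// The two hashes generally disagree, which is why the writer below switches to HiveHash for a
// table that Hive itself is expected to read as bucketed.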
- HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression + HashPartitioning(bucketColumns, spec.numBuckets, spec.isHiveBucket).partitionIdExpression } val sortColumns = bucketSpec.toSeq.flatMap { spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get) @@ -145,6 +146,7 @@ object FileFormatWriter extends Logging { allColumns = allColumns, dataColumns = dataColumns, partitionColumns = partitionColumns, + numBuckets = bucketSpec.map(_.numBuckets), bucketIdExpression = bucketIdExpression, path = outputSpec.outputPath, customPartitionLocations = outputSpec.customPartitionLocations, @@ -170,6 +172,12 @@ object FileFormatWriter extends Logging { SQLExecution.checkSQLExecutionId(sparkSession) + bucketSpec match { + case Some(spec) if spec.isHiveBucket => + job.getConfiguration.setBoolean("spark.sql.bucket.fileNameWithPartitionId", false) + case _ => // no-op + + } // This call shouldn't be put into the `try` block below because it only initializes and // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. committer.setupJob(job) @@ -184,6 +192,8 @@ object FileFormatWriter extends Logging { child = plan).execute() } val ret = new Array[WriteTaskResult](rdd.partitions.length) + val updatedPartitions = mutable.HashSet[String]() + val taskCommittedPaths = mutable.HashSet[String]() sparkSession.sparkContext.runJob( rdd, (taskContext: TaskContext, iter: Iterator[InternalRow]) => { @@ -198,9 +208,21 @@ object FileFormatWriter extends Logging { 0 until rdd.partitions.length, (index, res: WriteTaskResult) => { committer.onTaskCommit(res.commitMsg) + if (bucketSpec.nonEmpty && res.commitMsg.obj.isInstanceOf[HadoopMRTaskCommitStatus]) { + updatedPartitions ++= res.summary.updatedPartitions + taskCommittedPaths ++= + res.commitMsg.obj.asInstanceOf[HadoopMRTaskCommitStatus].commitPaths + } ret(index) = res }) + bucketSpec match { + case Some(spec) if spec.isHiveBucket => + // Create a new file for each Hive empty bucket. + fillWithEmptyBucketFiles(committer, caseInsensitiveOptions.get("jobId").get, + job, description, updatedPartitions.toSet, taskCommittedPaths.toSet) + case _ => // no-op + } val commitMsgs = ret.map(_.commitMsg) committer.commitJob(job, commitMsgs) @@ -279,6 +301,38 @@ object FileFormatWriter extends Logging { } } + private def fillWithEmptyBucketFiles(committer: FileCommitProtocol, + jobId: String, + job: Job, + desc: FileFormatWriter.WriteJobDescription, + updatedPartitions: Set[String], + baseDirs: Set[String]): Unit = { + val fileSystem = FileSystem.get(job.getConfiguration) + updatedPartitions.foreach { updatedPartition => + val bucketIds = baseDirs.flatMap { baseDir => + val partitionAbsDir = new Path(baseDir, updatedPartition) + if (fileSystem.exists(partitionAbsDir)) { + // Return bucket ids. 
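// Aside (illustrative sketch, not part of the patch): how the missing bucket ids are derived for
// one partition directory. The file names and bucket count below are examples only.
val exampleNumBuckets = 4
val committedNames = Seq("part-20170812210525_00000.c000", "part-20170812210525_00002.c000")
val foundIds = committedNames.flatMap(BucketingUtils.getBucketId).toSet   // Set(0, 2)
val missingIds = (0 until exampleNumBuckets).toSet -- foundIds            // Set(1, 3)
// Each missing id then gets an empty part-<jobId><bucketIdString>.c000 file, so every Hive
// bucket is represented by exactly one file.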
+ fileSystem.listStatus(partitionAbsDir) + .flatMap( status => BucketingUtils.getBucketId(status.getPath.getName)) + } else { + Nil + } + } + val missingBucketIds = (0 until desc.numBuckets.get).toSet -- bucketIds + missingBucketIds.foreach { bucketId => + baseDirs.find(dir => fileSystem.exists(new Path(dir, updatedPartition))) match { + case Some(dir) => + val partitionAbsDir = new Path(dir, updatedPartition) + val extension = s"${BucketingUtils.bucketIdToString(bucketId)}.c000" + FileSystem.get(job.getConfiguration).createNewFile( + new Path(partitionAbsDir, f"part-$jobId$extension")) + case None => // no-op + } + } + } + } + /** * For every registered [[WriteJobStatsTracker]], call `processStats()` on it, passing it * the corresponding [[WriteTaskStats]] from all executors. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index b91d077442557..353270f056421 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -50,7 +50,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { numPartitions: Int): Partitioning = { requiredDistribution match { case AllTuples => SinglePartition - case ClusteredDistribution(clustering) => HashPartitioning(clustering, numPartitions) + case ClusteredDistribution(clustering, clustersOpt, useHiveHash) => + HashPartitioning(clustering, clustersOpt.getOrElse(numPartitions), useHiveHash) case OrderedDistribution(ordering) => RangePartitioning(ordering, numPartitions) case dist => sys.error(s"Do not know how to satisfy distribution $dist") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala index eebe6ad2e7944..5d14ba2a8d093 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala @@ -203,7 +203,7 @@ object ShuffleExchange { serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow] = { val part: Partitioner = newPartitioning match { case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions) - case HashPartitioning(_, n) => + case HashPartitioning(_, n, _) => new Partitioner { override def numPartitions: Int = n // For HashPartitioning, the partitioning key is already a valid partition ID, as we use diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ReorderJoinPredicates.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ReorderJoinPredicates.scala index 534d8c5689c27..7ba01d59ab874 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ReorderJoinPredicates.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ReorderJoinPredicates.scala @@ -54,13 +54,13 @@ class ReorderJoinPredicates extends Rule[SparkPlan] { if (leftKeys.forall(_.deterministic) && rightKeys.forall(_.deterministic)) { leftPartitioning match { - case HashPartitioning(leftExpressions, _) + case HashPartitioning(leftExpressions, _, _) if leftExpressions.length == leftKeys.length && leftKeys.forall(x => leftExpressions.exists(_.semanticEquals(x))) => reorder(leftExpressions, leftKeys) case _ => rightPartitioning match 
{ - case HashPartitioning(rightExpressions, _) + case HashPartitioning(rightExpressions, _, _) if rightExpressions.length == rightKeys.length && rightKeys.forall(x => rightExpressions.exists(_.semanticEquals(x))) => reorder(rightExpressions, rightKeys) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index eb9e6458fc61c..dc3b68db0c003 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -92,7 +92,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { val strategy = DataSourceStrategy(spark.sessionState.conf) val bucketedDataFrame = spark.table("bucketed_table").select("i", "j", "k") - val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec + val BucketSpec(numBuckets, bucketColumnNames, _, _) = bucketSpec // Limit: bucket pruning only works when the bucket column has one and only one column assert(bucketColumnNames.length == 1) val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index e9d48f95aa905..cc9e0be6e7b86 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -413,8 +413,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } if (bucketSpec.isDefined) { - val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get + val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames, isHiveBucket) = bucketSpec.get + properties.put(DATASOURCE_SCHEMA_ISHIVEBUCKET, isHiveBucket.toString) properties.put(DATASOURCE_SCHEMA_NUMBUCKETS, numBuckets.toString) properties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, bucketColumnNames.length.toString) bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) => @@ -737,9 +738,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } // Get the original table properties as defined by the user. - table.copy( + val ret = table.copy( createVersion = version, properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) }) + ret } // Reorder table schema to put partition columns at the end. Before Spark 2.2, the partition @@ -757,9 +759,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = { - val hiveTable = table.copy( - provider = Some(DDLUtils.HIVE_PROVIDER), - tracksPartitionsInCatalog = true) + val hiveTable = table.bucketSpec match { + case Some(spec) => + table.copy( + provider = Some(DDLUtils.HIVE_PROVIDER), + tracksPartitionsInCatalog = true, + bucketSpec = Some(spec.copy(isHiveBucket = true))) + case None => + table.copy( + provider = Some(DDLUtils.HIVE_PROVIDER), + tracksPartitionsInCatalog = true) + } // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its // schema from table properties. 
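// Illustrative sketch (not part of the patch): the isHiveBucket flag round-trips through the
// table property map next to the existing bucket keys defined in the hunk below; the concrete
// values here are examples only.
val exampleProps = Map(
  DATASOURCE_SCHEMA_NUMBUCKETS -> "4",
  DATASOURCE_SCHEMA_NUMBUCKETCOLS -> "1",
  DATASOURCE_SCHEMA_ISHIVEBUCKET -> "true")
// The read side mirrors the write side: a missing key falls back to a non-Hive bucket spec.
val exampleIsHiveBucket = exampleProps.get(DATASOURCE_SCHEMA_ISHIVEBUCKET).getOrElse("false") == "true"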
@@ -1196,6 +1206,7 @@ object HiveExternalCatalog { val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts" val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols" val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols" + val DATASOURCE_SCHEMA_ISHIVEBUCKET = DATASOURCE_SCHEMA_PREFIX + "isHiveBucket" val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets" val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + "numBucketCols" val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part." @@ -1281,7 +1292,8 @@ object HiveExternalCatalog { BucketSpec( numBuckets.toInt, getColumnNamesByType(metadata.properties, "bucket", "bucketing columns"), - getColumnNamesByType(metadata.properties, "sort", "sorting columns")) + getColumnNamesByType(metadata.properties, "sort", "sorting columns"), + metadata.properties.get(DATASOURCE_SCHEMA_ISHIVEBUCKET).getOrElse("false") == "true") } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 46610f84dd822..251f913f81241 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -35,8 +35,9 @@ import org.apache.spark.SparkException import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.{CommandUtils, DataWritingCommand} import org.apache.spark.sql.execution.datasources.FileFormatWriter @@ -226,6 +227,27 @@ case class InsertIntoHiveTable( new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000 } + val partitionColumnNames = table.partitionColumnNames + + override def requiredDestribution: Option[Seq[Distribution]] = table.bucketSpec match { + case Some(bucketSpec) => + val numDynamicPartitions = partition.values.count(_.isEmpty) + val partitionAttributes = partitionColumnNames.map { name => + query.resolve(name :: Nil, conf.resolver).getOrElse { + throw new AnalysisException( + s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]") + }.asInstanceOf[Attribute] + } + val allColumns = query.output + val partitionSet = AttributeSet(partitionAttributes) + val dataColumns = allColumns.filterNot(partitionSet.contains) + val bucketColumns = table.bucketSpec.get + .bucketColumnNames.map(c => dataColumns.find(_.name == c).get) + Some(Seq(ClusteredDistribution(partitionAttributes ++ bucketColumns, + Option(bucketSpec.numBuckets), useHiveHash = true))) + case None => super.requiredDestribution + } + /** * Inserts all the rows in the table into Hive. Row objects are properly serialized with the * `org.apache.hadoop.hive.serde2.SerDe` and the @@ -279,16 +301,12 @@ case class InsertIntoHiveTable( case (key, None) => key -> "" } - // All partition column names in the format of "//..." 
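// Illustrative sketch (not part of the patch): the distribution requested above for a table
// PARTITIONED BY (ds) CLUSTERED BY (key) INTO 4 BUCKETS; the attribute names and types below
// are examples standing in for the resolved query output.
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
import org.apache.spark.sql.types.{IntegerType, StringType}

val ds = AttributeReference("ds", StringType)()
val key = AttributeReference("key", IntegerType)()
val required = ClusteredDistribution(Seq(ds, key), clustersOpt = Some(4), useHiveHash = true)
// EnsureRequirements satisfies this with HashPartitioning(Seq(ds, key), 4, useHiveHash = true),
// so the shuffle uses Hive's hash and its partition count equals the table's bucket count.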
- val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") - val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty) - // By this time, the partition map must match the table's partition columns if (partitionColumnNames.toSet != partition.keySet) { throw new SparkException( s"""Requested partitioning does not match the ${table.identifier.table} table: |Requested partitions: ${partition.keys.mkString(",")} - |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin) + |Table partitions: ${partitionColumnNames.mkString(",")}""".stripMargin) } // Validate partition spec if there exist any dynamic partitions @@ -311,30 +329,10 @@ case class InsertIntoHiveTable( } } - table.bucketSpec match { - case Some(bucketSpec) => - // Writes to bucketed hive tables are allowed only if user does not care about maintaining - // table's bucketing ie. both "hive.enforce.bucketing" and "hive.enforce.sorting" are - // set to false - val enforceBucketingConfig = "hive.enforce.bucketing" - val enforceSortingConfig = "hive.enforce.sorting" - - val message = s"Output Hive table ${table.identifier} is bucketed but Spark" + - "currently does NOT populate bucketed output which is compatible with Hive." - - if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean || - hadoopConf.get(enforceSortingConfig, "true").toBoolean) { - throw new AnalysisException(message) - } else { - logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and " + - s"$enforceSortingConfig are set to false.") - } - case _ => // do nothing since table has no bucketing - } - + val jobId = java.util.UUID.randomUUID().toString val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, - jobId = java.util.UUID.randomUUID().toString, + jobId = jobId, outputPath = tmpLocation.toString) val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name => @@ -352,9 +350,9 @@ case class InsertIntoHiveTable( outputSpec = FileFormatWriter.OutputSpec(tmpLocation.toString, Map.empty), hadoopConf = hadoopConf, partitionColumns = partitionAttributes, - bucketSpec = None, + bucketSpec = table.bucketSpec, statsTrackers = Seq(basicWriteJobStatsTracker(hadoopConf)), - options = Map.empty) + options = Map("jobId" -> jobId)) if (partition.nonEmpty) { if (numDynamicPartitions > 0) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index cc80f2e481cbf..94b4ee5d31d57 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -22,9 +22,8 @@ import java.io.File import org.scalatest.BeforeAndAfter import org.apache.spark.SparkException -import org.apache.spark.sql.{QueryTest, _} -import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.expressions.HiveHashFunction import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ @@ -505,25 +504,6 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef } } - testBucketedTable("INSERT should fail if strict bucketing / sorting is enforced") { - 
tableName => - withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "false") { - intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4") - } - } - withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" -> "true") { - intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4") - } - } - withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" -> "true") { - intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4") - } - } - } - test("SPARK-20594: hive.exec.stagingdir was deleted by Hive") { // Set hive.exec.stagingdir under the table directory without start with ".". withSQLConf("hive.exec.stagingdir" -> "./test") { @@ -534,4 +514,29 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef } } } + + test("Insert data into hive bucketized table.") { + sql(""" + |CREATE TABLE bucketizedTable (key int, value string) + |CLUSTERED BY (key) SORTED BY (key ASC) into 4 buckets + |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' + |""".stripMargin) + val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketizedTable") + val data = spark.sparkContext.parallelize((0 until 100) + .map(i => TestData(i, i.toString))).toDF() + data.write.mode(SaveMode.Overwrite).insertInto("bucketizedTable") + val dir = spark.sessionState.catalog.defaultTablePath(identifier) + val bucketFiles = new File(dir).listFiles().sortWith((a: File, b: File) => { + a.getName < b.getName + }).filter(file => file.getName.startsWith("part-")) + assert(bucketFiles.length === 4) + (0 to 3).foreach { bucket => + spark.read.format("text") + .load(bucketFiles(bucket).getAbsolutePath) + .collect().map(_.getString(0).split("\t")(0).toInt) + .foreach { key => + assert(HiveHashFunction.hash(key, IntegerType, seed = 0) % 4 === bucket) + } + } + } } From cd2be585489c2c82acb8c88f5c9f713f15fb4d21 Mon Sep 17 00:00:00 2001 From: jinxing Date: Sat, 19 Aug 2017 23:19:56 +0800 Subject: [PATCH 2/2] fix --- .../io/HadoopMapReduceCommitProtocol.scala | 19 +++++++++---------- .../catalog/ExternalCatalogUtils.scala | 4 ++-- .../plans/physical/partitioning.scala | 3 ++- .../spark/sql/hive/HiveExternalCatalog.scala | 3 +-- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 829d98c097d1d..07bede7813e48 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -108,6 +108,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) if (fileNameWithPartitionId) { f"part-$split%05d-$jobId$ext" } else { + // File names created by different tasks should have different `ext` when `split` is not used. 
f"part-$jobId$ext" } } @@ -160,24 +161,22 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String) val attemptId = taskContext.getTaskAttemptID SparkHadoopMapRedUtil.commitTask( committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId) - val p1 = committer match { + val committedPaths = mutable.HashSet[String]() + committer match { case fileOutputCommitter: FileOutputCommitter => val committedPath = fileOutputCommitter.getCommittedTaskPath(taskContext) if (committedPath != null) { - committedPath.toString - } else { - null + committedPaths += committedPath.toString } - case _ => path + case _ => + committedPaths += path } - val p2 = if (path == null) { - null - } else { - absPathStagingDir.toString + if (path != null) { + committedPaths += absPathStagingDir.toString } new TaskCommitMessage( - HadoopMRTaskCommitStatus(addedAbsPathFiles.toMap, Seq(p1, p2).filter(_ != null))) + HadoopMRTaskCommitStatus(addedAbsPathFiles.toMap, committedPaths.toSeq)) } override def abortTask(taskContext: TaskAttemptContext): Unit = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala index fa164be1116a7..1089b6652e835 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala @@ -202,14 +202,14 @@ object CatalogUtils { tableCols: Seq[String], bucketSpec: BucketSpec, resolver: Resolver): BucketSpec = { - val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames, _) = bucketSpec + val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames, isHiveBucket) = bucketSpec val normalizedBucketCols = bucketColumnNames.map { colName => normalizeColumnName(tableName, tableCols, colName, "bucket", resolver) } val normalizedSortCols = sortColumnNames.map { colName => normalizeColumnName(tableName, tableCols, colName, "sort", resolver) } - BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols, bucketSpec.isHiveBucket) + BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols, isHiveBucket) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index be9ed6b0514f7..46e5ea695aa39 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -47,7 +47,8 @@ case object AllTuples extends Distribution * Represents data where tuples that share the same values for the `clustering` * [[Expression Expressions]] will be co-located. Based on the context, this * can mean such tuples are either co-located in the same partition or they will be contiguous - within a single partition. + within a single partition. `clustersOpt` indicates the number of partitions. `useHiveHash` + indicates whether Hive hash should be used when partitioning the data.
*/ case class ClusteredDistribution(clustering: Seq[Expression], clustersOpt: Option[Int] = None, useHiveHash: Boolean = false) extends Distribution { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index cc9e0be6e7b86..fe94238c7adc0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -738,10 +738,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } // Get the original table properties as defined by the user. - val ret = table.copy( + table.copy( createVersion = version, properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) }) - ret } // Reorder table schema to put partition columns at the end. Before Spark 2.2, the partition