From 1008b2efe8256fe95ae61ebba1ab673e0f397118 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Tue, 9 Jan 2018 17:23:05 -0800 Subject: [PATCH] override requiredOrdering for InsertIntoHiveTable --- .../hive/execution/InsertIntoHiveTable.scala | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 3ce5b8469d6fc..94b2bb365249c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalog} -import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.CommandUtils @@ -71,6 +71,24 @@ case class InsertIntoHiveTable( ifPartitionNotExists: Boolean, outputColumns: Seq[Attribute]) extends SaveAsHiveFile { + /** + * For partitioned tables, `requiredOrdering` is over partition columns of table + */ + override def requiredOrdering: Seq[Seq[SortOrder]] = { + if (table.partitionColumnNames.nonEmpty) { + val partitionAttributes = table.partitionColumnNames.map { name => + query.resolve(name :: Nil, SparkSession.getActiveSession.get.sessionState.analyzer.resolver) + .getOrElse { + throw new AnalysisException( + s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]") + }.asInstanceOf[Attribute] + } + Seq(partitionAttributes.map(SortOrder(_, Ascending))) + } else { + Seq.fill(children.size)(Nil) + } + } + /** * Inserts all the rows in the table into Hive. Row objects are properly serialized with the * `org.apache.hadoop.hive.serde2.SerDe` and the