override requiredOrdering for InsertIntoHiveTable
tejasapatil committed Jan 10, 2018
1 parent 652dca2 · commit 1008b2e
Showing 1 changed file with 19 additions and 1 deletion.
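
The change makes `InsertIntoHiveTable` declare a required sort order over the table's partition columns, presumably so the planner can sort incoming rows by partition values ahead of the Hive write and each task keeps at most one output writer open per dynamic partition. A sketch of the resolution pattern used by the new method follows the diff below.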
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalog}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.CommandUtils
@@ -71,6 +71,24 @@ case class InsertIntoHiveTable(
     ifPartitionNotExists: Boolean,
     outputColumns: Seq[Attribute]) extends SaveAsHiveFile {
 
+  /**
+   * For partitioned tables, `requiredOrdering` is over the partition columns of the table.
+   */
+  override def requiredOrdering: Seq[Seq[SortOrder]] = {
+    if (table.partitionColumnNames.nonEmpty) {
+      val partitionAttributes = table.partitionColumnNames.map { name =>
+        query.resolve(name :: Nil, SparkSession.getActiveSession.get.sessionState.analyzer.resolver)
+          .getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
+          }.asInstanceOf[Attribute]
+      }
+      Seq(partitionAttributes.map(SortOrder(_, Ascending)))
+    } else {
+      Seq.fill(children.size)(Nil)
+    }
+  }
+
   /**
    * Inserts all the rows in the table into Hive. Row objects are properly serialized with the
    * `org.apache.hadoop.hive.serde2.SerDe` and the
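
A minimal, hypothetical sketch of the same resolve-then-SortOrder pattern the method above uses, run against an ordinary DataFrame's analyzed plan. This is not the commit's code: the object and method names (`RequiredOrderingDemo`, `demoSortOrders`) and the sample column `ds` are invented, and because `sessionState` is not visible outside the org.apache.spark.sql package, the public `caseInsensitiveResolution` stands in for the analyzer resolver the commit uses.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object RequiredOrderingDemo {

  // Resolve each column name against the query's output and wrap it in an ascending
  // SortOrder -- the same shape requiredOrdering produces for partition columns.
  def demoSortOrders(query: LogicalPlan, cols: Seq[String]): Seq[SortOrder] =
    cols.map { name =>
      val attr = query.resolve(name :: Nil, caseInsensitiveResolution)
        .getOrElse(sys.error(
          s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]"))
        .asInstanceOf[Attribute]
      SortOrder(attr, Ascending)
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
    import spark.implicits._
    val df = Seq((1, "2018-01-10"), (2, "2018-01-11")).toDF("id", "ds")
    // Treat "ds" as the hypothetical partition column.
    println(demoSortOrders(df.queryExecution.analyzed, Seq("ds")))
    spark.stop()
  }
}

Note the two branches of the override: the partitioned case asks for an ascending sort on every partition column, while the non-partitioned case (`Seq.fill(children.size)(Nil)`) keeps the default of no required ordering per child.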
