From bf0e7dce3879174d35c48ca2e30c4f55bd827f8a Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 15 May 2014 12:08:44 +0800 Subject: [PATCH] Added SortedOperation pattern to match *some* definitely sorted operations and avoid some sorting cost in HiveComparisonTest. --- .../spark/sql/catalyst/planning/patterns.scala | 13 +++++++++++++ .../apache/spark/sql/execution/Aggregate.scala | 2 +- .../sql/hive/execution/HiveComparisonTest.scala | 17 ++++++----------- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 4544b32958c7e..6a1d83ad2f1af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -168,3 +168,16 @@ object Unions { case other => other :: Nil } } + +/** + * A pattern that matches (some) sorted operations and returns corresponding sorting orders. + * Currently operations matched by this pattern are guaranteed to be sorted, but not all sorted + * operations are matched by this pattern. + */ +object SortedOperation { + // TODO (lian) detect more sorted operations + def unapply(plan: LogicalPlan): Option[Seq[SortOrder]] = plan match { + case FilteredOperation(_, Sort(order, _)) => Some(order) + case _ => None + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala index 36b3b956da96c..604914e547790 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala @@ -116,7 +116,7 @@ case class Aggregate( */ @transient private[this] lazy val resultMap = - (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute} ++ namedGroups).toMap + (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute } ++ namedGroups).toMap /** * Substituted version of aggregateExpressions expressions which are used to compute final diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index edff38b901073..43939d6cf0c3b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -19,11 +19,12 @@ package org.apache.spark.sql.hive.execution import java.io._ +import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen} + import org.apache.spark.sql.Logging +import org.apache.spark.sql.catalyst.planning.SortedOperation import org.apache.spark.sql.catalyst.plans.logical.{ExplainCommand, NativeCommand} import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.execution.Sort -import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen} import org.apache.spark.sql.hive.test.TestHive /** @@ -131,14 +132,8 @@ abstract class HiveComparisonTest val orderedAnswer = hiveQuery.logical match { // Clean out non-deterministic time schema info. case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "") - case _: ExplainCommand => answer - case _ => - // TODO: Really we only care about the final total ordering here... - val isOrdered = hiveQuery.executedPlan.collect { - case s @ Sort(_, global, _) if global => s - }.nonEmpty - // If the query results aren't sorted, then sort them to ensure deterministic answers. - if (!isOrdered) answer.sorted else answer + case _: ExplainCommand | SortedOperation(_) => answer + case _ => answer.sorted } orderedAnswer.map(cleanPaths) } @@ -161,7 +156,7 @@ abstract class HiveComparisonTest "minFileSize" ) protected def nonDeterministicLine(line: String) = - nonDeterministicLineIndicators.map(line contains _).reduceLeft(_||_) + nonDeterministicLineIndicators.exists(line contains _) /** * Removes non-deterministic paths from `str` so cached answers will compare correctly.