Skip to content

Commit

Permalink
Centralize logic for picking sort operator implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Jul 6, 2015
1 parent 9869ec2 commit 6d6a1e6
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -302,14 +302,8 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
}

val withSort = if (needSort) {
if (sqlContext.conf.unsafeEnabled
&& UnsafeRowConverter.supportsSchema(withShuffle.schema)) {
UnsafeExternalSort(rowOrdering, global = false, withShuffle)
} else if (sqlContext.conf.externalSortEnabled) {
ExternalSort(rowOrdering, global = false, withShuffle)
} else {
Sort(rowOrdering, global = false, withShuffle)
}
sqlContext.planner.BasicOperators.getSortOperator(
rowOrdering, global = false, withShuffle)
} else {
withShuffle
}
Expand Down Expand Up @@ -337,11 +331,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
case (UnspecifiedDistribution, Seq(), child) =>
child
case (UnspecifiedDistribution, rowOrdering, child) =>
if (sqlContext.conf.externalSortEnabled) {
ExternalSort(rowOrdering, global = false, child)
} else {
Sort(rowOrdering, global = false, child)
}
sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child)

case (dist, ordering, _) =>
sys.error(s"Don't know how to ensure $dist with ordering $ordering")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,22 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object BasicOperators extends Strategy {
def numPartitions: Int = self.numPartitions

/**
 * Chooses the physical sort operator used to sort the output of `child`.
 *
 * Candidates are considered in this order:
 *  1. `execution.UnsafeExternalSort`, when `sqlContext.conf.unsafeEnabled` is set and
 *     `UnsafeRowConverter.supportsSchema` accepts the child's schema;
 *  2. `execution.ExternalSort`, when `sqlContext.conf.externalSortEnabled` is set;
 *  3. `execution.Sort` otherwise.
 *
 * @param sortExprs the sort ordering passed through to the selected operator
 * @param global when true performs a global sort of all partitions by shuffling the data first
 *               if necessary.
 * @param child the plan whose output is to be sorted
 * @return the selected sort operator wrapping `child`
 */
def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = {
  // Declared as defs so each condition is only evaluated when the selection below reaches it.
  def unsafeSortUsable: Boolean =
    sqlContext.conf.unsafeEnabled && UnsafeRowConverter.supportsSchema(child.schema)
  def externalSortRequested: Boolean = sqlContext.conf.externalSortEnabled

  child match {
    case input if unsafeSortUsable => execution.UnsafeExternalSort(sortExprs, global, input)
    case input if externalSortRequested => execution.ExternalSort(sortExprs, global, input)
    case input => execution.Sort(sortExprs, global, input)
  }
}

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case r: RunnableCommand => ExecutedCommand(r) :: Nil

Expand All @@ -302,11 +318,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.SortPartitions(sortExprs, child) =>
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
case logical.Sort(sortExprs, global, child) if sqlContext.conf.externalSortEnabled =>
execution.ExternalSort(sortExprs, global, planLater(child)):: Nil
getSortOperator(sortExprs, global = false, planLater(child)) :: Nil
case logical.Sort(sortExprs, global, child) =>
execution.Sort(sortExprs, global, planLater(child)):: Nil
getSortOperator(sortExprs, global, planLater(child)):: Nil
case logical.Project(projectList, child) =>
execution.Project(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
Expand Down

0 comments on commit 6d6a1e6

Please sign in to comment.