Add support for external sort.
marmbrus committed Nov 14, 2014
1 parent 4b4b50c commit afd7562
Showing 3 changed files with 41 additions and 5 deletions.
7 changes: 7 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -39,6 +39,10 @@ private[spark] object SQLConf {

  val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"

  // Options that control which operators can be chosen by the query planner. These should be
  // considered hints and may be ignored by future versions of Spark SQL.
  val EXTERNAL_SORT = "spark.sql.planner.externalSort"

  // This is only used for the thriftserver
  val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"

@@ -96,6 +100,9 @@ private[sql] trait SQLConf {
  private[spark] def parquetFilterPushDown =
    getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean

  /** When true the planner will use the external sort, which may spill to disk. */
  private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean

  /**
   * When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode
   * that evaluates expressions found in queries. In general this custom code runs much faster
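For context, the new key is read like any other SQLConf property and defaults to "false", so the hint can be toggled per session. A minimal sketch against the Spark API of this era (the app name and master are illustrative, not part of the commit):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("external-sort-demo").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)

// Either set the property programmatically on the SQLContext...
sqlContext.setConf("spark.sql.planner.externalSort", "true")
// ...or through a SQL SET command; both feed the same SQLConf.
sqlContext.sql("SET spark.sql.planner.externalSort=true")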
5 changes: 4 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -263,9 +263,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
      case logical.Distinct(child) =>
        execution.Distinct(partial = false,
          execution.Distinct(partial = true, planLater(child))) :: Nil

      case logical.Sort(sortExprs, child) if sqlContext.externalSortEnabled =>
        execution.ExternalSort(sortExprs, global = true, planLater(child)):: Nil
      case logical.Sort(sortExprs, child) =>
        // This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.
        execution.Sort(sortExprs, global = true, planLater(child)):: Nil

      case logical.SortPartitions(sortExprs, child) =>
        // This sort only sorts tuples within a partition. Its requiredDistribution will be
        // an UnspecifiedDistribution.
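Because the guarded case is matched first, a global ORDER BY is planned as execution.ExternalSort when the flag is on, and falls through to the in-memory execution.Sort otherwise. A hedged spark-shell sketch for checking which operator the planner chose (the Record class, table name, and data are invented for illustration):

import sqlContext.createSchemaRDD

case class Record(key: Int, value: String)
val records = sc.parallelize((1 to 10000).map(i => Record(i % 100, s"v$i")))
records.registerTempTable("records")

val sorted = sqlContext.sql("SELECT * FROM records ORDER BY key")
// With externalSort enabled the physical plan should show ExternalSort;
// otherwise it shows Sort. Printing the plan does not execute the query.
println(sorted.queryExecution.executedPlan)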
34 changes: 30 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution

import org.apache.spark.util.collection.ExternalSorter

import scala.collection.mutable.ArrayBuffer
import scala.reflect.runtime.universe.TypeTag

@@ -189,6 +191,7 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)

/**
 * :: DeveloperApi ::
 * Performs a sort on-heap.
 */
@DeveloperApi
case class Sort(
@@ -199,12 +202,35 @@ case class Sort(
  override def requiredChildDistribution =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

  override def execute() = attachTree(this, "sort") {
    child.execute().mapPartitions( { iterator =>
      val ordering = newOrdering(sortOrder, child.output)
      iterator.map(_.copy()).toArray.sorted(ordering).iterator
    }, preservesPartitioning = true)
  }

  override def output = child.output
}

/**
 * :: DeveloperApi ::
 * Performs a sort, spilling to disk as needed.
 */
@DeveloperApi
case class ExternalSort(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    child: SparkPlan)
  extends UnaryNode {
  override def requiredChildDistribution =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

  override def execute() = attachTree(this, "sort") {
    child.execute().mapPartitions( { iterator =>
      val ordering = newOrdering(sortOrder, child.output)
      val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
      sorter.insertAll(iterator.map(r => (r, null)))
      sorter.iterator.map(_._1)
    }, preservesPartitioning = true)
  }

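Two details in these operators are worth noting. ExternalSorter sorts (key, value) records, which is why each row goes in as a (r, null) pair and comes back out via map(_._1). And the in-memory Sort calls _.copy() before buffering because Spark SQL's row iterators may reuse a single mutable row object, so materializing the iterator without copying would leave an array of identical references. A self-contained sketch of that failure mode (plain Scala, no Spark; RowLike is an invented stand-in for a reused mutable row):

final case class RowLike(var key: Int)  // stand-in for a reused mutable row

object CopyBeforeBuffering extends App {
  val shared = RowLike(0)
  def rows: Iterator[RowLike] = Iterator(3, 1, 2).map { k => shared.key = k; shared }

  // Without copy(): three references to the same object, all ending at key = 2.
  println(rows.toArray.mkString(" "))                              // RowLike(2) RowLike(2) RowLike(2)

  // With copy(): independent snapshots, safe to buffer and sort.
  println(rows.map(_.copy()).toArray.sortBy(_.key).mkString(" "))  // RowLike(1) RowLike(2) RowLike(3)
}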
