[SPARK-2393][SQL] Cost estimation optimization framework for Catalyst logical plans & sample usage.

The idea is that every Catalyst logical plan gets hold of a `Statistics` class, whose fields provide useful estimates of various statistics. See the implementation in `MetastoreRelation` for an example.
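
For illustration, here is a minimal sketch of how a leaf relation might plug into this interface. `ExampleRelation` and its `knownSizeInBytes` field are hypothetical; in this patch, `MetastoreRelation` derives the value from table properties in the Hive metastore.

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

// Hypothetical leaf relation: it overrides `statistics` (which plain logical
// LeafNodes throw on by default) with a size estimate it can compute cheaply.
case class ExampleRelation(output: Seq[Attribute], knownSizeInBytes: Long) extends LeafNode {
  @transient override lazy val statistics = Statistics(
    sizeInBytes = BigInt(knownSizeInBytes) // e.g. a size recorded in a catalog
  )
}
```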

This patch also includes several uses of the estimation interface in the planner. For instance, we now use physical table sizes from the estimation interface to convert an equi-join to a broadcast join (when doing so is beneficial, as determined by a size threshold).
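
In planner terms the check reduces to a size comparison along these lines (a simplified sketch, not the exact rule in SparkStrategies; the threshold is passed in explicitly here because the SQLConf accessors are private[spark]):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Sketch of the broadcast decision: a join side qualifies for broadcasting
// when auto conversion is enabled (threshold > 0) and its estimated physical
// size is at or below the configured threshold.
def canBeBroadcast(side: LogicalPlan, thresholdInBytes: Long): Boolean =
  thresholdInBytes > 0 && side.statistics.sizeInBytes <= BigInt(thresholdInBytes)
```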

Finally, there are a couple of minor accompanying changes, including:
- Remove the not-in-use `BaseRelation`.
- Make `SparkLogicalPlan` take a `SQLContext` in its second parameter list (the constructor shape is sketched below).
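
For the second item, the constructor shape is roughly the following (a sketch with a hypothetical `ExamplePlanNode`; the real change is visible in the SparkLogicalPlan diff further down). Putting the `SQLContext` in a second, `@transient` parameter list keeps it out of the case class's equality, hashing, and pattern matching, while still letting the node consult the session configuration (e.g. `defaultSizeInBytes`) when computing statistics.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical node illustrating the curried-constructor pattern: only the
// first parameter list participates in case-class equality and matching; the
// SQLContext in the second list is @transient so it is not serialized.
case class ExamplePlanNode(child: LogicalPlan)(@transient sqlContext: SQLContext)
  extends LogicalPlan {
  def children: Seq[LogicalPlan] = child :: Nil
  def output: Seq[Attribute] = child.output
  def references: Set[Attribute] = Set.empty
}
```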

Author: Zongheng Yang <[email protected]>

Closes apache#1238 from concretevitamin/estimates and squashes the following commits:

329071d [Zongheng Yang] Address review comments; turn config name from string to field in SQLConf.
8663e84 [Zongheng Yang] Use BigInt for stat; for logical leaves, by default throw an exception.
2f2fb89 [Zongheng Yang] Fix statistics for SparkLogicalPlan.
9951305 [Zongheng Yang] Remove childrenStats.
16fc60a [Zongheng Yang] Avoid calling statistics on plans if auto join conversion is disabled.
8bd2816 [Zongheng Yang] Add a note on performance of statistics.
6e594b8 [Zongheng Yang] Get size info from metastore for MetastoreRelation.
01b7a3e [Zongheng Yang] Update scaladoc for a field and move it to @param section.
549061c [Zongheng Yang] Remove numTuples in Statistics for now.
729a8e2 [Zongheng Yang] Update docs to be more explicit.
573e644 [Zongheng Yang] Remove singleton SQLConf and move back `settings` to the trait.
2d99eb5 [Zongheng Yang] {Cleanup, use synchronized in, enrich} StatisticsSuite.
ca5b825 [Zongheng Yang] Inject SQLContext into SparkLogicalPlan, removing SQLConf mixin from it.
43d38a6 [Zongheng Yang] Revert optimization for BroadcastNestedLoopJoin (this fixes tests).
0ef9e5b [Zongheng Yang] Use multiplication instead of sum for default estimates.
4ef0d26 [Zongheng Yang] Make Statistics a case class.
3ba8f3e [Zongheng Yang] Add comment.
e5bcf5b [Zongheng Yang] Fix optimization conditions & update scala docs to explain.
7d9216a [Zongheng Yang] Apply estimation to planning ShuffleHashJoin & BroadcastNestedLoopJoin.
73cde01 [Zongheng Yang] Move SQLConf back. Assign default sizeInBytes to SparkLogicalPlan.
73412be [Zongheng Yang] Move SQLConf to Catalyst & add default val for sizeInBytes.
7a60ab7 [Zongheng Yang] s/Estimates/Statistics, s/cardinality/numTuples.
de3ae13 [Zongheng Yang] Add parquetAfter() properly in test.
dcff9bd [Zongheng Yang] Cleanups.
84301a4 [Zongheng Yang] Refactors.
5bf5586 [Zongheng Yang] Typo.
56a8e6e [Zongheng Yang] Prototype impl of estimations for Catalyst logical plans.
concretevitamin authored and marmbrus committed Jul 29, 2014
1 parent dc96536 commit c7db274
Showing 18 changed files with 256 additions and 124 deletions.
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.{errors, trees}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.trees.TreeNode

/**
@@ -36,7 +36,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
case class UnresolvedRelation(
databaseName: Option[String],
tableName: String,
alias: Option[String] = None) extends BaseRelation {
alias: Option[String] = None) extends LeafNode {
override def output = Nil
override lazy val resolved = false
}

This file was deleted (the not-in-use `BaseRelation` removed by this patch).

@@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.trees
abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
self: Product =>

/**
* Estimates of various statistics. The default estimation logic simply lazily multiplies the
* corresponding statistic produced by the children. To override this behavior, override
* `statistics` and assign it an overridden version of `Statistics`.
*
* '''NOTE''': concrete and/or overridden versions of statistics fields should pay attention to the
* performance of the implementations. The reason is that estimations might get triggered in
* performance-critical processes, such as query plan planning.
*
* @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
* defaults to the product of children's `sizeInBytes`.
*/
case class Statistics(
sizeInBytes: BigInt
)
lazy val statistics: Statistics = Statistics(
sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product
)

/**
* Returns the set of attributes that are referenced by this node
* during evaluation.
@@ -92,6 +111,9 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
self: Product =>

override lazy val statistics: Statistics =
throw new UnsupportedOperationException("default leaf nodes don't have meaningful Statistics")

// Leaf nodes by definition cannot reference any input attributes.
override def references = Set.empty
}
61 changes: 35 additions & 26 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -21,17 +21,31 @@ import java.util.Properties

import scala.collection.JavaConverters._

object SQLConf {
val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
}

/**
* SQLConf holds mutable config parameters and hints. These can be set and
* queried either by passing SET commands into Spark SQL's DSL
* functions (sql(), hql(), etc.), or by programmatically using setters and
* getters of this class.
* A trait that enables the setting and getting of mutable config parameters/hints.
*
* In the presence of a SQLContext, these can be set and queried by passing SET commands
* into Spark SQL's query functions (sql(), hql(), etc.). Otherwise, users of this trait can
* modify the hints by programmatically calling the setters and getters of this trait.
*
* SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads).
* SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
*/
trait SQLConf {
import SQLConf._

@transient protected[spark] val settings = java.util.Collections.synchronizedMap(
new java.util.HashMap[String, String]())

/** ************************ Spark SQL Params/Hints ******************* */
// TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?

@@ -40,28 +54,33 @@ trait SQLConf {

/**
* Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
* a broadcast value during the physical executions of join operations. Setting this to 0
* a broadcast value during the physical executions of join operations. Setting this to -1
* effectively disables auto conversion.
* Hive setting: hive.auto.convert.join.noconditionaltask.size.
*
* Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000.
*/
private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
private[spark] def autoBroadcastJoinThreshold: Int =
get(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt

/** A comma-separated list of table names marked to be broadcasted during joins. */
private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "")
/**
* The default size in bytes to assign to a logical operator's estimation statistics. By default,
* it is set to a larger value than `autoConvertJoinSize`, hence any logical operator without a
* properly implemented estimation of this statistic will not be incorrectly broadcasted in joins.
*/
private[spark] def defaultSizeInBytes: Long =
getOption(DEFAULT_SIZE_IN_BYTES).map(_.toLong).getOrElse(autoBroadcastJoinThreshold + 1)

/** ********************** SQLConf functionality methods ************ */

@transient
private val settings = java.util.Collections.synchronizedMap(
new java.util.HashMap[String, String]())

def set(props: Properties): Unit = {
props.asScala.foreach { case (k, v) => this.settings.put(k, v) }
settings.synchronized {
props.asScala.foreach { case (k, v) => settings.put(k, v) }
}
}

def set(key: String, value: String): Unit = {
require(key != null, "key cannot be null")
require(value != null, s"value cannot be null for $key")
require(value != null, s"value cannot be null for key: $key")
settings.put(key, value)
}

@@ -90,13 +109,3 @@ trait SQLConf {
}

}

object SQLConf {
val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
}
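
Since SQLContext mixes in SQLConf, both knobs can be tuned at runtime through the ordinary setters; a small usage sketch with illustrative values (the context setup and the numbers are only examples):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc = new SparkContext("local", "SQLConfExample") // hypothetical local context
val sqlContext = new SQLContext(sc)

// Disable automatic broadcast-join conversion entirely ...
sqlContext.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// ... or keep it enabled with a custom threshold, and pin the fallback size
// used by operators that cannot estimate their own sizeInBytes. Keeping the
// default size above the threshold avoids accidentally broadcasting them.
sqlContext.set("spark.sql.autoBroadcastJoinThreshold", "10000")
sqlContext.set("spark.sql.defaultSizeInBytes", "10001")
```
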
20 changes: 8 additions & 12 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,14 +24,14 @@ import org.apache.hadoop.conf.Configuration

import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.SparkStrategies
@@ -86,7 +86,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))
new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self))

/**
* Loads a Parquet file, returning the result as a [[SchemaRDD]].
@@ -127,7 +127,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
@Experimental
def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD =
new SchemaRDD(this, JsonRDD.inferSchema(json, samplingRatio))
new SchemaRDD(this, JsonRDD.inferSchema(self, json, samplingRatio))

/**
* :: Experimental ::
@@ -170,11 +170,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
val name = tableName
val newPlan = rdd.logicalPlan transform {
case s @ SparkLogicalPlan(ExistingRdd(_, _), _) => s.copy(tableName = name)
}
catalog.registerTable(None, tableName, newPlan)
catalog.registerTable(None, tableName, rdd.logicalPlan)
}

/**
@@ -212,7 +208,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
catalog.registerTable(None, tableName, SparkLogicalPlan(e))
catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self))
case inMem: InMemoryRelation =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
@@ -405,7 +401,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
new GenericRow(map.values.toArray.asInstanceOf[Array[Any]]): Row
}
}
new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(self))
}

}
3 changes: 2 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -418,7 +418,8 @@ class SchemaRDD(
* @group schema
*/
private def applySchema(rdd: RDD[Row]): SchemaRDD = {
new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd)))
new SchemaRDD(sqlContext,
SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd))(sqlContext))
}

// =======================================================================

@@ -56,7 +56,7 @@ private[sql] trait SchemaRDDLike {
// happen right away to let these side effects take place eagerly.
case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
queryExecution.toRdd
SparkLogicalPlan(queryExecution.executedPlan)
SparkLogicalPlan(queryExecution.executedPlan)(sqlContext)
case _ =>
baseLogicalPlan
}

@@ -92,7 +92,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
new GenericRow(extractors.map(e => e.invoke(row)).toArray[Any]): ScalaRow
}
}
new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(sqlContext))
}

/**
@@ -120,7 +120,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
* @group userf
*/
def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD =
new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(json, 1.0))
new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(sqlContext, json, 1.0))

/**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only

@@ -19,12 +19,12 @@ package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Logging, Row}
import org.apache.spark.sql.{Logging, Row, SQLContext}
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical._

/**
Expand Down Expand Up @@ -66,8 +66,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
* linking.
*/
@DeveloperApi
case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "SparkLogicalPlan")
extends BaseRelation with MultiInstanceRelation {
case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQLContext)
extends LogicalPlan with MultiInstanceRelation {

def output = alreadyPlanned.output
override def references = Set.empty
@@ -78,9 +78,15 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "Spar
alreadyPlanned match {
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
case _ => sys.error("Multiple instance of the same relation detected.")
}, tableName)
.asInstanceOf[this.type]
})(sqlContext).asInstanceOf[this.type]
}

@transient override lazy val statistics = Statistics(
// TODO: Instead of returning a default value here, find a way to return a meaningful size
// estimate for RDDs. See PR 1238 for more discussions.
sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
)

}

private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] {