[SPARK-2393][SQL] Cost estimation optimization framework for Catalyst logical plans & sample usage. #1238

Closed
wants to merge 27 commits
Changes from all commits
27 commits
56a8e6e
Prototype impl of estimations for Catalyst logical plans.
concretevitamin Jun 24, 2014
5bf5586
Typo.
concretevitamin Jun 26, 2014
84301a4
Refactors.
concretevitamin Jun 27, 2014
dcff9bd
Cleanups.
concretevitamin Jul 3, 2014
de3ae13
Add parquetAfter() properly in test.
concretevitamin Jul 3, 2014
7a60ab7
s/Estimates/Statistics, s/cardinality/numTuples.
concretevitamin Jul 9, 2014
73412be
Move SQLConf to Catalyst & add default val for sizeInBytes.
concretevitamin Jul 29, 2014
73cde01
Move SQLConf back. Assign default sizeInBytes to SparkLogicalPlan.
concretevitamin Jul 9, 2014
7d9216a
Apply estimation to planning ShuffleHashJoin & BroadcastNestedLoopJoin.
concretevitamin Jul 9, 2014
e5bcf5b
Fix optimization conditions & update scala docs to explain.
concretevitamin Jul 9, 2014
3ba8f3e
Add comment.
concretevitamin Jul 9, 2014
4ef0d26
Make Statistics a case class.
concretevitamin Jul 9, 2014
0ef9e5b
Use multiplication instead of sum for default estimates.
concretevitamin Jul 10, 2014
43d38a6
Revert optimization for BroadcastNestedLoopJoin (this fixes tests).
concretevitamin Jul 10, 2014
ca5b825
Inject SQLContext into SparkLogicalPlan, removing SQLConf mixin from it.
concretevitamin Jul 10, 2014
2d99eb5
{Cleanup, use synchronized in, enrich} StatisticsSuite.
concretevitamin Jul 10, 2014
573e644
Remove singleton SQLConf and move back `settings` to the trait.
concretevitamin Jul 14, 2014
729a8e2
Update docs to be more explicit.
concretevitamin Jul 14, 2014
549061c
Remove numTuples in Statistics for now.
concretevitamin Jul 14, 2014
01b7a3e
Update scaladoc for a field and move it to @param section.
concretevitamin Jul 14, 2014
6e594b8
Get size info from metastore for MetastoreRelation.
concretevitamin Jul 16, 2014
8bd2816
Add a note on performance of statistics.
concretevitamin Jul 16, 2014
16fc60a
Avoid calling statistics on plans if auto join conversion is disabled.
concretevitamin Jul 16, 2014
9951305
Remove childrenStats.
concretevitamin Jul 16, 2014
2f2fb89
Fix statistics for SparkLogicalPlan.
concretevitamin Jul 16, 2014
8663e84
Use BigInt for stat; for logical leaves, by default throw an exception.
concretevitamin Jul 22, 2014
329071d
Address review comments; turn config name from string to field in SQL…
concretevitamin Jul 29, 2014
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.{errors, trees}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.trees.TreeNode

/**
@@ -36,7 +36,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
case class UnresolvedRelation(
databaseName: Option[String],
tableName: String,
alias: Option[String] = None) extends BaseRelation {
alias: Option[String] = None) extends LeafNode {
override def output = Nil
override lazy val resolved = false
}

This file was deleted.

@@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.trees
abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
self: Product =>

/**
* Estimates of various statistics. The default estimation logic simply lazily multiplies the
* corresponding statistic produced by the children. To override this behavior, override
* `statistics` and assign it an overridden version of `Statistics`.
*
* '''NOTE''': concrete and/or overridden versions of statistics fields should pay attention to the
* performance of the implementations. The reason is that estimations might get triggered in
* performance-critical processes, such as query planning.
*
* @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
* defaults to the product of children's `sizeInBytes`.
*/
case class Statistics(
sizeInBytes: BigInt
)
lazy val statistics: Statistics = Statistics(
sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product
)

/**
* Returns the set of attributes that are referenced by this node
* during evaluation.
@@ -92,6 +111,9 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
self: Product =>

override lazy val statistics: Statistics =
throw new UnsupportedOperationException("default leaf nodes don't have meaningful Statistics")

// Leaf nodes by definition cannot reference any input attributes.
override def references = Set.empty
}
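For readers skimming the diff, here is a minimal sketch of how a concrete relation might plug into this framework. The `CsvRelation` class and its file-size-based estimate are hypothetical illustrations, not part of this patch; only `Statistics`, the product-based default, and the `LeafNode` exception come from the code above.

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

// Hypothetical leaf relation backed by a file on disk. Without this override,
// the LeafNode default for `statistics` would throw UnsupportedOperationException.
case class CsvRelation(path: String, output: Seq[Attribute]) extends LeafNode {
  @transient override lazy val statistics = Statistics(
    // Use the on-disk file size as a crude stand-in for the in-memory size.
    sizeInBytes = BigInt(new java.io.File(path).length())
  )
}
```

Any operator built on top of such leaves then inherits the product-based default: a join over two `CsvRelation`s reports `left.sizeInBytes * right.sizeInBytes`, a deliberately conservative over-estimate that keeps plans of unknown size from being broadcast by accident.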
61 changes: 35 additions & 26 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -21,17 +21,31 @@ import java.util.Properties

import scala.collection.JavaConverters._

object SQLConf {
val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
}

/**
* SQLConf holds mutable config parameters and hints. These can be set and
* queried either by passing SET commands into Spark SQL's DSL
* functions (sql(), hql(), etc.), or by programmatically using setters and
* getters of this class.
* A trait that enables the setting and getting of mutable config parameters/hints.
*
* In the presence of a SQLContext, these can be set and queried by passing SET commands
* into Spark SQL's query functions (sql(), hql(), etc.). Otherwise, users of this trait can
* modify the hints by programmatically calling the setters and getters of this trait.
*
* SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads).
* SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
*/
trait SQLConf {
import SQLConf._

@transient protected[spark] val settings = java.util.Collections.synchronizedMap(
new java.util.HashMap[String, String]())

/** ************************ Spark SQL Params/Hints ******************* */
// TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?

@@ -40,28 +54,33 @@ trait SQLConf {

/**
* Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
* a broadcast value during the physical executions of join operations. Setting this to 0
* a broadcast value during the physical executions of join operations. Setting this to -1
* effectively disables auto conversion.
* Hive setting: hive.auto.convert.join.noconditionaltask.size.
*
* Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000.
*/
private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
private[spark] def autoBroadcastJoinThreshold: Int =
get(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt

/** A comma-separated list of table names marked to be broadcasted during joins. */
private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "")
/**
* The default size in bytes to assign to a logical operator's estimation statistics. By default,
* it is set to a larger value than `autoBroadcastJoinThreshold`, hence any logical operator without a
* properly implemented size estimate will not be incorrectly broadcast in joins.
*/
private[spark] def defaultSizeInBytes: Long =
getOption(DEFAULT_SIZE_IN_BYTES).map(_.toLong).getOrElse(autoBroadcastJoinThreshold + 1)

/** ********************** SQLConf functionality methods ************ */

@transient
private val settings = java.util.Collections.synchronizedMap(
new java.util.HashMap[String, String]())

def set(props: Properties): Unit = {
props.asScala.foreach { case (k, v) => this.settings.put(k, v) }
settings.synchronized {
props.asScala.foreach { case (k, v) => settings.put(k, v) }
}
}

def set(key: String, value: String): Unit = {
require(key != null, "key cannot be null")
require(value != null, s"value cannot be null for $key")
require(value != null, s"value cannot be null for key: $key")
settings.put(key, value)
}

@@ -90,13 +109,3 @@
}

}

object SQLConf {
val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
}
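A quick usage sketch of the configuration surface above. It assumes a local `SparkContext` and that `SQLContext` mixes in this `SQLConf` trait (as it does on this branch); the app name and byte values are illustrative only.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc = new SparkContext("local", "sqlconf-demo")  // illustrative local context
val sqlContext = new SQLContext(sc)                 // SQLContext mixes in SQLConf

// Tables whose estimated sizeInBytes is at most this threshold may be
// auto-converted to broadcast joins; setting it to -1 disables the conversion.
sqlContext.set("spark.sql.autoBroadcastJoinThreshold", (1024 * 1024).toString)

// Fallback size for operators that lack a real estimate. If unset, it defaults
// to autoBroadcastJoinThreshold + 1, so unknown-size plans are never broadcast.
sqlContext.set("spark.sql.defaultSizeInBytes", (10L * 1024 * 1024).toString)

// Values can be read back through the trait's getters.
val threshold = sqlContext.get("spark.sql.autoBroadcastJoinThreshold", "10000").toInt
```

The same keys can also be set at runtime with `SET key=value` statements passed through `sql()`/`hql()`, per the updated scaladoc.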
20 changes: 8 additions & 12 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,14 +24,14 @@ import org.apache.hadoop.conf.Configuration

import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.SparkStrategies
@@ -86,7 +86,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))
new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self))

/**
* Loads a Parquet file, returning the result as a [[SchemaRDD]].
@@ -127,7 +127,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
@Experimental
def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD =
new SchemaRDD(this, JsonRDD.inferSchema(json, samplingRatio))
new SchemaRDD(this, JsonRDD.inferSchema(self, json, samplingRatio))

/**
* :: Experimental ::
@@ -170,11 +170,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
val name = tableName
val newPlan = rdd.logicalPlan transform {
case s @ SparkLogicalPlan(ExistingRdd(_, _), _) => s.copy(tableName = name)
}
catalog.registerTable(None, tableName, newPlan)
catalog.registerTable(None, tableName, rdd.logicalPlan)
}

/**
@@ -212,7 +208,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
catalog.registerTable(None, tableName, SparkLogicalPlan(e))
catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self))
case inMem: InMemoryRelation =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
@@ -371,7 +367,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
new GenericRow(map.values.toArray.asInstanceOf[Array[Any]]): Row
}
}
new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(self))
}

}
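For completeness, an end-to-end usage sketch of the user-facing path touched here. It assumes an existing `SparkContext` named `sc`; the `Person` schema and table name are illustrative.

```scala
import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)        // illustrative schema

val sqlContext = new SQLContext(sc)              // `sc` is an existing SparkContext
val people = sc.parallelize(Seq(Person("alice", 30), Person("bob", 25)))

// createSchemaRDD now wraps the RDD in SparkLogicalPlan(...)(self), so the plan
// carries the SQLContext it needs for its default statistics.
val schemaRDD = sqlContext.createSchemaRDD(people)

// registerRDDAsTable no longer rewrites the plan to inject a table name;
// the logical plan is registered as-is.
sqlContext.registerRDDAsTable(schemaRDD, "people")
sqlContext.sql("SELECT name FROM people WHERE age > 26").collect()
```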
3 changes: 2 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -430,7 +430,8 @@ class SchemaRDD(
* @group schema
*/
private def applySchema(rdd: RDD[Row]): SchemaRDD = {
new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd)))
new SchemaRDD(sqlContext,
SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd))(sqlContext))
}

// =======================================================================
@@ -56,7 +56,7 @@ private[sql] trait SchemaRDDLike {
// happen right away to let these side effects take place eagerly.
case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
queryExecution.toRdd
SparkLogicalPlan(queryExecution.executedPlan)
SparkLogicalPlan(queryExecution.executedPlan)(sqlContext)
case _ =>
baseLogicalPlan
}
@@ -92,7 +92,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
new GenericRow(extractors.map(e => e.invoke(row)).toArray[Any]): ScalaRow
}
}
new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(sqlContext))
}

/**
@@ -120,7 +120,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
* @group userf
*/
def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD =
new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(json, 1.0))
new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(sqlContext, json, 1.0))

/**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
@@ -19,12 +19,12 @@ package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Logging, Row}
import org.apache.spark.sql.{Logging, Row, SQLContext}
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical._

/**
Expand Down Expand Up @@ -66,8 +66,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
* linking.
*/
@DeveloperApi
case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "SparkLogicalPlan")
extends BaseRelation with MultiInstanceRelation {
case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQLContext)

[Review thread on this line]
Contributor: FYI: I'm considering adding a way for all physical plans to automatically have a handle to the sqlContext. This will probably happen in the code gen patch.
Contributor Author: That will be pretty awesome & reduce much boilerplate code.

extends LogicalPlan with MultiInstanceRelation {

def output = alreadyPlanned.output
override def references = Set.empty
@@ -78,9 +78,15 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "Spar
alreadyPlanned match {
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
case _ => sys.error("Multiple instance of the same relation detected.")
}, tableName)
.asInstanceOf[this.type]
})(sqlContext).asInstanceOf[this.type]
}

@transient override lazy val statistics = Statistics(
// TODO: Instead of returning a default value here, find a way to return a meaningful size
// estimate for RDDs. See PR 1238 for more discussions.
sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
)

}

private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] {
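Finally, to show how the planner consumes these estimates, here is a simplified sketch in the spirit of the join-strategy changes in this PR (commits 7d9216a and 16fc60a). The `canBeBroadcast` helper is hypothetical, not the actual `SparkStrategies` code; it only mirrors the idea.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical helper: one side of a join qualifies for a broadcast join only if
// auto conversion is enabled and its estimated size fits under the threshold.
// When the threshold is negative (conversion disabled), `statistics` is never
// touched, since computing it can be expensive (e.g. a metastore lookup).
def canBeBroadcast(plan: LogicalPlan, sqlContext: SQLContext): Boolean = {
  val threshold = sqlContext.get("spark.sql.autoBroadcastJoinThreshold", "10000").toInt
  threshold > 0 && plan.statistics.sizeInBytes <= threshold
}
```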