diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SQLConf.scala
similarity index 53%
rename from sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SQLConf.scala
index df76195723071..c78c2439c2533 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SQLConf.scala
@@ -15,23 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql
+package org.apache.spark.sql.catalyst.planning
 
 import java.util.Properties
 
 import scala.collection.JavaConverters._
 
+private object SQLConf {
+  @transient protected[spark] val confSettings = java.util.Collections.synchronizedMap(
+    new java.util.HashMap[String, String]())
+}
+
 /**
- * SQLConf holds mutable config parameters and hints. These can be set and
- * queried either by passing SET commands into Spark SQL's DSL
- * functions (sql(), hql(), etc.), or by programmatically using setters and
- * getters of this class.
+ * A trait that enables the setting and getting of mutable config parameters/hints. They are
+ * stored in a single central location, the private companion object of the same name, so all
+ * classes that mix in this trait share the same hints.
+ *
+ * In the presence of a SQLContext, these can be set and queried by passing SET commands into
+ * Spark SQL's DSL functions (sql(), hql(), etc.). Otherwise, users of this trait can modify
+ * the hints programmatically through its setters and getters.
  *
  * SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads).
  */
 trait SQLConf {
   import SQLConf._
+  import SQLConf._
+
+  protected[spark] val settings = confSettings
 
   /** ************************ Spark SQL Params/Hints ******************* */
 
   // TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?
@@ -40,50 +51,58 @@ trait SQLConf {
   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
-   * a broadcast value during the physical executions of join operations. Setting this to 0
+   * a broadcast value during the physical executions of join operations. Setting this to -1
    * effectively disables auto conversion.
-   * Hive setting: hive.auto.convert.join.noconditionaltask.size.
+   *
+   * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000.
    */
   private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
 
-  /** ********************** SQLConf functionality methods ************ */
+  /**
+   * The default size in bytes to assign to a logical operator's estimated statistics. By default,
+   * it is set to a value larger than `autoConvertJoinSize`, so any logical operator that lacks a
+   * properly implemented size estimate will never be incorrectly broadcast in joins.
+   */
+  private[spark] def statsDefaultSizeInBytes: Long =
+    getOption("spark.sql.catalyst.stats.sizeInBytes").map(_.toLong)
+      .getOrElse(autoConvertJoinSize + 1)
 
-  @transient
-  private val settings = java.util.Collections.synchronizedMap(
-    new java.util.HashMap[String, String]())
+  /** ********************** SQLConf functionality methods ************ */
 
   def set(props: Properties): Unit = {
-    props.asScala.foreach { case (k, v) => this.settings.put(k, v) }
+    confSettings.synchronized {
+      props.asScala.foreach { case (k, v) => confSettings.put(k, v) }
+    }
   }
 
   def set(key: String, value: String): Unit = {
     require(key != null, "key cannot be null")
-    require(value != null, s"value cannot be null for $key")
-    settings.put(key, value)
+    require(value != null, s"value cannot be null for key: $key")
+    confSettings.put(key, value)
   }
 
   def get(key: String): String = {
-    Option(settings.get(key)).getOrElse(throw new NoSuchElementException(key))
+    Option(confSettings.get(key)).getOrElse(throw new NoSuchElementException(key))
  }
 
   def get(key: String, defaultValue: String): String = {
-    Option(settings.get(key)).getOrElse(defaultValue)
+    Option(confSettings.get(key)).getOrElse(defaultValue)
   }
 
-  def getAll: Array[(String, String)] = settings.synchronized { settings.asScala.toArray }
+  def getAll: Array[(String, String)] = confSettings.synchronized { confSettings.asScala.toArray }
 
-  def getOption(key: String): Option[String] = Option(settings.get(key))
+  def getOption(key: String): Option[String] = Option(confSettings.get(key))
 
-  def contains(key: String): Boolean = settings.containsKey(key)
+  def contains(key: String): Boolean = confSettings.containsKey(key)
 
   def toDebugString: String = {
-    settings.synchronized {
-      settings.asScala.toArray.sorted.map{ case (k, v) => s"$k=$v" }.mkString("\n")
+    confSettings.synchronized {
+      confSettings.asScala.toArray.sorted.map{ case (k, v) => s"$k=$v" }.mkString("\n")
     }
   }
 
   private[spark] def clear() {
-    settings.clear()
+    confSettings.clear()
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 3ac9b8d8e42ef..45e8830b9ba76 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -19,17 +19,24 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.SQLConf
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.types.StructType
 import org.apache.spark.sql.catalyst.trees
 
-abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
+abstract class LogicalPlan extends QueryPlan[LogicalPlan] with SQLConf {
   self: Product =>
 
+  // TODO: make a case class?
   protected class Statistics {
     lazy val childrenStats = children.map(_.statistics)
+
     lazy val numTuples: Long = childrenStats.map(_.numTuples).sum
-    lazy val sizeInBytes: Long = childrenStats.map(_.sizeInBytes).sum
+
+    lazy val sizeInBytes: Long = {
+      val sum = childrenStats.map(_.sizeInBytes).sum
+      if (sum == 0) statsDefaultSizeInBytes else sum
+    }
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 568a64951def3..d3718c55543fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+
 import scala.language.implicitConversions
 
 import scala.reflect.runtime.universe.TypeTag
@@ -24,14 +25,15 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
-import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.planning.SQLConf
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.columnar.InMemoryRelation
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.SparkStrategies
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index bafc5aaff3f03..dffa112fb7ee2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -82,6 +82,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           if left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize =>
         broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
 
+      // TODO: use optimization here as well
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
         val hashJoin =
           execution.ShuffledHashJoin(
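Note (editorial, not part of the patch): the following is a minimal, self-contained Scala sketch of the sizing logic introduced above. An operator whose children report no size estimate falls back to a default that is deliberately larger than the broadcast threshold, so it is never auto-converted to a broadcast join. The names used here (SizeEstimationSketch, Plan, defaultSizeInBytes, shouldBroadcast) are illustrative assumptions, not Spark APIs.

    // Illustrative sketch only; mirrors SQLConf.statsDefaultSizeInBytes and Statistics.sizeInBytes.
    object SizeEstimationSketch {
      val autoConvertJoinSize: Long = 10000L                    // broadcast threshold, in bytes
      val defaultSizeInBytes: Long = autoConvertJoinSize + 1    // fallback: just above the threshold

      // A toy plan node: leaves may know their size; inner nodes sum their children's sizes.
      case class Plan(children: Seq[Plan], knownSize: Long = 0L) {
        def sizeInBytes: Long = {
          val sum = if (children.isEmpty) knownSize else children.map(_.sizeInBytes).sum
          if (sum == 0) defaultSizeInBytes else sum              // same guard as Statistics.sizeInBytes
        }
      }

      // Mirrors the comparison SparkStrategies uses to pick the broadcast side.
      def shouldBroadcast(plan: Plan): Boolean = plan.sizeInBytes <= autoConvertJoinSize

      def main(args: Array[String]): Unit = {
        val smallTable = Plan(Nil, knownSize = 512)    // known to be small: eligible for broadcast
        val unknownTable = Plan(Nil)                   // no estimate: falls back to the default
        println(shouldBroadcast(smallTable))           // true
        println(shouldBroadcast(unknownTable))         // false, so it is not incorrectly broadcast
      }
    }

One design point worth noting: the fallback is defined relative to the threshold (autoConvertJoinSize + 1) rather than as an independent constant, so raising the threshold can never silently make operators without estimates eligible for broadcast.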