Move SQLConf to Catalyst & add default val for sizeInBytes.
Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SQLConf.scala
concretevitamin committed Jul 29, 2014
1 parent 7a60ab7 commit 73412be
Showing 4 changed files with 56 additions and 27 deletions.
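At a high level, the patch turns SQLConf into a trait whose storage lives in a same-named private companion object, so every class that mixes it in reads and writes one shared, synchronized settings map, and logical operators whose size is unknown fall back to a configurable default estimate. A minimal, self-contained Scala sketch of the shared-storage idea (SharedConf, Demo, and the key used here are illustrative names, not part of the patch):

object SharedConf {
  // Shared, synchronized backing store, analogous to the patch's companion object.
  val settings = java.util.Collections.synchronizedMap(
    new java.util.HashMap[String, String]())
}

trait SharedConf {
  def set(key: String, value: String): Unit = SharedConf.settings.put(key, value)
  def getOption(key: String): Option[String] = Option(SharedConf.settings.get(key))
}

object Demo extends App {
  val a = new SharedConf {}
  val b = new SharedConf {}
  a.set("some.example.key", "10")
  println(b.getOption("some.example.key")) // Some(10): both mix-ins see the same store
}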
@@ -15,23 +15,34 @@
* limitations under the License.
*/

package org.apache.spark.sql
package org.apache.spark.sql.catalyst.planning

import java.util.Properties

import scala.collection.JavaConverters._

private object SQLConf {
@transient protected[spark] val confSettings = java.util.Collections.synchronizedMap(
new java.util.HashMap[String, String]())
}

/**
* SQLConf holds mutable config parameters and hints. These can be set and
* queried either by passing SET commands into Spark SQL's DSL
* functions (sql(), hql(), etc.), or by programmatically using setters and
* getters of this class.
* A trait that enables the setting and getting of mutable config parameters/hints. The settings
* are stored centrally in the private companion object of the same name, so all classes that
* mix in this trait share the same hints.
*
* In the presence of a SQLContext, these can be set and queried by passing SET commands
* into Spark SQL's DSL functions (sql(), hql(), etc.). Otherwise, users of this trait can
* read and modify the hints programmatically through its setters and getters.
*
* SQLConf is thread-safe (internally synchronized, so it is safe to use from multiple threads).
*/
trait SQLConf {
import SQLConf._

import SQLConf._
protected[spark] val settings = confSettings

/** ************************ Spark SQL Params/Hints ******************* */
// TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?

@@ -40,50 +51,58 @@ trait SQLConf {

/**
* Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
* a broadcast value during the physical executions of join operations. Setting this to 0
* a broadcast value during the physical executions of join operations. Setting this to -1
* effectively disables auto conversion.
* Hive setting: hive.auto.convert.join.noconditionaltask.size.
*
* Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000.
*/
private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt

/** ********************** SQLConf functionality methods ************ */
/**
* The default size in bytes to assign to a logical operator's estimated statistics. By default,
* it is set to a larger value than `autoConvertJoinSize`, so that any logical operator without a
* properly implemented size estimate is not incorrectly broadcast in joins.
*/
private[spark] def statsDefaultSizeInBytes: Long =
getOption("spark.sql.catalyst.stats.sizeInBytes").map(_.toLong)
.getOrElse(autoConvertJoinSize + 1)

@transient
private val settings = java.util.Collections.synchronizedMap(
new java.util.HashMap[String, String]())
/** ********************** SQLConf functionality methods ************ */

def set(props: Properties): Unit = {
props.asScala.foreach { case (k, v) => this.settings.put(k, v) }
confSettings.synchronized {
props.asScala.foreach { case (k, v) => confSettings.put(k, v) }
}
}

def set(key: String, value: String): Unit = {
require(key != null, "key cannot be null")
require(value != null, s"value cannot be null for $key")
settings.put(key, value)
require(value != null, s"value cannot be null for key: $key")
confSettings.put(key, value)
}

def get(key: String): String = {
Option(settings.get(key)).getOrElse(throw new NoSuchElementException(key))
Option(confSettings.get(key)).getOrElse(throw new NoSuchElementException(key))
}

def get(key: String, defaultValue: String): String = {
Option(settings.get(key)).getOrElse(defaultValue)
Option(confSettings.get(key)).getOrElse(defaultValue)
}

def getAll: Array[(String, String)] = settings.synchronized { settings.asScala.toArray }
def getAll: Array[(String, String)] = confSettings.synchronized { confSettings.asScala.toArray }

def getOption(key: String): Option[String] = Option(settings.get(key))
def getOption(key: String): Option[String] = Option(confSettings.get(key))

def contains(key: String): Boolean = settings.containsKey(key)
def contains(key: String): Boolean = confSettings.containsKey(key)

def toDebugString: String = {
settings.synchronized {
settings.asScala.toArray.sorted.map{ case (k, v) => s"$k=$v" }.mkString("\n")
confSettings.synchronized {
confSettings.asScala.toArray.sorted.map{ case (k, v) => s"$k=$v" }.mkString("\n")
}
}

private[spark] def clear() {
settings.clear()
confSettings.clear()
}

}
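A small sketch of the fallback behavior added in this file, assuming the patched sources are on the classpath and the code runs inside the org.apache.spark package tree (both accessors are private[spark]); the object name and the override value are illustrative:

package org.apache.spark.sql.catalyst.planning

object StatsDefaultSketch extends App {
  val conf = new SQLConf {}  // any mix-in instance sees the shared settings
  conf.clear()

  // Nothing set: the default operator size estimate sits just above the
  // broadcast threshold, so unknown-size plans are never auto-broadcast.
  assert(conf.statsDefaultSizeInBytes == conf.autoConvertJoinSize + 1)  // 10001

  // An explicit setting overrides the derived default.
  conf.set("spark.sql.catalyst.stats.sizeInBytes", (100L * 1024 * 1024).toString)
  assert(conf.statsDefaultSizeInBytes == 100L * 1024 * 1024)
}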
@@ -19,17 +19,24 @@ package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.SQLConf
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.types.StructType
import org.apache.spark.sql.catalyst.trees

abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
abstract class LogicalPlan extends QueryPlan[LogicalPlan] with SQLConf {
self: Product =>

// TODO: make a case class?
protected class Statistics {
lazy val childrenStats = children.map(_.statistics)

lazy val numTuples: Long = childrenStats.map(_.numTuples).sum
lazy val sizeInBytes: Long = childrenStats.map(_.sizeInBytes).sum

lazy val sizeInBytes: Long = {
val sum = childrenStats.map(_.sizeInBytes).sum
if (sum == 0) statsDefaultSizeInBytes else sum
}
}

/**
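The new sizeInBytes fallback, isolated into a self-contained sketch of the same "if (sum == 0) default else sum" rule (this is not the LogicalPlan code itself, just the arithmetic it implements, with an assumed default of 10001 bytes):

object SizeEstimateSketch extends App {
  val defaultSizeInBytes = 10001L  // mirrors the derived statsDefaultSizeInBytes default

  // Sum the children's estimates; a zero sum (e.g. a leaf operator that
  // implements no estimate) falls back to the default.
  def sizeInBytes(childSizes: Seq[Long]): Long = {
    val sum = childSizes.sum
    if (sum == 0) defaultSizeInBytes else sum
  }

  assert(sizeInBytes(Nil) == 10001L)               // leaf with no estimate: default
  assert(sizeInBytes(Seq(4000L, 2000L)) == 6000L)  // known children: propagate the sum
}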
8 changes: 5 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,21 +17,23 @@

package org.apache.spark.sql


import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag

import org.apache.hadoop.conf.Configuration

import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.planning.SQLConf
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.columnar.InMemoryRelation
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.SparkStrategies
@@ -82,6 +82,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
if left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)

// TODO: use optimization here as well
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
val hashJoin =
execution.ShuffledHashJoin(
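Putting the pieces together, the planner's broadcast decision in the hunk above reduces to a size comparison. A sketch of that guard under the defaults discussed earlier (threshold 10000 bytes, unknown sizes estimated at 10001; the object and method names are illustrative):

object BroadcastDecisionSketch extends App {
  val autoConvertJoinSize = 10000L             // broadcast threshold (default)
  val statsDefaultSizeInBytes = autoConvertJoinSize + 1

  // Mirrors the strategy's guard: broadcast a join side only if its estimated
  // size is at or below the threshold.
  def shouldBroadcast(estimatedSizeInBytes: Long): Boolean =
    estimatedSizeInBytes <= autoConvertJoinSize

  assert(shouldBroadcast(512L))                      // small table: broadcast hash join
  assert(!shouldBroadcast(statsDefaultSizeInBytes))  // unknown size: shuffled hash join instead
}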
