From 4b19d6ccd3544e02811361cf7cd06327a15335e5 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 5 Aug 2014 18:15:08 -0700 Subject: [PATCH] Tighten the visibility of various SQLConf methods and renamed setter/getters. --- .../scala/org/apache/spark/sql/SQLConf.scala | 44 +++++++++---------- .../apache/spark/sql/execution/commands.scala | 8 ++-- .../org/apache/spark/sql/SQLConfSuite.scala | 33 +++++++------- .../apache/spark/sql/hive/HiveContext.scala | 12 ++--- .../org/apache/spark/sql/hive/TestHive.scala | 4 +- .../sql/hive/execution/HiveQuerySuite.scala | 14 +++--- 6 files changed, 54 insertions(+), 61 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 40bfd55e95a12..b916385414b3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -21,13 +21,11 @@ import java.util.Properties import scala.collection.JavaConverters._ -object SQLConf { +private[spark] object SQLConf { val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed" val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold" val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes" - val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size" val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions" - val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables" val CODEGEN_ENABLED = "spark.sql.codegen" val DIALECT = "spark.sql.dialect" @@ -66,13 +64,13 @@ trait SQLConf { * Note that the choice of dialect does not affect things like what tables are available or * how query execution is performed. */ - private[spark] def dialect: String = get(DIALECT, "sql") + private[spark] def dialect: String = getConf(DIALECT, "sql") /** When true tables cached using the in-memory columnar caching will be compressed. */ - private[spark] def useCompression: Boolean = get(COMPRESS_CACHED, "false").toBoolean + private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "false").toBoolean /** Number of partitions to use for shuffle operators. */ - private[spark] def numShufflePartitions: Int = get(SHUFFLE_PARTITIONS, "200").toInt + private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt /** * When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode @@ -84,7 +82,7 @@ trait SQLConf { * Defaults to false as this feature is currently experimental. */ private[spark] def codegenEnabled: Boolean = - if (get(CODEGEN_ENABLED, "false") == "true") true else false + if (getConf(CODEGEN_ENABLED, "false") == "true") true else false /** * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to @@ -94,7 +92,7 @@ trait SQLConf { * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000. */ private[spark] def autoBroadcastJoinThreshold: Int = - get(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt + getConf(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt /** * The default size in bytes to assign to a logical operator's estimation statistics. By default, @@ -102,41 +100,39 @@ trait SQLConf { * properly implemented estimation of this statistic will not be incorrectly broadcasted in joins. 
*/ private[spark] def defaultSizeInBytes: Long = - getOption(DEFAULT_SIZE_IN_BYTES).map(_.toLong).getOrElse(autoBroadcastJoinThreshold + 1) + getConf(DEFAULT_SIZE_IN_BYTES, (autoBroadcastJoinThreshold + 1).toString).toLong /** ********************** SQLConf functionality methods ************ */ - def set(props: Properties): Unit = { + /** Set Spark SQL configuration properties. */ + def setConf(props: Properties): Unit = { settings.synchronized { props.asScala.foreach { case (k, v) => settings.put(k, v) } } } - def set(key: String, value: String): Unit = { + /** Set the given Spark SQL configuration property. */ + def setConf(key: String, value: String): Unit = { require(key != null, "key cannot be null") require(value != null, s"value cannot be null for key: $key") settings.put(key, value) } - def get(key: String): String = { + /** Return the value of Spark SQL configuration property for the given key. */ + def getConf(key: String): String = { Option(settings.get(key)).getOrElse(throw new NoSuchElementException(key)) } - def get(key: String, defaultValue: String): String = { + /** + * Return the value of Spark SQL configuration property for the given key. If the key is not set + * yet, return `defaultValue`. + */ + def getConf(key: String, defaultValue: String): String = { Option(settings.get(key)).getOrElse(defaultValue) } - def getAll: Array[(String, String)] = settings.synchronized { settings.asScala.toArray } - - def getOption(key: String): Option[String] = Option(settings.get(key)) - - def contains(key: String): Boolean = settings.containsKey(key) - - def toDebugString: String = { - settings.synchronized { - settings.asScala.toArray.sorted.map{ case (k, v) => s"$k=$v" }.mkString("\n") - } - } + /** Return all the configuration properties that have been set (i.e. not the default). */ + def getAllConfs: Array[(String, String)] = settings.synchronized { settings.asScala.toArray } private[spark] def clear() { settings.clear() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 9293239131d52..d30697ec2fbf5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -53,10 +53,10 @@ case class SetCommand( if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) { logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.") - context.set(SQLConf.SHUFFLE_PARTITIONS, v) + context.setConf(SQLConf.SHUFFLE_PARTITIONS, v) Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v") } else { - context.set(k, v) + context.setConf(k, v) Array(s"$k=$v") } @@ -77,12 +77,12 @@ case class SetCommand( "system:sun.java.command=shark.SharkServer2") } else { - Array(s"$k=${context.getOption(k).getOrElse("")}") + Array(s"$k=${context.getConf(k, "")}") } // Query all key-value pairs that are set in the SQLConf of the context. 
case (None, None) => - context.getAll.map { case (k, v) => + context.getAllConfs.map { case (k, v) => s"$k=$v" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala index 1a58d73d9e7f4..d29442849bfa9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala @@ -29,21 +29,18 @@ class SQLConfSuite extends QueryTest { test("programmatic ways of basic setting and getting") { clear() - assert(getOption(testKey).isEmpty) - assert(getAll.toSet === Set()) + assert(getAllConfs.toSet === Set()) - set(testKey, testVal) - assert(get(testKey) == testVal) - assert(get(testKey, testVal + "_") == testVal) - assert(getOption(testKey) == Some(testVal)) - assert(contains(testKey)) + setConf(testKey, testVal) + assert(getConf(testKey) == testVal) + assert(getConf(testKey, testVal + "_") == testVal) + assert(getAllConfs.contains(testKey)) // Tests SQLConf as accessed from a SQLContext is mutable after // the latter is initialized, unlike SparkConf inside a SparkContext. - assert(TestSQLContext.get(testKey) == testVal) - assert(TestSQLContext.get(testKey, testVal + "_") == testVal) - assert(TestSQLContext.getOption(testKey) == Some(testVal)) - assert(TestSQLContext.contains(testKey)) + assert(TestSQLContext.getConf(testKey) == testVal) + assert(TestSQLContext.getConf(testKey, testVal + "_") == testVal) + assert(TestSQLContext.getAllConfs.contains(testKey)) clear() } @@ -51,21 +48,21 @@ class SQLConfSuite extends QueryTest { test("parse SQL set commands") { clear() sql(s"set $testKey=$testVal") - assert(get(testKey, testVal + "_") == testVal) - assert(TestSQLContext.get(testKey, testVal + "_") == testVal) + assert(getConf(testKey, testVal + "_") == testVal) + assert(TestSQLContext.getConf(testKey, testVal + "_") == testVal) sql("set some.property=20") - assert(get("some.property", "0") == "20") + assert(getConf("some.property", "0") == "20") sql("set some.property = 40") - assert(get("some.property", "0") == "40") + assert(getConf("some.property", "0") == "40") val key = "spark.sql.key" val vs = "val0,val_1,val2.3,my_table" sql(s"set $key=$vs") - assert(get(key, "0") == vs) + assert(getConf(key, "0") == vs) sql(s"set $key=") - assert(get(key, "0") == "") + assert(getConf(key, "0") == "") clear() } @@ -73,6 +70,6 @@ class SQLConfSuite extends QueryTest { test("deprecated property") { clear() sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10") - assert(get(SQLConf.SHUFFLE_PARTITIONS) == "10") + assert(getConf(SQLConf.SHUFFLE_PARTITIONS) == "10") } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index d8e7a5943daa5..53f3dc11dbb9f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -60,9 +60,9 @@ class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) { /** Sets up the system initially or after a RESET command */ protected def configure() { - set("javax.jdo.option.ConnectionURL", + setConf("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$metastorePath;create=true") - set("hive.metastore.warehouse.dir", warehousePath) + setConf("hive.metastore.warehouse.dir", warehousePath) } configure() // Must be called before initializing the catalog below. 
@@ -76,7 +76,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { self => // Change the default SQL dialect to HiveQL - override private[spark] def dialect: String = get(SQLConf.DIALECT, "hiveql") + override private[spark] def dialect: String = getConf(SQLConf.DIALECT, "hiveql") override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution { val logical = plan } @@ -224,15 +224,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { @transient protected[hive] lazy val hiveconf = new HiveConf(classOf[SessionState]) @transient protected[hive] lazy val sessionState = { val ss = new SessionState(hiveconf) - set(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf. + setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf. ss } sessionState.err = new PrintStream(outputBuffer, true, "UTF-8") sessionState.out = new PrintStream(outputBuffer, true, "UTF-8") - override def set(key: String, value: String): Unit = { - super.set(key, value) + override def setConf(key: String, value: String): Unit = { + super.setConf(key, value) runSqlHive(s"SET $key=$value") } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index c605e8adcfb0f..d890df866fbe5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -65,9 +65,9 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { /** Sets up the system initially or after a RESET command */ protected def configure() { - set("javax.jdo.option.ConnectionURL", + setConf("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$metastorePath;create=true") - set("hive.metastore.warehouse.dir", warehousePath) + setConf("hive.metastore.warehouse.dir", warehousePath) } configure() // Must be called before initializing the catalog below. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 2f0be49b6a6d7..fdb2f41f5a5b6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -75,9 +75,9 @@ class HiveQuerySuite extends HiveComparisonTest { "SELECT 2 / 1, 1 / 2, 1 / 3, 1 / COUNT(*) FROM src LIMIT 1") test("Query expressed in SQL") { - set("spark.sql.dialect", "sql") + setConf("spark.sql.dialect", "sql") assert(sql("SELECT 1").collect() === Array(Seq(1))) - set("spark.sql.dialect", "hiveql") + setConf("spark.sql.dialect", "hiveql") } @@ -436,18 +436,18 @@ class HiveQuerySuite extends HiveComparisonTest { val testVal = "val0,val_1,val2.3,my_table" sql(s"set $testKey=$testVal") - assert(get(testKey, testVal + "_") == testVal) + assert(getConf(testKey, testVal + "_") == testVal) sql("set some.property=20") - assert(get("some.property", "0") == "20") + assert(getConf("some.property", "0") == "20") sql("set some.property = 40") - assert(get("some.property", "0") == "40") + assert(getConf("some.property", "0") == "40") sql(s"set $testKey=$testVal") - assert(get(testKey, "0") == testVal) + assert(getConf(testKey, "0") == testVal) sql(s"set $testKey=") - assert(get(testKey, "0") == "") + assert(getConf(testKey, "0") == "") } test("SET commands semantics for a HiveContext") {
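
---

For reference, a minimal usage sketch of the configuration API after this rename (setConf/getConf/getAllConfs replacing set/get/getAll/getOption/contains). This snippet is illustrative and not part of the patch; the object name `SQLConfRenameExample`, the local-mode SparkContext setup, and the printed values are assumptions based on the defaults shown in the diff above.

    // Hypothetical sketch exercising the renamed SQLConf accessors on a SQLContext,
    // which mixes in the SQLConf trait modified by this patch.
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object SQLConfRenameExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("sqlconf-rename-example").setMaster("local"))
        val sqlContext = new SQLContext(sc)

        // Previously: sqlContext.set("spark.sql.shuffle.partitions", "10")
        sqlContext.setConf("spark.sql.shuffle.partitions", "10")

        // Previously: sqlContext.get(key) / sqlContext.get(key, default)
        println(sqlContext.getConf("spark.sql.shuffle.partitions"))  // "10"
        println(sqlContext.getConf("spark.sql.codegen", "false"))    // default when unset

        // Previously: sqlContext.getAll; getOption/contains/toDebugString were removed.
        sqlContext.getAllConfs.foreach { case (k, v) => println(s"$k=$v") }

        sc.stop()
      }
    }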