[SPARK-50157][SQL] Using SQLConf provided by SparkSession first
### What changes were proposed in this pull request?
This PR proposes to use the `SQLConf` provided by the `SparkSession` (i.e. `session.sessionState.conf`) first, instead of falling back to `SQLConf.get`.

### Why are the changes needed?
The `SQLConf` provided by the `SparkSession` has better performance than `SQLConf.get`.
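
For context, the pattern applied throughout this PR is to override the `conf` member that rules and planners inherit from `SQLConfHelper` (which defaults to `SQLConf.get`) so that it returns the session's own conf. The sketch below is illustrative only: the rule name is hypothetical, and the cost description of `SQLConf.get` (roughly, a thread-local/active-session lookup per call) is an assumption based on its current implementation.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf

// Hypothetical rule, shown only to illustrate the pattern used in this PR.
case class MySessionAwareRule(session: SparkSession) extends Rule[LogicalPlan] {

  // Without this override, `conf` falls back to SQLConf.get on every access;
  // pinning it to the session's SessionState avoids that per-call lookup.
  override def conf: SQLConf = session.sessionState.conf

  override def apply(plan: LogicalPlan): LogicalPlan = {
    // Example conf read; real rules may read many settings per invocation.
    if (conf.caseSensitiveAnalysis) plan else plan
  }
}
```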

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
GA tests.
The following micro-benchmark:
```
// Assumed imports for running this snippet standalone (omitted in the original).
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
import org.apache.spark.sql.internal.SQLConf

object SQLConfBenchmark extends SqlBasedBenchmark {
  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("Get SQLConf") {
      val iters = 1000
      val benchmark = new Benchmark("Benchmark SQLConf", iters, output = output)
      benchmark.addCase("SQLConf.get") { _ =>
        for (_ <- 1 to iters) {
          val conf = SQLConf.get
        }
      }

      benchmark.addCase("sessionState.conf") { _ =>
        for (_ <- 1 to iters) {
          val conf = spark.sessionState.conf
        }
      }

      benchmark.run()
    }
  }
}
```
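
The benchmark object above is not part of the committed change; to reproduce it, it would typically be added under the SQL module's benchmark test sources and run with something like `build/sbt "sql/Test/runMain <fully qualified benchmark class>"` (the exact invocation depends on the Spark build setup).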
The benchmark output:
```
Benchmark SQLConf:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
SQLConf.get                                           0              0           1         22.9          43.6       1.0X
sessionState.conf                                     0              0           0       1377.4           0.7      60.1X
```
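
Per access, `sessionState.conf` is about 60x cheaper than `SQLConf.get` in this micro-benchmark (roughly 0.7 ns vs 44 ns). The absolute cost of a single `SQLConf.get` call is small, but planner rules and commands can read the conf many times per query, so avoiding the repeated lookup is a cheap win.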

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes apache#48693 from beliefer/SPARK-50157.

Authored-by: beliefer <[email protected]>
Signed-off-by: beliefer <[email protected]>
beliefer committed Dec 13, 2024
1 parent 98cef08 commit 819bac9
Showing 11 changed files with 35 additions and 8 deletions.

@@ -25,10 +25,13 @@ import org.apache.spark.sql.execution.adaptive.LogicalQueryStageStrategy
import org.apache.spark.sql.execution.command.v2.V2CommandStrategy
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy
import org.apache.spark.sql.internal.SQLConf

class SparkPlanner(val session: SparkSession, val experimentalMethods: ExperimentalMethods)
extends SparkStrategies with SQLConfHelper {

override def conf: SQLConf = session.sessionState.conf

def numPartitions: Int = conf.numShufflePartitions

override def strategies: Seq[Strategy] =

@@ -33,6 +33,8 @@ import org.apache.spark.util.Utils
*/
case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleReadRule {

override def conf: SQLConf = session.sessionState.conf

override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
Seq(ENSURE_REQUIREMENTS, REPARTITION_BY_COL, REBALANCE_PARTITIONS_BY_NONE,
REBALANCE_PARTITIONS_BY_COL)

@@ -44,6 +44,8 @@ import org.apache.spark.sql.internal.SQLConf
case class InsertAdaptiveSparkPlan(
adaptiveExecutionContext: AdaptiveExecutionContext) extends Rule[SparkPlan] {

override def conf: SQLConf = adaptiveExecutionContext.session.sessionState.conf

override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false)

private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {

@@ -25,12 +25,16 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelationBroadcastMode, HashJoin}
import org.apache.spark.sql.internal.SQLConf

/**
* A rule to insert dynamic pruning predicates in order to reuse the results of broadcast.
*/
case class PlanAdaptiveDynamicPruningFilters(
rootPlan: AdaptiveSparkPlanExec) extends Rule[SparkPlan] with AdaptiveSparkPlanHelper {

override def conf: SQLConf = rootPlan.context.session.sessionState.conf

def apply(plan: SparkPlan): SparkPlan = {
if (!conf.dynamicPartitionPruningEnabled) {
return plan

@@ -196,7 +196,8 @@ case class DescribeDatabaseCommand(
if (properties.isEmpty) {
""
} else {
conf.redactOptions(properties).toSeq.sortBy(_._1).mkString("(", ", ", ")")
sparkSession.sessionState.conf.redactOptions(properties).toSeq
.sortBy(_._1).mkString("(", ", ", ")")
}
result :+ Row("Properties", propertiesStr)
} else {
@@ -548,7 +549,7 @@ case class AlterTableAddPartitionCommand(
// Hive metastore may not have enough memory to handle millions of partitions in single RPC.
// Also the request to metastore times out when adding lot of partitions in one shot.
// we should split them into smaller batches
val batchSize = conf.getConf(SQLConf.ADD_PARTITION_BATCH_SIZE)
val batchSize = sparkSession.sessionState.conf.getConf(SQLConf.ADD_PARTITION_BATCH_SIZE)
parts.iterator.grouped(batchSize).foreach { batch =>
catalog.createPartitions(table.identifier, batch, ignoreIfExists = ifNotExists)
}

@@ -240,7 +240,7 @@ case class AlterTableAddColumnsCommand(

SchemaUtils.checkColumnNameDuplication(
(colsWithProcessedDefaults ++ catalogTable.schema).map(_.name),
conf.caseSensitiveAnalysis)
sparkSession.sessionState.conf.caseSensitiveAnalysis)
if (!conf.allowCollationsInMapKeys) {
colsToAdd.foreach(col => SchemaUtils.checkNoCollationsInMapKeys(col.dataType))
}
@@ -501,7 +549,7 @@ case class TruncateTableCommand(
partLocations
}
val hadoopConf = spark.sessionState.newHadoopConf()
val ignorePermissionAcl = conf.truncateTableIgnorePermissionAcl
val ignorePermissionAcl = spark.sessionState.conf.truncateTableIgnorePermissionAcl
locations.foreach { location =>
if (location.isDefined) {
val path = new Path(location.get)
@@ -819,7 +819,8 @@ case class DescribeColumnCommand(

val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
val colStatsMap = catalogTable.stats.map(_.colStats).getOrElse(Map.empty)
val colStats = if (conf.caseSensitiveAnalysis) colStatsMap else CaseInsensitiveMap(colStatsMap)
val colStats = if (sparkSession.sessionState.conf.caseSensitiveAnalysis) colStatsMap
else CaseInsensitiveMap(colStatsMap)
val cs = colStats.get(field.name)

val comment = if (field.metadata.contains("comment")) {
@@ -975,7 +976,7 @@ case class ShowTablePropertiesCommand(
Seq.empty[Row]
} else {
val catalogTable = catalog.getTableMetadata(table)
val properties = conf.redactOptions(catalogTable.properties)
val properties = sparkSession.sessionState.conf.redactOptions(catalogTable.properties)
propertyKey match {
case Some(p) =>
val propValue = properties

@@ -729,7 +729,8 @@ object ViewHelper extends SQLConfHelper with Logging {
val uncache = getRawTempView(name.table).map { r =>
needsToUncache(r, aliasedPlan)
}.getOrElse(false)
val storeAnalyzedPlanForView = conf.storeAnalyzedPlanForView || originalText.isEmpty
val storeAnalyzedPlanForView = session.sessionState.conf.storeAnalyzedPlanForView ||
originalText.isEmpty
if (replace && uncache) {
logDebug(s"Try to uncache ${name.quotedString} before replacing.")
if (!storeAnalyzedPlanForView) {

@@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.command.ViewHelper.generateViewProperties
import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
@@ -47,6 +48,9 @@ import org.apache.spark.util.ArrayImplicits._
* Replaces [[UnresolvedRelation]]s if the plan is for direct query on files.
*/
class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {

override def conf: SQLConf = sparkSession.sessionState.conf

object UnresolvedRelationResolution {
def unapply(plan: LogicalPlan): Option[LogicalPlan] = {
plan match {

@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.DYNAMIC_PRUNING_SUBQUERY
import org.apache.spark.sql.execution.{InSubqueryExec, QueryExecution, SparkPlan, SubqueryBroadcastExec}
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.internal.SQLConf

/**
* This planner rule aims at rewriting dynamic pruning predicates in order to reuse the
@@ -36,6 +37,8 @@ import org.apache.spark.sql.execution.joins._
*/
case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[SparkPlan] {

override def conf: SQLConf = sparkSession.sessionState.conf

/**
* Identify the shape in which keys of a given plan are broadcasted.
*/

@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrate
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.execution.HiveScriptTransformationExec
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable.BY_CTAS
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}


/**
@@ -117,6 +117,9 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
}

class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {

override def conf: SQLConf = session.sessionState.conf

private def hiveTableWithStats(relation: HiveTableRelation): HiveTableRelation = {
val table = relation.tableMeta
val partitionCols = relation.partitionCols

@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.internal.SQLConf

/**
* Prune hive table partitions using partition filters on [[HiveTableRelation]]. The pruned
@@ -43,6 +44,8 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
private[sql] class PruneHiveTablePartitions(session: SparkSession)
extends Rule[LogicalPlan] with CastSupport with PredicateHelper {

override def conf: SQLConf = session.sessionState.conf

/**
* Extract the partition filters from the filters on the table.
*/
