Commit 1d28fe3
[SPARK-50157][SQL] Using SQLConf provided by SparkSession first.
beliefer committed Oct 29, 2024
1 parent 55be8e7 commit 1d28fe3
Showing 4 changed files with 10 additions and 9 deletions.
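Every changed call site replaces the `conf` inherited from `SQLConfHelper` (which delegates to the thread-local `SQLConf.get`) with the `SQLConf` held by the captured `SparkSession`. A minimal sketch of the distinction, assuming the upstream trait definition `trait SQLConfHelper { def conf: SQLConf = SQLConf.get }`; the object name below is illustrative, not from the commit:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.internal.SQLConf

    object ConfResolution {
      // SQLConf.get resolves the conf of whichever session is active on the
      // current thread, so a rule constructed for one session can observe
      // another session's settings if it runs on a different thread.
      def confFromHelper(): SQLConf = SQLConf.get

      // Reading through the captured session is unambiguous: this is always
      // the configuration of the session the planner or rule was built with.
      def confFromSession(session: SparkSession): SQLConf =
        session.sessionState.conf
    }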
@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy
 class SparkPlanner(val session: SparkSession, val experimentalMethods: ExperimentalMethods)
   extends SparkStrategies with SQLConfHelper {
 
-  def numPartitions: Int = conf.numShufflePartitions
+  def numPartitions: Int = session.sessionState.conf.numShufflePartitions
 
   override def strategies: Seq[Strategy] =
     experimentalMethods.extraStrategies ++
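Why this matters for `numPartitions`: sibling sessions share one `SparkContext` but each has its own `SessionState` and therefore its own `SQLConf`. A hedged usage sketch, not part of the commit; object name and the value 7 are illustrative:

    import org.apache.spark.sql.SparkSession

    object ShufflePartitionsExample {
      def main(args: Array[String]): Unit = {
        val base = SparkSession.builder()
          .master("local[*]")
          .appName("conf-scope-demo")
          .getOrCreate()

        // newSession() shares the SparkContext but gets a fresh SQLConf.
        val sibling = base.newSession()
        sibling.conf.set("spark.sql.shuffle.partitions", "7")

        // Each session reports its own value; a planner bound to `sibling`
        // should see 7 regardless of which session the current thread
        // considers active.
        println(base.sessionState.conf.numShufflePartitions)    // 200 by default
        println(sibling.sessionState.conf.numShufflePartitions) // 7

        base.stop()
      }
    }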
@@ -122,7 +122,8 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
       val partitionCols = relation.partitionCols
       // For partitioned tables, the partition directory may be outside of the table directory.
       // Which is expensive to get table size. Please see how we implemented it in the AnalyzeTable.
-      val sizeInBytes = if (conf.fallBackToHdfsForStatsEnabled && partitionCols.isEmpty) {
+      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled &&
+          partitionCols.isEmpty) {
         try {
           val hadoopConf = session.sessionState.newHadoopConf()
           val tablePath = new Path(table.location)
@@ -131,10 +132,10 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
         } catch {
           case e: IOException =>
             logWarning("Failed to get table size from HDFS.", e)
-            conf.defaultSizeInBytes
+            session.sessionState.conf.defaultSizeInBytes
         }
       } else {
-        conf.defaultSizeInBytes
+        session.sessionState.conf.defaultSizeInBytes
       }
 
       val stats = Some(Statistics(sizeInBytes = BigInt(sizeInBytes)))
@@ -105,7 +105,7 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession)
         if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty =>
       val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
       if (partitionKeyFilters.nonEmpty) {
-        val newPartitions = ExternalCatalogUtils.listPartitionsByFilter(conf,
+        val newPartitions = ExternalCatalogUtils.listPartitionsByFilter(session.sessionState.conf,
           session.sessionState.catalog, relation.tableMeta, partitionKeyFilters.toSeq)
         val newTableMeta = updateTableMeta(relation, newPartitions, partitionKeyFilters)
         val newRelation = relation.copy(
@@ -231,7 +231,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
   createQueryTest("no from clause",
     "SELECT 1, +1, -1")
 
-  if (!conf.ansiEnabled) {
+  if (!spark.sessionState.conf.ansiEnabled) {
     createQueryTest("boolean = number",
       """
        |SELECT
@@ -279,7 +279,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
   createQueryTest("Constant Folding Optimization for AVG_SUM_COUNT",
     "SELECT AVG(0), SUM(0), COUNT(null), COUNT(value) FROM src GROUP BY key")
 
-  if (!conf.ansiEnabled) {
+  if (!spark.sessionState.conf.ansiEnabled) {
     createQueryTest("Cast Timestamp to Timestamp in UDF",
       """
        | SELECT DATEDIFF(CAST(value AS timestamp), CAST('2002-03-21 00:00:00' AS timestamp))
@@ -515,7 +515,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
   createQueryTest("Specify the udtf output",
     "SELECT d FROM (SELECT explode(array(1,1)) d FROM src LIMIT 1) t")
 
-  if (!conf.ansiEnabled) {
+  if (!spark.sessionState.conf.ansiEnabled) {
     createQueryTest("SPARK-9034 Reflect field names defined in GenericUDTF #1",
       "SELECT col FROM (SELECT explode(array(key,value)) FROM src LIMIT 1) t")
   }
@@ -779,7 +779,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
   test("SPARK-5367: resolve star expression in udf") {
     assert(sql("select concat(*) from src limit 5").collect().length == 5)
     assert(sql("select concat(key, *) from src limit 5").collect().length == 5)
-    if (!conf.ansiEnabled) {
+    if (!spark.sessionState.conf.ansiEnabled) {
       assert(sql("select array(*) from src limit 5").collect().length == 5)
       assert(sql("select array(key, *) from src limit 5").collect().length == 5)
     }
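For the test-suite hunks, the same reasoning applies: ANSI mode (`spark.sql.ansi.enabled`) is a session-scoped setting, so the guards now consult the suite's own `spark` session rather than the thread-local conf. A minimal, hypothetical sketch of reading that flag per session; the object name is illustrative:

    import org.apache.spark.sql.SparkSession

    object AnsiGuardExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()

        // Toggling ANSI mode affects only this session's SQLConf, which is
        // why a guard like `!spark.sessionState.conf.ansiEnabled` must read
        // from the session the tests actually run against.
        spark.conf.set("spark.sql.ansi.enabled", "true")
        println(spark.sessionState.conf.ansiEnabled) // true for this session

        spark.stop()
      }
    }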
