From 7d86c578848cc4597924a14956ef5bae0b81fef2 Mon Sep 17 00:00:00 2001 From: Jacek Laskowski Date: Tue, 23 Aug 2016 10:15:10 +0200 Subject: [PATCH] [SPARK-17199] Use CatalystConf.resolver for case-sensitivity comparison --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 8 +------- .../spark/sql/execution/datasources/DataSource.scala | 10 ++-------- .../sql/execution/datasources/DataSourceStrategy.scala | 8 +------- .../spark/sql/execution/streaming/FileStreamSink.scala | 6 +----- 4 files changed, 5 insertions(+), 27 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 41e0e6d65e9ad..e559f235c5a38 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -64,13 +64,7 @@ class Analyzer( this(catalog, conf, conf.optimizerMaxIterations) } - def resolver: Resolver = { - if (conf.caseSensitiveAnalysis) { - caseSensitiveResolution - } else { - caseInsensitiveResolution - } - } + def resolver: Resolver = conf.resolver protected val fixedPoint = FixedPoint(maxIterations) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 5ad6ae0956e1c..b783d699745b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -394,13 +394,7 @@ case class DataSource( sparkSession, globbedPaths, options, partitionSchema, !checkPathExist) val dataSchema = userSpecifiedSchema.map { schema => - val equality = - if (sparkSession.sessionState.conf.caseSensitiveAnalysis) { - org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution - } else { - org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution - } - + val equality = sparkSession.sessionState.conf.resolver StructType(schema.filterNot(f => partitionColumns.exists(equality(_, f.name)))) }.orElse { format.inferSchema( @@ -430,7 +424,7 @@ case class DataSource( relation } - /** Writes the give [[DataFrame]] out to this [[DataSource]]. */ + /** Writes the given [[DataFrame]] out to this [[DataSource]]. */ def write( mode: SaveMode, data: DataFrame): BaseRelation = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 5eba7df060c4e..a6621054fc74b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -45,13 +45,7 @@ import org.apache.spark.unsafe.types.UTF8String */ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { - def resolver: Resolver = { - if (conf.caseSensitiveAnalysis) { - caseSensitiveResolution - } else { - caseInsensitiveResolution - } - } + def resolver: Resolver = conf.resolver // Visible for testing. def convertStaticPartitions( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 117d6672ee2f7..0f7d958136835 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -102,11 +102,7 @@ class FileStreamSinkWriter( // Get the actual partition columns as attributes after matching them by name with // the given columns names. private val partitionColumns = partitionColumnNames.map { col => - val nameEquality = if (data.sparkSession.sessionState.conf.caseSensitiveAnalysis) { - org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution - } else { - org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution - } + val nameEquality = data.sparkSession.sessionState.conf.resolver data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse { throw new RuntimeException(s"Partition column $col not found in schema $dataSchema") }