Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-17199] Use CatalystConf.resolver for case-sensitivity comparison #14771

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,7 @@ class Analyzer(
this(catalog, conf, conf.optimizerMaxIterations)
}

def resolver: Resolver = {
if (conf.caseSensitiveAnalysis) {
caseSensitiveResolution
} else {
caseInsensitiveResolution
}
}
def resolver: Resolver = conf.resolver

protected val fixedPoint = FixedPoint(maxIterations)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,7 @@ case class DataSource(
sparkSession, globbedPaths, options, partitionSchema, !checkPathExist)

val dataSchema = userSpecifiedSchema.map { schema =>
val equality =
if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
} else {
org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
}

val equality = sparkSession.sessionState.conf.resolver
StructType(schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
}.orElse {
format.inferSchema(
Expand Down Expand Up @@ -430,7 +424,7 @@ case class DataSource(
relation
}

/** Writes the give [[DataFrame]] out to this [[DataSource]]. */
/** Writes the given [[DataFrame]] out to this [[DataSource]]. */
def write(
mode: SaveMode,
data: DataFrame): BaseRelation = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,7 @@ import org.apache.spark.unsafe.types.UTF8String
*/
case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {

def resolver: Resolver = {
if (conf.caseSensitiveAnalysis) {
caseSensitiveResolution
} else {
caseInsensitiveResolution
}
}
def resolver: Resolver = conf.resolver

// Visible for testing.
def convertStaticPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,7 @@ class FileStreamSinkWriter(
// Get the actual partition columns as attributes after matching them by name with
// the given columns names.
private val partitionColumns = partitionColumnNames.map { col =>
val nameEquality = if (data.sparkSession.sessionState.conf.caseSensitiveAnalysis) {
org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
} else {
org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
}
val nameEquality = data.sparkSession.sessionState.conf.resolver
data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse {
throw new RuntimeException(s"Partition column $col not found in schema $dataSchema")
}
Expand Down