[SPARK-25132][SQL] Case-insensitive field resolution when reading from Parquet #22148
ParquetReadSupport.scala

```diff
@@ -29,7 +29,9 @@ import org.apache.parquet.schema._
 import org.apache.parquet.schema.Type.Repetition
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
```
```diff
@@ -71,8 +73,10 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
       StructType.fromString(schemaString)
     }
 
-    val parquetRequestedSchema =
-      ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
+    val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
+      SQLConf.CASE_SENSITIVE.defaultValue.get)
+    val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
+      context.getFileSchema, catalystRequestedSchema, caseSensitive)
 
     new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
   }
```
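For context on where this flag comes from: Parquet hands `ReadSupport.init` a Hadoop `Configuration`, and Spark's session state copies SQL settings such as `spark.sql.caseSensitive` into that configuration, which is what makes the lookup above work on executors. A minimal sketch of the same lookup, with a hand-populated `Configuration` standing in for the one Spark provides:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.internal.SQLConf

// Hand-populated stand-in for the Configuration Spark passes to init();
// in a real run the session state copies the SQL confs into it.
val hadoopConf = new Configuration()
hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, false)

// The same lookup as in ReadSupport.init above: fall back to the
// SQLConf default (true) when the key is absent.
val caseSensitive = hadoopConf.getBoolean(
  SQLConf.CASE_SENSITIVE.key, SQLConf.CASE_SENSITIVE.defaultValue.get)
assert(!caseSensitive)
```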
```diff
@@ -117,8 +121,12 @@ private[parquet] object ParquetReadSupport {
    * Tailors `parquetSchema` according to `catalystSchema` by removing column paths don't exist
    * in `catalystSchema`, and adding those only exist in `catalystSchema`.
    */
-  def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
-    val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
+  def clipParquetSchema(
+      parquetSchema: MessageType,
+      catalystSchema: StructType,
+      caseSensitive: Boolean = true): MessageType = {
+    val clippedParquetFields = clipParquetGroupFields(
+      parquetSchema.asGroupType(), catalystSchema, caseSensitive)
     if (clippedParquetFields.isEmpty) {
       ParquetSchemaConverter.EMPTY_MESSAGE
     } else {
```
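To make the clipping concrete, here is a hypothetical driver for `clipParquetSchema` (not part of the patch; it would have to live in the `org.apache.spark.sql.execution.datasources.parquet` package because the method is `private[parquet]`):

```scala
package org.apache.spark.sql.execution.datasources.parquet

import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.sql.types.StructType

// Hypothetical demo object, not part of the patch.
object ClipParquetSchemaDemo {
  def main(args: Array[String]): Unit = {
    // A Parquet file schema containing fields A, b and B.
    val fileSchema = MessageTypeParser.parseMessageType(
      """message spark_schema {
        |  optional int64 A;
        |  optional int64 b;
        |  optional int64 B;
        |}
      """.stripMargin)
    // The schema requested by the query.
    val catalystSchema = StructType.fromDDL("a BIGINT, b BIGINT")

    // Case-sensitive: `a` matches no file field, so it is filled in from the
    // Catalyst schema (and will read as null); `b` matches exactly.
    println(ParquetReadSupport.clipParquetSchema(
      fileSchema, catalystSchema, caseSensitive = true))

    // Case-insensitive: `a` resolves to `A`, but `b` matches both `b` and
    // `B`, so this throws AnalysisException("Found duplicate field(s) ...").
    println(ParquetReadSupport.clipParquetSchema(
      fileSchema, catalystSchema, caseSensitive = false))
  }
}
```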
```diff
@@ -129,20 +137,21 @@
     }
   }
 
-  private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
+  private def clipParquetType(
+      parquetType: Type, catalystType: DataType, caseSensitive: Boolean): Type = {
     catalystType match {
       case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
         // Only clips array types with nested type as element type.
-        clipParquetListType(parquetType.asGroupType(), t.elementType)
+        clipParquetListType(parquetType.asGroupType(), t.elementType, caseSensitive)
 
       case t: MapType
           if !isPrimitiveCatalystType(t.keyType) ||
             !isPrimitiveCatalystType(t.valueType) =>
         // Only clips map types with nested key type or value type
-        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
+        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive)
 
       case t: StructType =>
-        clipParquetGroup(parquetType.asGroupType(), t)
+        clipParquetGroup(parquetType.asGroupType(), t, caseSensitive)
 
       case _ =>
         // UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able
```
```diff
@@ -168,14 +177,15 @@
    * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
    * [[StructType]].
    */
-  private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
+  private def clipParquetListType(
+      parquetList: GroupType, elementType: DataType, caseSensitive: Boolean): Type = {
     // Precondition of this method, should only be called for lists with nested element types.
     assert(!isPrimitiveCatalystType(elementType))
 
     // Unannotated repeated group should be interpreted as required list of required element, so
     // list element type is just the group itself. Clip it.
     if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
-      clipParquetType(parquetList, elementType)
+      clipParquetType(parquetList, elementType, caseSensitive)
     } else {
       assert(
         parquetList.getOriginalType == OriginalType.LIST,
```
```diff
@@ -207,7 +217,7 @@
       Types
         .buildGroup(parquetList.getRepetition)
         .as(OriginalType.LIST)
-        .addField(clipParquetType(repeatedGroup, elementType))
+        .addField(clipParquetType(repeatedGroup, elementType, caseSensitive))
         .named(parquetList.getName)
     } else {
       // Otherwise, the repeated field's type is the element type with the repeated field's
```
```diff
@@ -218,7 +228,7 @@
         .addField(
           Types
             .repeatedGroup()
-            .addField(clipParquetType(repeatedGroup.getType(0), elementType))
+            .addField(clipParquetType(repeatedGroup.getType(0), elementType, caseSensitive))
             .named(repeatedGroup.getName))
         .named(parquetList.getName)
     }
```
```diff
@@ -231,7 +241,10 @@
    * a [[StructType]].
    */
   private def clipParquetMapType(
-      parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
+      parquetMap: GroupType,
+      keyType: DataType,
+      valueType: DataType,
+      caseSensitive: Boolean): GroupType = {
     // Precondition of this method, only handles maps with nested key types or value types.
     assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
 
```
```diff
@@ -243,8 +256,8 @@
       Types
         .repeatedGroup()
         .as(repeatedGroup.getOriginalType)
-        .addField(clipParquetType(parquetKeyType, keyType))
-        .addField(clipParquetType(parquetValueType, valueType))
+        .addField(clipParquetType(parquetKeyType, keyType, caseSensitive))
+        .addField(clipParquetType(parquetValueType, valueType, caseSensitive))
         .named(repeatedGroup.getName)
 
     Types
```
```diff
@@ -262,8 +275,9 @@
    * [[MessageType]]. Because it's legal to construct an empty requested schema for column
    * pruning.
    */
-  private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = {
-    val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType)
+  private def clipParquetGroup(
+      parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): GroupType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType, caseSensitive)
     Types
       .buildGroup(parquetRecord.getRepetition)
       .as(parquetRecord.getOriginalType)
```
```diff
@@ -277,14 +291,35 @@
    * @return A list of clipped [[GroupType]] fields, which can be empty.
    */
   private def clipParquetGroupFields(
-      parquetRecord: GroupType, structType: StructType): Seq[Type] = {
-    val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+      parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): Seq[Type] = {
     val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
-    structType.map { f =>
-      parquetFieldMap
-        .get(f.name)
-        .map(clipParquetType(_, f.dataType))
-        .getOrElse(toParquet.convertField(f))
+    if (caseSensitive) {
+      val caseSensitiveParquetFieldMap =
+        parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+      structType.map { f =>
+        caseSensitiveParquetFieldMap
+          .get(f.name)
+          .map(clipParquetType(_, f.dataType, caseSensitive))
+          .getOrElse(toParquet.convertField(f))
+      }
+    } else {
+      // Do case-insensitive resolution only if in case-insensitive mode
+      val caseInsensitiveParquetFieldMap =
+        parquetRecord.getFields.asScala.groupBy(_.getName.toLowerCase)
+
+      structType.map { f =>
+        caseInsensitiveParquetFieldMap
+          .get(f.name.toLowerCase)
+          .map { parquetTypes =>
+            if (parquetTypes.size > 1) {
+              // Need to fail if there is ambiguity, i.e. more than one field is matched
+              val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+              throw new AnalysisException(s"""Found duplicate field(s) "${f.name}": """ +
+                s"$parquetTypesString in case-insensitive mode")
+            } else {
+              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+            }
+          }.getOrElse(toParquet.convertField(f))
+      }
     }
   }
```

Review comment on the `throw new AnalysisException(...)` line:

> This is triggered at runtime at executor side, we should probably use …
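The resolution rule itself is independent of Parquet. Here is a self-contained sketch of the same case-sensitive vs. case-insensitive matching, including the ambiguity check (the object name and the `resolve` helper are made up for illustration, and a plain `RuntimeException` stands in for `AnalysisException`):

```scala
object CaseInsensitiveResolutionSketch {
  // Resolve one requested column name against the field names in the file.
  def resolve(
      requested: String, fileFields: Seq[String], caseSensitive: Boolean): Option[String] = {
    if (caseSensitive) {
      // Exact-name match only.
      fileFields.find(_ == requested)
    } else {
      // Group file fields by lower-cased name, then demand a unique match.
      fileFields.groupBy(_.toLowerCase).get(requested.toLowerCase).map {
        case Seq(single) => single
        case matches =>
          throw new RuntimeException(
            s"""Found duplicate field(s) "$requested": ${matches.mkString("[", ", ", "]")}""")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val fileFields = Seq("A", "b", "B")
    println(resolve("a", fileFields, caseSensitive = false)) // Some(A)
    println(resolve("a", fileFields, caseSensitive = true))  // None: filled with nulls upstream
    // resolve("b", fileFields, caseSensitive = false)       // throws: [b, B]
  }
}
```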
FileBasedDataSourceSuite.scala

```diff
@@ -430,6 +430,48 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
       }
     }
   }
+
+  test(s"SPARK-25132: case-insensitive field resolution when reading from Parquet") {
+    withTempDir { dir =>
+      val format = "parquet"
+      val tableDir = dir.getCanonicalPath + s"/$format"
+      val tableName = s"spark_25132_${format}"
+      withTable(tableName) {
+        val end = 5
+        val data = spark.range(end).selectExpr("id as A", "id * 2 as b", "id * 3 as B")
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+          data.write.format(format).mode("overwrite").save(tableDir)
+        }
+        sql(s"CREATE TABLE $tableName (a LONG, b LONG) USING $format LOCATION '$tableDir'")
```

Review comments on the `CREATE TABLE` line:

> not related to this PR, but it makes me think that case-sensitivity should be a global or at least table-level config, otherwise the behavior is a little confusing. cc @gatorsmile

> table-level conf is reasonable. Let us do it in 3.0?

```diff
+
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+          checkAnswer(sql(s"select a from $tableName"), data.select("A"))
+          checkAnswer(sql(s"select A from $tableName"), data.select("A"))
+
+          // AnalysisException from executors is wrapped as SparkException on driver side
+          val e1 = intercept[SparkException] {
+            sql(s"select b from $tableName").collect()
+          }
+          assert(
+            e1.getCause.isInstanceOf[AnalysisException] &&
+              e1.getCause.getMessage.contains(
+                """Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
+          val e2 = intercept[SparkException] {
+            sql(s"select B from $tableName").collect()
+          }
+          assert(
+            e2.getCause.isInstanceOf[AnalysisException] &&
+              e2.getCause.getMessage.contains(
+                """Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
+        }
+
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+          checkAnswer(sql(s"select a from $tableName"), (0 until end).map(_ => Row(null)))
+          checkAnswer(sql(s"select b from $tableName"), data.select("b"))
+        }
+      }
+    }
+  }
 }
 
 object TestingUDT {
```
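A sketch of why the test intercepts `SparkException` rather than `AnalysisException`: an exception thrown inside a task surfaces on the driver wrapped in a `SparkException`, with the original exception as its cause. For example, assuming a running `spark` session (e.g. in spark-shell):

```scala
import org.apache.spark.SparkException

// Any exception thrown in a task is reported to the driver wrapped
// in a SparkException; the original exception is the cause.
try {
  spark.sparkContext.parallelize(Seq(1))
    .map[Int](_ => throw new IllegalStateException("boom"))
    .collect()
} catch {
  case e: SparkException =>
    println(e.getCause) // java.lang.IllegalStateException: boom
}
```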
Review comment:

> nit: I would remove this brace per https://github.com/databricks/scala-style-guide#anonymous-methods
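For completeness, the behavior the test pins down can also be reproduced interactively. A rough spark-shell equivalent (the output path is made up):

```scala
import org.apache.spark.sql.internal.SQLConf

// Write a Parquet file whose schema contains both `b` and `B`.
spark.conf.set(SQLConf.CASE_SENSITIVE.key, "true")
spark.range(5).selectExpr("id as A", "id * 2 as b", "id * 3 as B")
  .write.mode("overwrite").parquet("/tmp/spark_25132")

// Read it back with a lower-case schema in case-insensitive mode.
spark.conf.set(SQLConf.CASE_SENSITIVE.key, "false")
val df = spark.read.schema("a LONG, b LONG").parquet("/tmp/spark_25132")
df.select("a").show()    // resolves to the file's `A` column
// df.select("b").show() // fails: Found duplicate field(s) "b": [b, B]
```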