[SPARK-25132][SQL] Case-insensitive field resolution when reading from Parquet #22148

Closed · wants to merge 5 commits

ParquetFileFormat.scala
@@ -310,6 +310,9 @@ class ParquetFileFormat
hadoopConf.set(
SQLConf.SESSION_LOCAL_TIMEZONE.key,
sparkSession.sessionState.conf.sessionLocalTimeZone)
hadoopConf.setBoolean(
SQLConf.CASE_SENSITIVE.key,
sparkSession.sessionState.conf.caseSensitiveAnalysis)

ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)

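The hadoopConf.setBoolean lines above are the driver-side half of the fix: the session's case-sensitivity flag is copied into the Hadoop configuration so it survives the trip to the executors, where ParquetReadSupport.init reads it back. A minimal sketch of that round-trip, using only the Hadoop Configuration API (the literal key mirrors SQLConf.CASE_SENSITIVE.key, and false is that conf's default):

  import org.apache.hadoop.conf.Configuration

  // Driver side: record the session setting under the SQLConf key.
  val hadoopConf = new Configuration()
  hadoopConf.setBoolean("spark.sql.caseSensitive", false)

  // Executor side: read it back with the same key, falling back to the
  // conf's default (false) if the key was never set.
  val caseSensitive = hadoopConf.getBoolean("spark.sql.caseSensitive", false)
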
ParquetReadSupport.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.util.{Map => JMap, TimeZone}
import java.util.{Locale, Map => JMap, TimeZone}

import scala.collection.JavaConverters._

@@ -30,6 +30,7 @@ import org.apache.parquet.schema.Type.Repetition

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
@@ -71,8 +72,10 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
StructType.fromString(schemaString)
}

val parquetRequestedSchema =
ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
SQLConf.CASE_SENSITIVE.defaultValue.get)
val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
context.getFileSchema, catalystRequestedSchema, caseSensitive)

new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
}
@@ -117,8 +120,12 @@ private[parquet] object ParquetReadSupport {
* Tailors `parquetSchema` according to `catalystSchema` by removing column paths that don't
* exist in `catalystSchema`, and adding those that only exist in `catalystSchema`.
*/
def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
def clipParquetSchema(
parquetSchema: MessageType,
catalystSchema: StructType,
caseSensitive: Boolean = true): MessageType = {
val clippedParquetFields = clipParquetGroupFields(
parquetSchema.asGroupType(), catalystSchema, caseSensitive)
if (clippedParquetFields.isEmpty) {
ParquetSchemaConverter.EMPTY_MESSAGE
} else {
@@ -129,20 +136,21 @@
}
}

private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
private def clipParquetType(
parquetType: Type, catalystType: DataType, caseSensitive: Boolean): Type = {
catalystType match {
case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
// Only clips array types with nested type as element type.
clipParquetListType(parquetType.asGroupType(), t.elementType)
clipParquetListType(parquetType.asGroupType(), t.elementType, caseSensitive)

case t: MapType
if !isPrimitiveCatalystType(t.keyType) ||
!isPrimitiveCatalystType(t.valueType) =>
// Only clips map types with nested key type or value type
clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive)

case t: StructType =>
clipParquetGroup(parquetType.asGroupType(), t)
clipParquetGroup(parquetType.asGroupType(), t, caseSensitive)

case _ =>
// UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able
@@ -168,14 +176,15 @@
* of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
* [[StructType]].
*/
private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
private def clipParquetListType(
parquetList: GroupType, elementType: DataType, caseSensitive: Boolean): Type = {
// Precondition of this method: it should only be called for lists with nested element types.
assert(!isPrimitiveCatalystType(elementType))

// An unannotated repeated group should be interpreted as a required list of required
// elements, so the list element type is just the group itself. Clip it.
if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
clipParquetType(parquetList, elementType)
clipParquetType(parquetList, elementType, caseSensitive)
} else {
assert(
parquetList.getOriginalType == OriginalType.LIST,
@@ -207,7 +216,7 @@
Types
.buildGroup(parquetList.getRepetition)
.as(OriginalType.LIST)
.addField(clipParquetType(repeatedGroup, elementType))
.addField(clipParquetType(repeatedGroup, elementType, caseSensitive))
.named(parquetList.getName)
} else {
// Otherwise, the repeated field's type is the element type with the repeated field's
@@ -218,7 +227,7 @@
.addField(
Types
.repeatedGroup()
.addField(clipParquetType(repeatedGroup.getType(0), elementType))
.addField(clipParquetType(repeatedGroup.getType(0), elementType, caseSensitive))
.named(repeatedGroup.getName))
.named(parquetList.getName)
}
@@ -231,7 +240,10 @@
* a [[StructType]].
*/
private def clipParquetMapType(
parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
parquetMap: GroupType,
keyType: DataType,
valueType: DataType,
caseSensitive: Boolean): GroupType = {
// Precondition of this method: it only handles maps with nested key types or value types.
assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))

@@ -243,8 +255,8 @@
Types
.repeatedGroup()
.as(repeatedGroup.getOriginalType)
.addField(clipParquetType(parquetKeyType, keyType))
.addField(clipParquetType(parquetValueType, valueType))
.addField(clipParquetType(parquetKeyType, keyType, caseSensitive))
.addField(clipParquetType(parquetValueType, valueType, caseSensitive))
.named(repeatedGroup.getName)

Types
@@ -262,8 +274,9 @@
* [[MessageType]]. Because it's legal to construct an empty requested schema for column
* pruning.
*/
private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = {
val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType)
private def clipParquetGroup(
parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): GroupType = {
val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType, caseSensitive)
Types
.buildGroup(parquetRecord.getRepetition)
.as(parquetRecord.getOriginalType)
@@ -277,14 +290,35 @@
* @return A list of clipped [[GroupType]] fields, which can be empty.
*/
private def clipParquetGroupFields(
parquetRecord: GroupType, structType: StructType): Seq[Type] = {
val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): Seq[Type] = {
val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
structType.map { f =>
parquetFieldMap
.get(f.name)
.map(clipParquetType(_, f.dataType))
.getOrElse(toParquet.convertField(f))
if (caseSensitive) {
val caseSensitiveParquetFieldMap =
parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
structType.map { f =>
caseSensitiveParquetFieldMap
.get(f.name)
.map(clipParquetType(_, f.dataType, caseSensitive))
.getOrElse(toParquet.convertField(f))
}
} else {
// Do case-insensitive resolution only if in case-insensitive mode
val caseInsensitiveParquetFieldMap =
parquetRecord.getFields.asScala.groupBy(_.getName.toLowerCase(Locale.ROOT))
structType.map { f =>
caseInsensitiveParquetFieldMap
.get(f.name.toLowerCase(Locale.ROOT))
.map { parquetTypes =>
if (parquetTypes.size > 1) {
// Need to fail if there is ambiguity, i.e. more than one field is matched
val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
throw new RuntimeException(s"""Found duplicate field(s) "${f.name}": """ +
s"$parquetTypesString in case-insensitive mode")
} else {
clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
}
}.getOrElse(toParquet.convertField(f))
}
}
}

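The else branch above is the core of the change: Parquet fields are grouped by their lower-cased names, a requested Catalyst field that hits a single-element group is clipped as before, and a multi-element group fails fast instead of silently picking one candidate. A standalone sketch of that lookup strategy on plain strings (names here are hypothetical, not PR code):

  import java.util.Locale

  val parquetFieldNames = Seq("b", "B", "c")
  val byLowerCase = parquetFieldNames.groupBy(_.toLowerCase(Locale.ROOT))

  def resolve(requested: String): String =
    byLowerCase.get(requested.toLowerCase(Locale.ROOT)) match {
      case Some(Seq(unique)) => unique  // exactly one candidate: unambiguous match
      case Some(matches) =>             // e.g. "b" matches both "b" and "B"
        throw new RuntimeException(s"""Found duplicate field(s) "$requested": """ +
          matches.mkString("[", ", ", "]") + " in case-insensitive mode")
      case None => requested            // no candidate: caller falls back to the Catalyst field
    }

  resolve("C")  // returns "c"
  resolve("b")  // throws: Found duplicate field(s) "b": [b, B] in case-insensitive mode
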
FileBasedDataSourceSuite.scala
@@ -430,6 +430,49 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll
}
}
}

test(s"SPARK-25132: case-insensitive field resolution when reading from Parquet") {
withTempDir { dir =>
val format = "parquet"
val tableDir = dir.getCanonicalPath + s"/$format"
val tableName = s"spark_25132_${format}"
withTable(tableName) {
val end = 5
val data = spark.range(end).selectExpr("id as A", "id * 2 as b", "id * 3 as B")
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
data.write.format(format).mode("overwrite").save(tableDir)
}
sql(s"CREATE TABLE $tableName (a LONG, b LONG) USING $format LOCATION '$tableDir'")
[Review comment, Contributor] not related to this PR, but it makes me think that case-sensitivity should be a global or at least table level config, otherwise the behavior is a little confusing. cc @gatorsmile

[Review comment, Member] table-level conf is reasonable. Let us do it in 3.0?

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
checkAnswer(sql(s"select a from $tableName"), data.select("A"))
checkAnswer(sql(s"select A from $tableName"), data.select("A"))

// RuntimeException is triggered at executor side, which is then wrapped as
// SparkException at driver side
val e1 = intercept[SparkException] {
sql(s"select b from $tableName").collect()
}
assert(
e1.getCause.isInstanceOf[RuntimeException] &&
e1.getCause.getMessage.contains(
"""Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
val e2 = intercept[SparkException] {
sql(s"select B from $tableName").collect()
}
assert(
e2.getCause.isInstanceOf[RuntimeException] &&
e2.getCause.getMessage.contains(
"""Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
}

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
checkAnswer(sql(s"select a from $tableName"), (0 until end).map(_ => Row(null)))
checkAnswer(sql(s"select b from $tableName"), data.select("b"))
}
}
}
}
}
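
Outside the suite, the same behavior can be seen interactively. A hypothetical spark-shell reproduction of the scenario the test pins down (it assumes a SparkSession named spark and the illustrative path /tmp/spark_25132):

  // Write Parquet columns that collide only under case-insensitive matching.
  spark.conf.set("spark.sql.caseSensitive", true)
  spark.range(5).selectExpr("id as A", "id * 2 as b", "id * 3 as B")
    .write.mode("overwrite").parquet("/tmp/spark_25132")

  // Read back through an explicit lower-case schema in case-insensitive mode.
  spark.conf.set("spark.sql.caseSensitive", false)
  val df = spark.read.schema("a LONG, b LONG").parquet("/tmp/spark_25132")
  df.select("a").show()     // "a" uniquely matches file column "A"
  df.select("b").collect()  // SparkException caused by: Found duplicate field(s) "b": [b, B]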

object TestingUDT {
ParquetSchemaSuite.scala
@@ -1014,19 +1014,21 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
testName: String,
parquetSchema: String,
catalystSchema: StructType,
expectedSchema: String): Unit = {
expectedSchema: String,
caseSensitive: Boolean = true): Unit = {
testSchemaClipping(testName, parquetSchema, catalystSchema,
MessageTypeParser.parseMessageType(expectedSchema))
MessageTypeParser.parseMessageType(expectedSchema), caseSensitive)
}

private def testSchemaClipping(
testName: String,
parquetSchema: String,
catalystSchema: StructType,
expectedSchema: MessageType): Unit = {
expectedSchema: MessageType,
caseSensitive: Boolean): Unit = {
test(s"Clipping - $testName") {
val actual = ParquetReadSupport.clipParquetSchema(
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseSensitive)

try {
expectedSchema.checkContains(actual)
@@ -1387,7 +1389,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {

catalystSchema = new StructType(),

expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE)
expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE,
caseSensitive = true)

testSchemaClipping(
"disjoint field sets",
@@ -1544,4 +1547,52 @@
| }
|}
""".stripMargin)

testSchemaClipping(
"case-insensitive resolution: no ambiguity",
parquetSchema =
"""message root {
| required group A {
| optional int32 B;
| }
| optional int32 c;
|}
""".stripMargin,
catalystSchema = {
val nestedType = new StructType().add("b", IntegerType, nullable = true)
new StructType()
.add("a", nestedType, nullable = true)
.add("c", IntegerType, nullable = true)
},
expectedSchema =
"""message root {
| required group A {
| optional int32 B;
| }
| optional int32 c;
|}
""".stripMargin,
caseSensitive = false)

test("Clipping - case-insensitive resolution: more than one field is matched") {
val parquetSchema =
"""message root {
| required group A {
| optional int32 B;
| }
| optional int32 c;
| optional int32 a;
|}
""".stripMargin
val catalystSchema = {
val nestedType = new StructType().add("b", IntegerType, nullable = true)
new StructType()
.add("a", nestedType, nullable = true)
.add("c", IntegerType, nullable = true)
}
assertThrows[RuntimeException] {
ParquetReadSupport.clipParquetSchema(
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseSensitive = false)
}
}
}
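
For completeness, the clipping entry point these suites drive can also be exercised directly. A sketch under two assumptions: the made-up schemas below, and package-private access (ParquetReadSupport is private[parquet], so code like this only compiles inside org.apache.spark.sql.execution.datasources.parquet):

  import org.apache.parquet.schema.MessageTypeParser
  import org.apache.spark.sql.types._

  val fileSchema = MessageTypeParser.parseMessageType(
    """message root {
      |  optional int32 B;
      |  optional int32 c;
      |}
    """.stripMargin)

  val catalystSchema = new StructType()
    .add("b", IntegerType)  // matches the file's "B" only when caseSensitive = false
    .add("c", IntegerType)

  // In case-sensitive mode "b" would have no match in the file and would be
  // synthesized from the Catalyst type instead of clipped from the file schema.
  val clipped = ParquetReadSupport.clipParquetSchema(
    fileSchema, catalystSchema, caseSensitive = false)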