
[SPARK-38094] Enable matching schema column names by field ids #35385

Closed
wants to merge 14 commits
@@ -805,6 +805,15 @@ object QueryExecutionErrors {
""".stripMargin.replaceAll("\n", " "))
}

def foundDuplicateFieldInFieldIdLookupModeError(
requiredId: Int, matchedFields: String): Throwable = {
new RuntimeException(
s"""
|Found duplicate field(s) "$requiredId": $matchedFields
|in id mapping mode
""".stripMargin.replaceAll("\n", " "))
}
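A minimal, hypothetical sketch (not the PR's actual matcher) of the situation that triggers `foundDuplicateFieldInFieldIdLookupModeError`: when two file fields carry the same field id, an id lookup is ambiguous and must fail rather than pick one arbitrarily. `ParquetField` and `findFieldById` are illustrative names, not Spark's.

```scala
// Illustrative model of a Parquet file field: an optional id plus a name.
case class ParquetField(id: Option[Int], name: String)

// Look up a requested field id among the file's fields; error on duplicates,
// mirroring the message shape of the helper above.
def findFieldById(requiredId: Int, fileFields: Seq[ParquetField]): Option[ParquetField] = {
  val matched = fileFields.filter(_.id.contains(requiredId))
  if (matched.size > 1) {
    throw new RuntimeException(
      s"""Found duplicate field(s) "$requiredId": ${matched.map(_.name).mkString("[", ", ", "]")} in id mapping mode""")
  }
  matched.headOption
}
```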

def failedToMergeIncompatibleSchemasError(
left: StructType, right: StructType, e: Throwable): Throwable = {
new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
@@ -934,6 +934,23 @@ object SQLConf {
.intConf
.createWithDefault(4096)

val PARQUET_FIELD_ID_ENABLED =
buildConf("spark.sql.parquet.fieldId.enabled")
.doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
" will use field IDs (if present) in the requested Spark schema to look up Parquet" +
Contributor commented:
How does it work when there is a mixture of columns that have field id set and ones that don't?

Contributor author @jackierwzhang replied on Feb 3, 2022:

It would try to match by id if the id exists; otherwise, it would fall back to matching by name.

Contributor replied:

My understanding is that the code would use field ids if the flag is enabled; if the flag is disabled, it would use names instead. My main concern is ambiguity resolution in the schema.

Contributor author @jackierwzhang replied on Feb 8, 2022:

Ah, I meant that even when this flag is enabled, my statement above still applies: the matching is done on a best-effort basis.

Disabling this flag will completely avoid reading and writing field ids.
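The best-effort behavior discussed in this thread can be sketched as follows. This is a hypothetical illustration with made-up names (`FieldRef`, `resolve`), not the PR's actual resolution code: a requested field that carries an id is matched strictly by id, while one without an id falls back to name matching.

```scala
// Illustrative model: a requested or file field with an optional field id.
case class FieldRef(id: Option[Int], name: String)

// Id mode matching: match by id when the requested field has one (no name
// fallback on a miss), otherwise fall back to matching by name.
def resolve(requested: FieldRef, fileFields: Seq[FieldRef]): Option[FieldRef] =
  requested.id match {
    case Some(id) => fileFields.find(_.id.contains(id))
    case None     => fileFields.find(_.name == requested.name)
  }
```

Note that in this sketch an id-bearing request can still match a file column whose name differs, which is what makes field ids robust to column renames.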

" fields instead of using column names; Parquet writers will also populate the field Id" +
" metadata (if present) in the Spark schema to the Parquet schema.")
.booleanConf
.createWithDefault(true)

.doc("When the Parquet file does't have any field IDs but the" +
" Spark read schema is using field IDs to read, we will return silently return nulls" +
"when this flag is enabled, or error otherwise.")
.booleanConf
.createWithDefault(false)
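The two flags above would be toggled per session like any other SQL conf. A sketch, assuming an active `SparkSession` named `spark` is in scope:

```scala
// Enable id-based column matching for Parquet reads and id population on writes.
spark.conf.set("spark.sql.parquet.fieldId.enabled", "true")
// Return nulls instead of erroring when the file carries no field IDs at all.
spark.conf.set("spark.sql.parquet.fieldId.ignoreMissing", "true")
```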

val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
.doc("Sets the compression codec used when writing ORC files. If either `compression` or " +
"`orc.compress` is specified in the table-specific options/properties, the precedence " +
@@ -4251,6 +4268,8 @@ class SQLConf extends Serializable with Logging {

def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT)

def parquetFieldIdEnabled: Boolean = getConf(SQLConf.PARQUET_FIELD_ID_ENABLED)

def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)

/** ********************** SQLConf functionality methods ************ */
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.io.IOException
import java.net.URI

import scala.collection.JavaConverters._
@@ -354,6 +355,13 @@ class ParquetFileFormat
}
} else {
logDebug(s"Falling back to parquet-mr")

if (SQLConf.get.parquetFieldIdEnabled &&
ParquetUtils.hasFieldIds(requiredSchema)) {
throw new IOException("Parquet-mr reader does not support schema with field IDs." +
s" Please choose a different Parquet reader. Read schema: ${requiredSchema.json}")
}
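The guard above relies on `ParquetUtils.hasFieldIds` to decide whether the requested schema carries any field ids. A hypothetical sketch of such a check over a simplified schema model (the real implementation walks Spark's `StructType` and inspects field-id metadata; `SimpleType`, `Field`, and `Struct` here are made up for illustration):

```scala
// Simplified schema model: atomic leaves and nested structs.
sealed trait SimpleType
case object Atomic extends SimpleType
case class Field(name: String, dataType: SimpleType, fieldId: Option[Int])
case class Struct(fields: Seq[Field]) extends SimpleType

// True if any field, at any nesting depth, carries a field id.
def hasFieldIds(schema: Struct): Boolean =
  schema.fields.exists { f =>
    f.fieldId.isDefined || (f.dataType match {
      case s: Struct => hasFieldIds(s)
      case _         => false
    })
  }
```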

// ParquetRecordReader returns InternalRow
val readSupport = new ParquetReadSupport(
convertTz,