[SPARK-38094] Enable matching schema column names by field ids
### What changes were proposed in this pull request?
Field ID is a native field in the Parquet schema (https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L398).

After this PR, when the requested Spark schema contains field IDs, Parquet readers will first use those IDs to determine which Parquet columns to read, before falling back to matching by column name.
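
Conceptually, the lookup order reads roughly like the following (a minimal sketch, not the code this PR adds; `resolve`, `sparkFieldId`, and `parquetColumns` are hypothetical stand-ins):

```scala
// Conceptual sketch only — not the actual reader implementation.
// Given a requested Spark field, prefer the Parquet column whose native
// field ID matches; otherwise fall back to matching by column name.
def resolve(
    sparkName: String,
    sparkFieldId: Option[Int],
    parquetColumns: Seq[(String, Option[Int])]): Option[String] = {
  sparkFieldId
    .flatMap(id => parquetColumns.find(_._2.contains(id)).map(_._1))
    .orElse(parquetColumns.find(_._1 == sparkName).map(_._1))
}
```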

This PR supports:
- Vectorized reader
- parquet-mr reader

### Why are the changes needed?
It enables matching columns by field ID for supported formats like Iceberg and Delta. Specifically, it enables easy conversion from Iceberg (which matches columns by field ID rather than by name) to Delta, and allows the `id` mode for Delta [column mapping](https://docs.databricks.com/delta/delta-column-mapping.html).

### Does this PR introduce _any_ user-facing change?
This PR introduces three new configurations:

`spark.sql.parquet.fieldId.write.enabled`: If enabled, Spark will write out the native field IDs that are stored inside StructField metadata as `parquet.field.id` to Parquet files. This configuration defaults to `true`.
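
For illustration, field IDs can be attached to a Spark schema through `StructField` metadata under the `parquet.field.id` key mentioned above (a minimal sketch; `withFieldId` and the column names are made up for this example):

```scala
import org.apache.spark.sql.types._

// Hypothetical helper: build StructField metadata carrying a native field ID.
def withFieldId(id: Long): Metadata =
  new MetadataBuilder().putLong("parquet.field.id", id).build()

// With the write config enabled (the default), these IDs are propagated
// into the Parquet schema when the DataFrame is written out.
val schema = StructType(Seq(
  StructField("a", IntegerType, nullable = true, withFieldId(1L)),
  StructField("b", StringType, nullable = true, withFieldId(2L))))
```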

`spark.sql.parquet.fieldId.read.enabled`: If enabled, Spark will attempt to read field IDs in Parquet files and use them to match columns. This configuration defaults to `false`, so Spark maintains its existing behavior by default.

`spark.sql.parquet.fieldId.read.ignoreMissing`: If enabled, Spark will read Parquet files that do not have any field IDs while still attempting to match columns by ID in the Spark schema; nulls will be returned for Spark columns without a match. This configuration defaults to `false`, so that Spark can alert the user when field ID matching is expected but the Parquet files do not contain any IDs.
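
Taken together, the read-side settings could be exercised like this (a sketch assuming an existing `SparkSession` named `spark` and a made-up path; not prescribed by the PR itself):

```scala
// Match Parquet columns by field ID instead of by name on read.
spark.conf.set("spark.sql.parquet.fieldId.read.enabled", "true")
// Tolerate Parquet files that carry no field IDs at all: columns requested
// by ID then come back as nulls instead of raising an error.
spark.conf.set("spark.sql.parquet.fieldId.read.ignoreMissing", "true")

// Read with an explicit schema whose fields carry `parquet.field.id`
// metadata (e.g. the schema sketched above).
val df = spark.read.schema(schema).parquet("/tmp/example.parquet")
```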

### How was this patch tested?
Existing tests + new unit tests.

Closes #35385 from jackierwzhang/SPARK-38094-field-ids.

Authored-by: jackierwzhang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
jackierwzhang authored and cloud-fan committed Feb 18, 2022
1 parent e613f08 commit b5eae59
Showing 13 changed files with 1,088 additions and 78 deletions.
@@ -808,6 +808,15 @@ object QueryExecutionErrors {
       """.stripMargin.replaceAll("\n", " "))
  }

  def foundDuplicateFieldInFieldIdLookupModeError(
      requiredId: Int, matchedFields: String): Throwable = {
    new RuntimeException(
      s"""
         |Found duplicate field(s) "$requiredId": $matchedFields
         |in id mapping mode
       """.stripMargin.replaceAll("\n", " "))
  }

  def failedToMergeIncompatibleSchemasError(
      left: StructType, right: StructType, e: Throwable): Throwable = {
    new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
@@ -934,6 +934,33 @@ object SQLConf {
    .intConf
    .createWithDefault(4096)

  val PARQUET_FIELD_ID_WRITE_ENABLED =
    buildConf("spark.sql.parquet.fieldId.write.enabled")
      .doc("Field ID is a native field of the Parquet schema spec. When enabled, " +
        "Parquet writers will populate the field Id " +
        "metadata (if present) in the Spark schema to the Parquet schema.")
      .version("3.3.0")
      .booleanConf
      .createWithDefault(true)

  val PARQUET_FIELD_ID_READ_ENABLED =
    buildConf("spark.sql.parquet.fieldId.read.enabled")
      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers " +
        "will use field IDs (if present) in the requested Spark schema to look up Parquet " +
        "fields instead of using column names")
      .version("3.3.0")
      .booleanConf
      .createWithDefault(false)

  val IGNORE_MISSING_PARQUET_FIELD_ID =
    buildConf("spark.sql.parquet.fieldId.read.ignoreMissing")
      .doc("When the Parquet file doesn't have any field IDs but the " +
        "Spark read schema is using field IDs to read, we will silently return nulls " +
        "when this flag is enabled, or error otherwise.")
      .version("3.3.0")
      .booleanConf
      .createWithDefault(false)

  val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
    .doc("Sets the compression codec used when writing ORC files. If either `compression` or " +
      "`orc.compress` is specified in the table-specific options/properties, the precedence " +
@@ -4253,6 +4280,12 @@ class SQLConf extends Serializable with Logging {

  def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT)

  def parquetFieldIdReadEnabled: Boolean = getConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED)

  def parquetFieldIdWriteEnabled: Boolean = getConf(SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED)

  def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)

  def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)

  /** ********************** SQLConf functionality methods ************ */
@@ -119,6 +119,10 @@ class ParquetFileFormat
      SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
      sparkSession.sessionState.conf.parquetOutputTimestampType.toString)

    conf.set(
      SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
      sparkSession.sessionState.conf.parquetFieldIdWriteEnabled.toString)

    // Sets compression scheme
    conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)