From 65d5dd943df8705410dbc375bff34cb36266bf30 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Wed, 19 Feb 2025 17:49:45 +0100 Subject: [PATCH] Disallow enabling column mapping if invalid column mapping metadata is already present (#4167) #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description As an effect of earlier bugs (e.g. fixed in https://github.com/delta-io/delta/pull/3487) there can exist tables where column mapping is disabled, but there is column mapping metadata on the table. Enabling column mapping on such a table could lead to unexpected corruption. Simply stripping such metadata could also lead to corruption, as the invalid metadata can already be used in other places (e.g. column statistics) via DeltaColumnMapping.getPhysicalName, which returns the name from the metadata even when column mapping is disabled. After https://github.com/delta-io/delta/pull/3688 it should no longer be possible to end up with tables having such invalid metadata, so the issue only concerns existing tables created before that fix. To avoid corruption, we want to disallow enabling column mapping on such tables. ## How was this patch tested? Added tests to DeltaColumnMappingSuite. ## Does this PR introduce _any_ user-facing changes? No. We are disallowing an operation that would lead to Delta table corruption on tables that are already in an invalid state. Entering that invalid state has already been fixed, so this can only concern old tables in the wild. 
--------- Co-authored-by: Julek Sompolski --- .../resources/error/delta-error-classes.json | 7 +++ .../spark/sql/delta/DeltaColumnMapping.scala | 12 +++++ .../apache/spark/sql/delta/DeltaErrors.scala | 6 +++ .../sql/delta/sources/DeltaSQLConf.scala | 12 +++++ .../sql/delta/DeltaColumnMappingSuite.scala | 45 +++++++++++++++++++ 5 files changed, 82 insertions(+) diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index b40bdb3954e..041f274740a 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -869,6 +869,13 @@ ], "sqlState" : "42K03" }, + "DELTA_ENABLING_COLUMN_MAPPING_DISALLOWED_WHEN_COLUMN_MAPPING_METADATA_ALREADY_EXISTS" : { + "message" : [ + "Enabling column mapping when column mapping metadata is already present in schema is not supported.", + "To use column mapping, create a new table and reload the data into it." + ], + "sqlState" : "XXKDS" + }, "DELTA_EXCEED_CHAR_VARCHAR_LIMIT" : { "message" : [ "Value \"\" exceeds char/varchar type length limitation. Failed check: ." diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala index b893d25f3cf..cf3b9493599 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala @@ -174,6 +174,18 @@ trait DeltaColumnMappingBase extends DeltaLogging { } } + // If column mapping was disabled, but there was already column mapping in the schema, it is + // a result of a bug in the previous version of Delta. This should no longer happen with the + // stripping done above. For existing tables with this issue, we should not allow enabling + // column mapping, to prevent further corruption. + if (spark.conf.get(DeltaSQLConf. 
+ DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS)) { + if (oldMappingMode == NoMapping && newMappingMode != NoMapping && + schemaHasColumnMappingMetadata(oldMetadata.schema)) { + throw DeltaErrors.enablingColumnMappingDisallowedWhenColumnMappingMetadataAlreadyExists() + } + } + updatedMetadata = updateColumnMappingMetadata( oldMetadata, updatedMetadata, isChangingModeOnExistingTable, isOverwriteSchema) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index 386d66caa60..65f15dc251f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -2126,6 +2126,12 @@ trait DeltaErrorsBase messageParameters = Array(oldMode, newMode)) } + def enablingColumnMappingDisallowedWhenColumnMappingMetadataAlreadyExists(): Throwable = { + new DeltaColumnMappingUnsupportedException( + errorClass = + "DELTA_ENABLING_COLUMN_MAPPING_DISALLOWED_WHEN_COLUMN_MAPPING_METADATA_ALREADY_EXISTS") + } + def generateManifestWithColumnMappingNotSupported: Throwable = { new DeltaColumnMappingUnsupportedException( errorClass = "DELTA_UNSUPPORTED_MANIFEST_GENERATION_WITH_COLUMN_MAPPING") diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index ab8b8d6cb43..25b5eac3866 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -1953,6 +1953,18 @@ trait DeltaSQLConfBase { .booleanConf .createWithDefault(true) + val DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS = + buildConf("columnMapping.disallowEnablingWhenColumnMappingMetadataAlreadyExists") + .doc( + """ + |If Delta table already has column mapping metadata before the feature is enabled, it is + 
|as a result of a corruption or a bug. Enabling column mapping in such a case can lead to + |further corruption of the table and should be disallowed. + |""".stripMargin) + .internal() + .booleanConf + .createWithDefault(true) + val DYNAMIC_PARTITION_OVERWRITE_ENABLED = buildConf("dynamicPartitionOverwrite.enabled") .doc("Whether to overwrite partitions dynamically when 'partitionOverwriteMode' is set to " + diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala index 6504c8943e6..411a7c2a7dc 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala @@ -2106,4 +2106,49 @@ class DeltaColumnMappingSuite extends QueryTest s"Supported modes are: $supportedModes")) } } + + test("enabling column mapping disallowed if column mapping metadata already exists") { + withSQLConf( + // enabling this fixes the issue of committing invalid metadata in the first place + DeltaSQLConf.DELTA_COLUMN_MAPPING_STRIP_METADATA.key -> "false" + ) { + withTempDir { dir => + val path = dir.getCanonicalPath + val deltaLog = DeltaLog.forTable(spark, path) + deltaLog.withNewTransaction(catalogTableOpt = None) { txn => + val schema = + new StructType().add("id", IntegerType, true, withIdAndPhysicalName(0, "col-0")) + val metadata = actions.Metadata( + name = "test_table", + schemaString = schema.json, + configuration = Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NoMapping.name) + ) + txn.updateMetadata(metadata) + txn.commit(Seq.empty, DeltaOperations.ManualUpdate) + + // Enabling the config will disallow enabling column mapping. 
+ withSQLConf(DeltaSQLConf + .DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS.key + -> "true") { + val e = intercept[DeltaColumnMappingUnsupportedException] { + alterTableWithProps( + s"delta.`$path`", + Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NameMapping.name)) + } + assert(e.getErrorClass == + "DELTA_ENABLING_COLUMN_MAPPING_DISALLOWED_WHEN_COLUMN_MAPPING_METADATA_ALREADY_EXISTS") + } + + // Disabling the config will allow enabling column mapping. + withSQLConf(DeltaSQLConf + .DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS.key + -> "false") { + alterTableWithProps( + s"delta.`$path`", + Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NameMapping.name)) + } + } + } + } + } }