diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index b40bdb3954e..041f274740a 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -869,6 +869,13 @@ ], "sqlState" : "42K03" }, + "DELTA_ENABLING_COLUMN_MAPPING_DISALLOWED_WHEN_COLUMN_MAPPING_METADATA_ALREADY_EXISTS" : { + "message" : [ + "Enabling column mapping when column mapping metadata is already present in schema is not supported.", + "To use column mapping, create a new table and reload the data into it." + ], + "sqlState" : "XXKDS" + }, "DELTA_EXCEED_CHAR_VARCHAR_LIMIT" : { "message" : [ "Value \"\" exceeds char/varchar type length limitation. Failed check: ." diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala index b893d25f3cf..cf3b9493599 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala @@ -174,6 +174,18 @@ trait DeltaColumnMappingBase extends DeltaLogging { } } + // If column mapping was disabled, but there was already column mapping in the schema, it is + // a result of a bug in the previous version of Delta. This should no longer happen with the + // stripping done above. For existing tables with this issue, we should not allow enabling + // column mapping, to prevent further corruption. + if (spark.conf.get(DeltaSQLConf. 
+ DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS)) { + if (oldMappingMode == NoMapping && newMappingMode != NoMapping && + schemaHasColumnMappingMetadata(oldMetadata.schema)) { + throw DeltaErrors.enablingColumnMappingDisallowedWhenColumnMappingMetadataAlreadyExists() + } + } + updatedMetadata = updateColumnMappingMetadata( oldMetadata, updatedMetadata, isChangingModeOnExistingTable, isOverwriteSchema) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index 386d66caa60..65f15dc251f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -2126,6 +2126,12 @@ trait DeltaErrorsBase messageParameters = Array(oldMode, newMode)) } + def enablingColumnMappingDisallowedWhenColumnMappingMetadataAlreadyExists(): Throwable = { + new DeltaColumnMappingUnsupportedException( + errorClass = + "DELTA_ENABLING_COLUMN_MAPPING_DISALLOWED_WHEN_COLUMN_MAPPING_METADATA_ALREADY_EXISTS") + } + def generateManifestWithColumnMappingNotSupported: Throwable = { new DeltaColumnMappingUnsupportedException( errorClass = "DELTA_UNSUPPORTED_MANIFEST_GENERATION_WITH_COLUMN_MAPPING") diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index ab8b8d6cb43..25b5eac3866 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -1953,6 +1953,18 @@ trait DeltaSQLConfBase { .booleanConf .createWithDefault(true) + val DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS = + buildConf("columnMapping.disallowEnablingWhenColumnMappingMetadataAlreadyExists") + .doc( + """ + |If Delta table already has column mapping metadata before the feature is enabled, it is + 
|likely the result of a corruption or a bug. Enabling column mapping in such a case can lead to |further corruption of the table and should be disallowed. |""".stripMargin) .internal() .booleanConf .createWithDefault(true) + val DYNAMIC_PARTITION_OVERWRITE_ENABLED = buildConf("dynamicPartitionOverwrite.enabled") .doc("Whether to overwrite partitions dynamically when 'partitionOverwriteMode' is set to " + diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala index 6504c8943e6..411a7c2a7dc 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala @@ -2106,4 +2106,49 @@ class DeltaColumnMappingSuite extends QueryTest s"Supported modes are: $supportedModes")) } } + + test("enabling column mapping disallowed if column mapping metadata already exists") { + withSQLConf( + // Metadata stripping (the fix that prevents committing invalid metadata in the first
 + // place) is deliberately disabled here so the test can commit the pre-existing column
 + // mapping metadata that reproduces the corruption.
 + DeltaSQLConf.DELTA_COLUMN_MAPPING_STRIP_METADATA.key -> "false" + ) { + withTempDir { dir => + val path = dir.getCanonicalPath + val deltaLog = DeltaLog.forTable(spark, path) + deltaLog.withNewTransaction(catalogTableOpt = None) { txn => + val schema = + new StructType().add("id", IntegerType, true, withIdAndPhysicalName(0, "col-0")) + val metadata = actions.Metadata( + name = "test_table", + schemaString = schema.json, + configuration = Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NoMapping.name) + ) + txn.updateMetadata(metadata) + txn.commit(Seq.empty, DeltaOperations.ManualUpdate) + + // Enabling the config will disallow enabling column mapping. 
+ withSQLConf(DeltaSQLConf + .DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS.key + -> "true") { + val e = intercept[DeltaColumnMappingUnsupportedException] { + alterTableWithProps( + s"delta.`$path`", + Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NameMapping.name)) + } + assert(e.getErrorClass == + "DELTA_ENABLING_COLUMN_MAPPING_DISALLOWED_WHEN_COLUMN_MAPPING_METADATA_ALREADY_EXISTS") + } + + // Disabling the config will allow enabling column mapping. + withSQLConf(DeltaSQLConf + .DELTA_COLUMN_MAPPING_DISALLOW_ENABLING_WHEN_METADATA_ALREADY_EXISTS.key + -> "false") { + alterTableWithProps( + s"delta.`$path`", + Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> NameMapping.name)) + } + } + } + } + } }