diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala
index 307e1008e4c..f9ec02bf1a0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala
@@ -291,7 +291,7 @@ trait MergeIntoCommandBase extends LeafRunnableCommand
       fileIndex: TahoeFileIndex,
       columnsToDrop: Seq[String]): LogicalPlan = {
-    val targetOutputCols = getTargetOutputCols(deltaTxn)
+    val targetOutputCols = getTargetOutputCols(spark, deltaTxn)

     val plan = {
@@ -339,8 +339,16 @@ trait MergeIntoCommandBase extends LeafRunnableCommand
    * this transaction, since new columns will have a value of null for all existing rows.
    */
   protected def getTargetOutputCols(
-      txn: OptimisticTransaction, makeNullable: Boolean = false): Seq[NamedExpression] = {
-    txn.metadata.schema.map { col =>
+      spark: SparkSession,
+      txn: OptimisticTransaction,
+      makeNullable: Boolean = false)
+    : Seq[NamedExpression] = {
+    // Internal metadata attached to the table schema must not leak into the target plan to
+    // prevent inconsistencies - e.g. metadata matters when comparing the data types of structs
+    // with nested fields.
+    val schema = DeltaColumnMapping.dropColumnMappingMetadata(
+      DeltaTableUtils.removeInternalMetadata(spark, txn.metadata.schema))
+    schema.map { col =>
       targetOutputAttributesMap
         .get(col.name)
         .map { a =>
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
index dfdd4d76af5..07367016962 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
@@ -388,7 +388,7 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {

     // The target output columns need to be marked as nullable here, as they are going to be used
     // to reference the output of an outer join.
-    val targetOutputCols = getTargetOutputCols(deltaTxn, makeNullable = true)
+    val targetOutputCols = getTargetOutputCols(spark, deltaTxn, makeNullable = true)

     // If there are N columns in the target table, the full outer join output will have:
     // - N columns for target table
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/InsertOnlyMergeExecutor.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/InsertOnlyMergeExecutor.scala
index 7eb6a49345d..6f83d446c5e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/InsertOnlyMergeExecutor.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/InsertOnlyMergeExecutor.scala
@@ -96,7 +96,7 @@ trait InsertOnlyMergeExecutor extends MergeOutputGeneration {
         sourceDF
       }

-      val outputDF = generateInsertsOnlyOutputDF(preparedSourceDF, deltaTxn)
+      val outputDF = generateInsertsOnlyOutputDF(spark, preparedSourceDF, deltaTxn)
       logDebug(s"$extraOpType: output plan:\n" + outputDF.queryExecution)

       val newFiles = writeFiles(spark, deltaTxn, outputDF)
@@ -142,10 +142,11 @@ trait InsertOnlyMergeExecutor extends MergeOutputGeneration {
    * and when there are multiple insert clauses.
    */
   private def generateInsertsOnlyOutputDF(
+      spark: SparkSession,
       preparedSourceDF: DataFrame,
       deltaTxn: OptimisticTransaction): DataFrame = {

-    val targetOutputColNames = getTargetOutputCols(deltaTxn).map(_.name)
+    val targetOutputColNames = getTargetOutputCols(spark, deltaTxn).map(_.name)

     // When there is only one insert clause, there is no need for ROW_DROPPED_COL and
     // output df can be generated without CaseWhen.
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala
index cdead65864e..f5ca606cbeb 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala
@@ -3154,6 +3154,37 @@ abstract class MergeIntoSuiteBase
     expectedErrorMsgForDataSetTempView = null
   )

+  test("merge correctly handles field metadata") {
+    withTable("source", "target") {
+      // Create a target table with user metadata (comments) and internal metadata (column mapping
+      // information) on both a top-level column and a nested field.
+      sql(
+        """
+          |CREATE TABLE target(
+          |  key int not null COMMENT 'data column',
+          |  value int not null,
+          |  cstruct struct<value: int COMMENT 'nested data column'>)
+          |USING DELTA
+          |TBLPROPERTIES (
+          |  'delta.minReaderVersion' = '2',
+          |  'delta.minWriterVersion' = '5',
+          |  'delta.columnMapping.mode' = 'name')
+        """.stripMargin
+      )
+      sql(s"INSERT INTO target VALUES (0, 0, null)")
+
+      sql("CREATE TABLE source (key int not null, value int not null) USING DELTA")
+      sql(s"INSERT INTO source VALUES (1, 1)")
+
+      executeMerge(
+        tgt = "target",
+        src = "source",
+        cond = "source.key = target.key",
+        update(condition = "target.key = 1", set = "target.value = 42"),
+        updateNotMatched(condition = "target.key = 100", set = "target.value = 22"))
+    }
+  }
+
   test("UDT Data Types - simple and nested") {
     withTable("source") {
       withTable("target") {
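Context for the comment added in getTargetOutputCols: Spark's StructField is a case class whose equality includes per-field Metadata, so a nested field that still carries a user comment or column mapping metadata makes its parent struct's data type compare unequal to the same shape without that metadata. The sketch below illustrates that behavior only; the object name and comment string are illustrative and not part of this change.

import org.apache.spark.sql.types._

// Hypothetical standalone check, not part of the patch: struct data type equality
// includes the metadata attached to nested fields.
object NestedFieldMetadataCheck {
  def main(args: Array[String]): Unit = {
    // Nested field carrying metadata, e.g. a user comment or Delta column mapping info.
    val withMetadata = StructType(Seq(
      StructField(
        "value",
        IntegerType,
        nullable = true,
        metadata = new MetadataBuilder().putString("comment", "nested data column").build())))

    // Same shape with the metadata stripped, as produced by a plan that does not carry
    // the table schema's internal metadata.
    val withoutMetadata = StructType(Seq(StructField("value", IntegerType)))

    // Prints "false": the two types describe the same columns but do not compare equal,
    // which is the kind of inconsistency the target plan must avoid.
    println(withMetadata == withoutMetadata)
  }
}

Stripping the schema up front with DeltaColumnMapping.dropColumnMappingMetadata and DeltaTableUtils.removeInternalMetadata, as the patch does, keeps such metadata out of the target plan's attribute types.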