From 8db9617b59c9af76953f0c4efe20c89ee8fcd938 Mon Sep 17 00:00:00 2001
From: Johan Lasperas
Date: Thu, 8 Feb 2024 12:13:15 +0100
Subject: [PATCH] Fix inconsistent field metadata in MERGE

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

Fixes an issue where internal metadata attached to fields leaks into the plan
constructed when executing a MERGE command, causing the same attribute to
appear both with and without the internal metadata. This can cause plan
validation to fail due to the same attribute having two apparently different
data types (metadata is part of a field's data type).

Added a test that would fail without the fix.
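For illustration only, a minimal sketch (not part of this patch) of why the
leaked metadata trips plan validation: StructType equality in Spark includes
nested field metadata, so the same attribute resolved with and without column
mapping metadata appears to carry two different data types. The metadata key
below is only a representative stand-in for the internal keys Delta attaches:

    import org.apache.spark.sql.types._

    // The target attribute's declared data type: struct<leaf: int>.
    val clean = StructType(Seq(StructField("leaf", IntegerType)))

    // The same struct after column mapping metadata leaks onto the nested
    // field ('delta.columnMapping.id' is used here as a representative key).
    val tagged = StructType(Seq(StructField(
      "leaf", IntegerType, nullable = true,
      new MetadataBuilder().putLong("delta.columnMapping.id", 1L).build())))

    // StructField is a case class, so equality covers metadata: the two
    // types compare as different even though they describe the same column.
    assert(clean != tagged)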
Closes delta-io/delta#2612

GitOrigin-RevId: 50688200c53f9450512b76ee2d375b2e55db8216
---
 .../delta/commands/MergeIntoCommandBase.scala | 14 +++++++--
 .../commands/merge/ClassicMergeExecutor.scala |  2 +-
 .../merge/InsertOnlyMergeExecutor.scala       |  5 +--
 .../spark/sql/delta/MergeIntoSuiteBase.scala  | 31 +++++++++++++++++++
 4 files changed, 46 insertions(+), 6 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala
index 307e1008e4c..f9ec02bf1a0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommandBase.scala
@@ -291,7 +291,7 @@ trait MergeIntoCommandBase extends LeafRunnableCommand
       fileIndex: TahoeFileIndex,
       columnsToDrop: Seq[String]): LogicalPlan = {
-    val targetOutputCols = getTargetOutputCols(deltaTxn)
+    val targetOutputCols = getTargetOutputCols(spark, deltaTxn)

     val plan = {
@@ -339,8 +339,16 @@ trait MergeIntoCommandBase extends LeafRunnableCommand
    * this transaction, since new columns will have a value of null for all existing rows.
    */
   protected def getTargetOutputCols(
-      txn: OptimisticTransaction, makeNullable: Boolean = false): Seq[NamedExpression] = {
-    txn.metadata.schema.map { col =>
+      spark: SparkSession,
+      txn: OptimisticTransaction,
+      makeNullable: Boolean = false)
+    : Seq[NamedExpression] = {
+    // Internal metadata attached to the table schema must not leak into the target plan to
+    // prevent inconsistencies - e.g. metadata matters when comparing the data type of a struct
+    // with nested fields.
+    val schema = DeltaColumnMapping.dropColumnMappingMetadata(
+      DeltaTableUtils.removeInternalMetadata(spark, txn.metadata.schema))
+    schema.map { col =>
       targetOutputAttributesMap
         .get(col.name)
         .map { a =>
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
index dfdd4d76af5..07367016962 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
@@ -388,7 +388,7 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {
     // The target output columns need to be marked as nullable here, as they are going to be used
     // to reference the output of an outer join.
-    val targetOutputCols = getTargetOutputCols(deltaTxn, makeNullable = true)
+    val targetOutputCols = getTargetOutputCols(spark, deltaTxn, makeNullable = true)

     // If there are N columns in the target table, the full outer join output will have:
     // - N columns for target table
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/InsertOnlyMergeExecutor.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/InsertOnlyMergeExecutor.scala
index 7eb6a49345d..6f83d446c5e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/InsertOnlyMergeExecutor.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/InsertOnlyMergeExecutor.scala
@@ -96,7 +96,7 @@ trait InsertOnlyMergeExecutor extends MergeOutputGeneration {
       sourceDF
     }

-    val outputDF = generateInsertsOnlyOutputDF(preparedSourceDF, deltaTxn)
+    val outputDF = generateInsertsOnlyOutputDF(spark, preparedSourceDF, deltaTxn)
     logDebug(s"$extraOpType: output plan:\n" + outputDF.queryExecution)

     val newFiles = writeFiles(spark, deltaTxn, outputDF)
@@ -142,10 +142,11 @@ trait InsertOnlyMergeExecutor extends MergeOutputGeneration {
    * and when there are multiple insert clauses.
    */
   private def generateInsertsOnlyOutputDF(
+      spark: SparkSession,
       preparedSourceDF: DataFrame,
       deltaTxn: OptimisticTransaction): DataFrame = {
-    val targetOutputColNames = getTargetOutputCols(deltaTxn).map(_.name)
+    val targetOutputColNames = getTargetOutputCols(spark, deltaTxn).map(_.name)

     // When there is only one insert clause, there is no need for ROW_DROPPED_COL and
     // output df can be generated without CaseWhen.
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala
index cdead65864e..f5ca606cbeb 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala
@@ -3154,6 +3154,37 @@ abstract class MergeIntoSuiteBase
     expectedErrorMsgForDataSetTempView = null
   )

+  test("merge correctly handles field metadata") {
+    withTable("source", "target") {
+      // Create a target table with user metadata (comments) and internal metadata (column mapping
+      // information) on both a top-level column and a nested field.
+      sql(
+        """
+          |CREATE TABLE target(
+          |  key int not null COMMENT 'data column',
+          |  value int not null,
+          |  cstruct struct<leaf int COMMENT 'nested data column'>)
+          |USING DELTA
+          |TBLPROPERTIES (
+          |  'delta.minReaderVersion' = '2',
+          |  'delta.minWriterVersion' = '5',
+          |  'delta.columnMapping.mode' = 'name')
+        """.stripMargin
+      )
+      sql(s"INSERT INTO target VALUES (0, 0, null)")
+
+      sql("CREATE TABLE source (key int not null, value int not null) USING DELTA")
+      sql(s"INSERT INTO source VALUES (1, 1)")
+
+      executeMerge(
+        tgt = "target",
+        src = "source",
+        cond = "source.key = target.key",
+        update(condition = "target.key = 1", set = "target.value = 42"),
+        updateNotMatched(condition = "target.key = 100", set = "target.value = 22"))
+    }
+  }
+
   test("UDT Data Types - simple and nested") {
     withTable("source") {
       withTable("target") {