diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
index 7b141e3da5e0c..fa773d40f6efb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java
@@ -54,9 +54,6 @@ public HoodieRecordType getRecordType() {
   }
 
   private Option<IndexedRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
-    if (newer.isDelete(schema, props)) {
-      return Option.empty();
-    }
     Option<IndexedRecord> previousAvroData = older.toIndexedRecord(schema, props).map(HoodieAvroIndexedRecord::getData);
     if (!previousAvroData.isPresent()) {
       return Option.empty();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java
index bfa312eb2dcfe..9f535f9492142 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java
@@ -31,6 +31,7 @@
  * Avro Merger that always chooses the newer record
  */
 public class OverwriteWithLatestMerger implements HoodieRecordMerger {
+  public static final OverwriteWithLatestMerger INSTANCE = new OverwriteWithLatestMerger();
 
   @Override
   public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index 601aceceb7658..0393a0ad1d602 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -25,6 +25,7 @@
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.OperationModeAwareness;
+import org.apache.hudi.common.model.OverwriteWithLatestMerger;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 
@@ -76,11 +77,14 @@ public static HoodieRecordMerger loadRecordMerger(String mergerClass) {
    */
   public static HoodieRecordMerger createRecordMerger(String basePath, EngineType engineType,
                                                       List<String> mergerClassList, String recordMergerStrategy) {
+    HoodieRecordMerger defaultMerger = recordMergerStrategy.equals(HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID)
+        ? HoodieAvroRecordMerger.INSTANCE : OverwriteWithLatestMerger.INSTANCE;
+
     if (mergerClassList.isEmpty() || HoodieTableMetadata.isMetadataTable(basePath)) {
-      return HoodieAvroRecordMerger.INSTANCE;
+      return defaultMerger;
     } else {
       return createValidRecordMerger(engineType, mergerClassList, recordMergerStrategy)
-          .orElse(HoodieAvroRecordMerger.INSTANCE);
+          .orElse(defaultMerger);
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestCustomDeleteRecord.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestCustomDeleteRecord.scala
index 2b55183f5b962..868e6cec270da 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestCustomDeleteRecord.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestCustomDeleteRecord.scala
@@ -42,37 +42,36 @@ class TestCustomDeleteRecord extends SparkClientFunctionalTestHarness {
   @ParameterizedTest
   @MethodSource(Array("provideParams"))
   def testCustomDelete(useFgReader: String,
-                        tableType: String,
-                        recordType: String,
-                        positionUsed: String,
-                        mergeMode: String): Unit = {
-    val mergeClasses = List(
+                       tableType: String,
+                       recordType: String,
+                       positionUsed: String,
+                       mergeMode: String): Unit = {
+    val sparkMergeClasses = List(
       classOf[DefaultSparkRecordMerger].getName,
-      classOf[OverwriteWithLatestSparkRecordMerger].getName,
+      classOf[OverwriteWithLatestSparkRecordMerger].getName).mkString(",")
+    val avroMergerClasses = List(
       classOf[HoodieAvroRecordMerger].getName,
-      classOf[OverwriteWithLatestMerger].getName
-    ).mkString(",")
+      classOf[OverwriteWithLatestMerger].getName).mkString(",")
+
     val mergeStrategy = if (mergeMode.equals(RecordMergeMode.EVENT_TIME_ORDERING.name)) {
       HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID
     } else {
       HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
     }
 
-    val sparkOpts: Map[String, String] = Map(
+    val mergeOpts: Map[String, String] = Map(
       HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet",
-      HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key -> mergeClasses,
+      HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key ->
+        (if (recordType.equals("SPARK")) sparkMergeClasses else avroMergerClasses),
       HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key -> mergeStrategy)
     val fgReaderOpts: Map[String, String] = Map(
       HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> useFgReader,
       HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key -> positionUsed,
-      HoodieWriteConfig.RECORD_MERGE_MODE.key -> mergeMode)
+      HoodieWriteConfig.RECORD_MERGE_MODE.key -> mergeMode
+    )
     val deleteOpts: Map[String, String] = Map(
       DELETE_KEY -> "delete", DELETE_MARKER -> "d")
-    val opts = if (recordType.equals("SPARK")) {
-      sparkOpts ++ fgReaderOpts ++ deleteOpts
-    } else {
-      fgReaderOpts ++ deleteOpts
-    }
+    val opts = mergeOpts ++ fgReaderOpts ++ deleteOpts
 
     val columns = Seq("ts", "key", "rider", "driver", "fare", "delete")
     val data = Seq(
@@ -130,13 +129,22 @@ object TestCustomDeleteRecord {
   def provideParams(): java.util.List[Arguments] = {
     java.util.Arrays.asList(
       Arguments.of("true", "COPY_ON_WRITE", "AVRO", "false", "EVENT_TIME_ORDERING"),
-      Arguments.of("true", "COPY_ON_WRITE", "SPARK", "true", "EVENT_TIME_ORDERING"),
+      Arguments.of("true", "COPY_ON_WRITE", "AVRO", "true", "EVENT_TIME_ORDERING"),
       Arguments.of("true", "COPY_ON_WRITE", "AVRO", "false", "COMMIT_TIME_ORDERING"),
-      Arguments.of("true", "COPY_ON_WRITE", "SPARK", "true", "COMMIT_TIME_ORDERING"),
+      Arguments.of("true", "COPY_ON_WRITE", "AVRO", "true", "COMMIT_TIME_ORDERING"),
       Arguments.of("true", "MERGE_ON_READ", "AVRO", "false", "EVENT_TIME_ORDERING"),
-      Arguments.of("true", "MERGE_ON_READ", "SPARK", "true", "EVENT_TIME_ORDERING"),
+      Arguments.of("true", "MERGE_ON_READ", "AVRO", "true", "EVENT_TIME_ORDERING"),
       Arguments.of("true", "MERGE_ON_READ", "AVRO", "false", "COMMIT_TIME_ORDERING"),
-      Arguments.of("true", "MERGE_ON_READ", "SPARK", "true", "COMMIT_TIME_ORDERING"))
+      Arguments.of("true", "MERGE_ON_READ", "AVRO", "true", "COMMIT_TIME_ORDERING"),
+      Arguments.of("true", "COPY_ON_WRITE", "SPARK", "false", "EVENT_TIME_ORDERING"),
+      Arguments.of("true", "COPY_ON_WRITE", "SPARK", "true", "EVENT_TIME_ORDERING"),
+      Arguments.of("true", "COPY_ON_WRITE", "SPARK", "false", "COMMIT_TIME_ORDERING"),
+      Arguments.of("true", "COPY_ON_WRITE", "SPARK", "true", "COMMIT_TIME_ORDERING"),
+      Arguments.of("true", "MERGE_ON_READ", "SPARK", "false", "EVENT_TIME_ORDERING"),
+      Arguments.of("true", "MERGE_ON_READ", "SPARK", "true", "EVENT_TIME_ORDERING"),
+      Arguments.of("true", "MERGE_ON_READ", "SPARK", "false", "COMMIT_TIME_ORDERING"),
+      Arguments.of("true", "MERGE_ON_READ", "SPARK", "true", "COMMIT_TIME_ORDERING")
+    )
   }
 
   def validate(expectedDf: Dataset[Row], actualDf: Dataset[Row]): Unit = {