
Commit

Fix the logic of merger selection
linliu-code committed Jan 8, 2025
1 parent 106a00d commit 52016b4
Showing 4 changed files with 35 additions and 25 deletions.
@@ -54,9 +54,6 @@ public HoodieRecordType getRecordType() {
   }
 
   private Option<IndexedRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
-    if (newer.isDelete(schema, props)) {
-      return Option.empty();
-    }
     Option<IndexedRecord> previousAvroData = older.toIndexedRecord(schema, props).map(HoodieAvroIndexedRecord::getData);
     if (!previousAvroData.isPresent()) {
       return Option.empty();
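For context, the removed lines short-circuited combineAndGetUpdateValue whenever the newer record reported itself as a delete. One plausible reading of the change is that delete detection is left to the merge caller, so custom delete markers (like the "delete"/"d" pair exercised by the test below) are honored in one place. A minimal, self-contained sketch under that assumption, with toy types rather than the Hudi API:

    import java.util.Map;
    import java.util.Optional;

    public class ToyMergeFlow {
      // Toy record: field -> value; "delete" == "d" marks a custom delete,
      // mirroring the DELETE_KEY/DELETE_MARKER options used in the test below.
      static boolean isCustomDelete(Map<String, String> rec) {
        return "d".equals(rec.get("delete"));
      }

      // After the patch, the combine step no longer decides deletes itself; it
      // only merges two surviving versions (overwrite-with-latest in this toy).
      static Optional<Map<String, String>> combine(Map<String, String> older,
                                                   Map<String, String> newer) {
        return Optional.of(newer);
      }

      // The caller owns delete detection, applied uniformly before combining.
      static Optional<Map<String, String>> merge(Map<String, String> older,
                                                 Map<String, String> newer) {
        return isCustomDelete(newer) ? Optional.empty() : combine(older, newer);
      }

      public static void main(String[] args) {
        Map<String, String> v1 = Map.of("key", "k1", "rider", "A");
        Map<String, String> v2 = Map.of("key", "k1", "delete", "d");
        System.out.println(merge(v1, v2).isPresent()); // false: v2 is a delete
      }
    }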
@@ -31,6 +31,7 @@
  * Avro Merger that always chooses the newer record
  */
 public class OverwriteWithLatestMerger implements HoodieRecordMerger {
+  public static final OverwriteWithLatestMerger INSTANCE = new OverwriteWithLatestMerger();
 
   @Override
   public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older,
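The new INSTANCE field is the usual eagerly created singleton for a stateless class, letting callers share one object instead of instantiating the merger per call site. A minimal sketch of the pattern under an illustrative name (NewerWinsMerger is hypothetical, not a Hudi class):

    public final class NewerWinsMerger {
      // Eagerly created shared instance: the merger holds no state, so one
      // object can serve every caller.
      public static final NewerWinsMerger INSTANCE = new NewerWinsMerger();

      private NewerWinsMerger() {} // callers go through INSTANCE

      public <T> T merge(T older, T newer) {
        return newer; // always prefer the newer record
      }
    }

Usage is then simply NewerWinsMerger.INSTANCE.merge(a, b), which is what the selection logic in the next file relies on.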
@@ -25,6 +25,7 @@
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.OperationModeAwareness;
+import org.apache.hudi.common.model.OverwriteWithLatestMerger;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
@@ -76,11 +77,14 @@ public static HoodieRecordMerger loadRecordMerger(String mergerClass) {
    */
   public static HoodieRecordMerger createRecordMerger(String basePath, EngineType engineType,
                                                       List<String> mergerClassList, String recordMergerStrategy) {
+    HoodieRecordMerger defaultMerger = recordMergerStrategy.equals(HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID)
+        ? HoodieAvroRecordMerger.INSTANCE : OverwriteWithLatestMerger.INSTANCE;
+
     if (mergerClassList.isEmpty() || HoodieTableMetadata.isMetadataTable(basePath)) {
-      return HoodieAvroRecordMerger.INSTANCE;
+      return defaultMerger;
     } else {
       return createValidRecordMerger(engineType, mergerClassList, recordMergerStrategy)
-          .orElse(HoodieAvroRecordMerger.INSTANCE);
+          .orElse(defaultMerger);
     }
   }
 
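This hunk is the fix named in the commit message: previously createRecordMerger always fell back to HoodieAvroRecordMerger.INSTANCE, even when the requested strategy was commit-time based; now the fallback tracks the strategy ID. A simplified, self-contained sketch of the rule, where the constants and types are illustrative stand-ins rather than the real Hudi declarations:

    import java.util.List;
    import java.util.Optional;

    public class MergerSelectionSketch {
      interface Merger {}
      static final Merger EVENT_TIME_MERGER = new Merger() {};   // stands in for HoodieAvroRecordMerger.INSTANCE
      static final Merger COMMIT_TIME_MERGER = new Merger() {};  // stands in for OverwriteWithLatestMerger.INSTANCE
      static final String DEFAULT_STRATEGY_ID = "event-time";    // stands in for DEFAULT_MERGE_STRATEGY_UUID

      static Merger createRecordMerger(List<String> mergerClassList, String strategyId,
                                       boolean isMetadataTable) {
        // The fallback now respects the requested strategy instead of being hard-coded.
        Merger defaultMerger = DEFAULT_STRATEGY_ID.equals(strategyId)
            ? EVENT_TIME_MERGER : COMMIT_TIME_MERGER;
        if (mergerClassList.isEmpty() || isMetadataTable) {
          return defaultMerger;
        }
        return loadFirstValidMerger(mergerClassList, strategyId).orElse(defaultMerger);
      }

      // Class loading is elided here; the real code delegates to createValidRecordMerger.
      static Optional<Merger> loadFirstValidMerger(List<String> classes, String strategyId) {
        return Optional.empty();
      }
    }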
@@ -42,37 +42,36 @@ class TestCustomDeleteRecord extends SparkClientFunctionalTestHarness {
   @ParameterizedTest
   @MethodSource(Array("provideParams"))
   def testCustomDelete(useFgReader: String,
-                       tableType: String,
-                       recordType: String,
-                       positionUsed: String,
-                       mergeMode: String): Unit = {
-    val mergeClasses = List(
+                       tableType: String,
+                       recordType: String,
+                       positionUsed: String,
+                       mergeMode: String): Unit = {
+    val sparkMergeClasses = List(
       classOf[DefaultSparkRecordMerger].getName,
-      classOf[OverwriteWithLatestSparkRecordMerger].getName,
+      classOf[OverwriteWithLatestSparkRecordMerger].getName).mkString(",")
+    val avroMergerClasses = List(
       classOf[HoodieAvroRecordMerger].getName,
-      classOf[OverwriteWithLatestMerger].getName
-    ).mkString(",")
+      classOf[OverwriteWithLatestMerger].getName).mkString(",")
 
     val mergeStrategy = if (mergeMode.equals(RecordMergeMode.EVENT_TIME_ORDERING.name)) {
       HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID
     } else {
       HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
     }
-    val sparkOpts: Map[String, String] = Map(
+    val mergeOpts: Map[String, String] = Map(
       HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet",
-      HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key -> mergeClasses,
+      HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key ->
+        (if (recordType.equals("SPARK")) sparkMergeClasses else avroMergerClasses),
       HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key -> mergeStrategy)
     val fgReaderOpts: Map[String, String] = Map(
       HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> useFgReader,
       HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key -> positionUsed,
-      HoodieWriteConfig.RECORD_MERGE_MODE.key -> mergeMode)
+      HoodieWriteConfig.RECORD_MERGE_MODE.key -> mergeMode
+    )
     val deleteOpts: Map[String, String] = Map(
       DELETE_KEY -> "delete",
       DELETE_MARKER -> "d")
-    val opts = if (recordType.equals("SPARK")) {
-      sparkOpts ++ fgReaderOpts ++ deleteOpts
-    } else {
-      fgReaderOpts ++ deleteOpts
-    }
+    val opts = mergeOpts ++ fgReaderOpts ++ deleteOpts
     val columns = Seq("ts", "key", "rider", "driver", "fare", "delete")
 
     val data = Seq(
@@ -130,13 +129,22 @@ object TestCustomDeleteRecord {
   def provideParams(): java.util.List[Arguments] = {
     java.util.Arrays.asList(
       Arguments.of("true", "COPY_ON_WRITE", "AVRO", "false", "EVENT_TIME_ORDERING"),
-      Arguments.of("true", "COPY_ON_WRITE", "SPARK", "true", "EVENT_TIME_ORDERING"),
+      Arguments.of("true", "COPY_ON_WRITE", "AVRO", "true", "EVENT_TIME_ORDERING"),
       Arguments.of("true", "COPY_ON_WRITE", "AVRO", "false", "COMMIT_TIME_ORDERING"),
-      Arguments.of("true", "COPY_ON_WRITE", "SPARK", "true", "COMMIT_TIME_ORDERING"),
+      Arguments.of("true", "COPY_ON_WRITE", "AVRO", "true", "COMMIT_TIME_ORDERING"),
       Arguments.of("true", "MERGE_ON_READ", "AVRO", "false", "EVENT_TIME_ORDERING"),
-      Arguments.of("true", "MERGE_ON_READ", "SPARK", "true", "EVENT_TIME_ORDERING"),
+      Arguments.of("true", "MERGE_ON_READ", "AVRO", "true", "EVENT_TIME_ORDERING"),
       Arguments.of("true", "MERGE_ON_READ", "AVRO", "false", "COMMIT_TIME_ORDERING"),
-      Arguments.of("true", "MERGE_ON_READ", "SPARK", "true", "COMMIT_TIME_ORDERING"))
+      Arguments.of("true", "MERGE_ON_READ", "AVRO", "true", "COMMIT_TIME_ORDERING"),
+      Arguments.of("true", "COPY_ON_WRITE", "SPARK", "false", "EVENT_TIME_ORDERING"),
+      Arguments.of("true", "COPY_ON_WRITE", "SPARK", "true", "EVENT_TIME_ORDERING"),
+      Arguments.of("true", "COPY_ON_WRITE", "SPARK", "false", "COMMIT_TIME_ORDERING"),
+      Arguments.of("true", "COPY_ON_WRITE", "SPARK", "true", "COMMIT_TIME_ORDERING"),
+      Arguments.of("true", "MERGE_ON_READ", "SPARK", "false", "EVENT_TIME_ORDERING"),
+      Arguments.of("true", "MERGE_ON_READ", "SPARK", "true", "EVENT_TIME_ORDERING"),
+      Arguments.of("true", "MERGE_ON_READ", "SPARK", "false", "COMMIT_TIME_ORDERING"),
+      Arguments.of("true", "MERGE_ON_READ", "SPARK", "true", "COMMIT_TIME_ORDERING")
+    )
   }
 
   def validate(expectedDf: Dataset[Row], actualDf: Dataset[Row]): Unit = {
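The reworked parameter list now covers the full cross product of table type, record type, position usage, and merge mode. A small sanity sketch (illustrative only, not part of the test code) that generates the same 16 combinations:

    import java.util.ArrayList;
    import java.util.List;

    public class ParamMatrix {
      public static void main(String[] args) {
        // Enumerate tableType x recordType x positionUsed x mergeMode with the
        // file-group reader always enabled, matching the list above.
        List<String[]> cases = new ArrayList<>();
        for (String tableType : new String[]{"COPY_ON_WRITE", "MERGE_ON_READ"})
          for (String recordType : new String[]{"AVRO", "SPARK"})
            for (String positionUsed : new String[]{"false", "true"})
              for (String mergeMode : new String[]{"EVENT_TIME_ORDERING", "COMMIT_TIME_ORDERING"})
                cases.add(new String[]{"true", tableType, recordType, positionUsed, mergeMode});
        System.out.println(cases.size()); // 16, one per Arguments.of(...) row
      }
    }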
