diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 34533b864919..56c78af7a94c 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -67,6 +67,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator
   private static final long serialVersionUID = 1L;
   private static final long INITIAL_CHECKPOINT_ID = -1L;
+  private static final long END_INPUT_CHECKPOINT_ID = Long.MAX_VALUE;
   private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
 
   private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
@@ -446,7 +447,7 @@ public void processElement(StreamRecord element) {
   @Override
   public void endInput() throws IOException {
     // Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
-    long currentCheckpointId = Long.MAX_VALUE;
+    long currentCheckpointId = END_INPUT_CHECKPOINT_ID;
     dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
     writeResultsSinceLastSnapshot.clear();
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 055fbae6b2be..ce6caca12158 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -66,7 +66,6 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
@@ -83,13 +82,6 @@ private SimpleDataUtil() {}
           Types.NestedField.optional(1, "id", Types.IntegerType.get()),
           Types.NestedField.optional(2, "data", Types.StringType.get()));
 
-  public static final Schema SCHEMA_WITH_PRIMARY_KEY =
-      new Schema(
-          Lists.newArrayList(
-              Types.NestedField.required(1, "id", Types.IntegerType.get()),
-              Types.NestedField.optional(2, "data", Types.StringType.get())),
-          Sets.newHashSet(1));
-
   public static final TableSchema FLINK_SCHEMA =
       TableSchema.builder().field("id", DataTypes.INT()).field("data", DataTypes.STRING()).build();
@@ -318,10 +310,6 @@ public static void assertTableRecords(Table table, List expected, String
       StructLikeSet actualSet = StructLikeSet.create(type);
 
       for (Record record : iterable) {
-        if (!table.schema().identifierFieldNames().isEmpty()) {
-          Assert.assertFalse("Should not have the identical record", actualSet.contains(record));
-        }
-
         actualSet.add(record);
       }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 1bce296da2df..f5792675a256 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -63,7 +63,6 @@
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.TestBase;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -93,21 +92,15 @@ public class TestIcebergFilesCommitter extends TestBase {
   @Parameter(index = 2)
   private String branch;
 
-  @Parameter(index = 3)
-  private boolean hasPrimaryKey;
-
-  @Parameters(name = "formatVersion = {0}, fileFormat = {1}, branch = {2}, hasPrimaryKey = {3}")
+  @Parameters(name = "formatVersion = {0}, fileFormat = {1}, branch = {2}")
   protected static List parameters() {
     return Arrays.asList(
-        new Object[] {1, FileFormat.AVRO, "main", false},
-        new Object[] {2, FileFormat.AVRO, "test-branch", false},
-        new Object[] {1, FileFormat.PARQUET, "main", false},
-        new Object[] {2, FileFormat.PARQUET, "test-branch", false},
-        new Object[] {1, FileFormat.ORC, "main", false},
-        new Object[] {2, FileFormat.ORC, "test-branch", false},
-        new Object[] {2, FileFormat.AVRO, "main", true},
-        new Object[] {2, FileFormat.PARQUET, "test-branch", true},
-        new Object[] {2, FileFormat.ORC, "main", true});
+        new Object[] {1, FileFormat.AVRO, "main"},
+        new Object[] {2, FileFormat.AVRO, "test-branch"},
+        new Object[] {1, FileFormat.PARQUET, "main"},
+        new Object[] {2, FileFormat.PARQUET, "test-branch"},
+        new Object[] {1, FileFormat.ORC, "main"},
+        new Object[] {2, FileFormat.ORC, "test-branch"});
   }
 
   @Override
@@ -119,9 +112,8 @@ public void setupTable() throws IOException {
     this.metadataDir = new File(tableDir, "metadata");
     assertThat(tableDir.delete()).isTrue();
 
-    Schema schema = hasPrimaryKey ? SimpleDataUtil.SCHEMA_WITH_PRIMARY_KEY : SimpleDataUtil.SCHEMA;
     // Construct the iceberg table.
-    table = create(schema, PartitionSpec.unpartitioned());
+    table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());
 
     table
         .updateProperties()
@@ -506,7 +498,6 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except
 
   @TestTemplate
   public void testStartAnotherJobToWriteSameTable() throws Exception {
-    assumeThat(hasPrimaryKey).as("The test case only for non-primary table.").isEqualTo(false);
     long checkpointId = 0;
     long timestamp = 0;
@@ -922,19 +913,34 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
   }
 
   @TestTemplate
-  public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
+  public void testCommitMultipleCheckpointsForV2Table() throws Exception {
+    // The test needs to meet the following conditions:
+    // 1. The first checkpoint is not committed on its own; the next checkpoint commits a single
+    //    snapshot covering both. That is, prepareSnapshotPreBarrier is triggered twice, but
+    //    snapshotState() is only triggered once.
+    // 2. Both checkpoints contain data with the same primary key, so both a data file and an
+    //    eq-delete file are generated.
+
     assumeThat(formatVersion)
         .as("Only support equality-delete in format v2 or later.")
        .isGreaterThan(1);
-    assumeThat(hasPrimaryKey).as("The test case only for primary table.").isEqualTo(true);
-
     long timestamp = 0;
     long checkpoint = 10;
     JobID jobId = new JobID();
     OperatorID operatorId;
-    FileAppenderFactory appenderFactory = createDeletableAppenderFactory();
+
+    FileAppenderFactory appenderFactory =
+        new FlinkAppenderFactory(
+            table,
+            table.schema(),
+            FlinkSchemaUtil.convert(table.schema()),
+            table.properties(),
+            table.spec(),
+            new int[] {table.schema().findField("id").fieldId()},
+            table.schema(),
+            null);
 
     try (OneInputStreamOperatorTestHarness harness =
         createStreamSink(jobId)) {
@@ -944,9 +950,11 @@ public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
 
       assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
-      RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
-      RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+      RowData insert1 = null;
+      RowData insert2 = null;
       for (int i = 1; i <= 3; i++) {
+        insert1 = SimpleDataUtil.createInsert(1, "aaa" + i);
+        insert2 = SimpleDataUtil.createInsert(2, "bbb" + i);
         DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2));
         DeleteFile deleteFile =
             writeEqDeleteFile(
                 appenderFactory, "delete-file-" + i, ImmutableList.of(insert1, insert2));
         harness.processElement(of(dataFile, deleteFile), ++timestamp);
       }
 
-      // The 1th snapshotState.
       harness.snapshot(checkpoint, ++timestamp);
-
-      // Notify the 1th snapshot to complete.
       harness.notifyOfCompletedCheckpoint(checkpoint);
       SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert2), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);