
Commit

fix comments
zhongqishang committed Jun 21, 2024
1 parent fadaaa9 commit b413f75
Showing 3 changed files with 33 additions and 39 deletions.
@@ -67,6 +67,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>

private static final long serialVersionUID = 1L;
private static final long INITIAL_CHECKPOINT_ID = -1L;
private static final long END_INPUT_CHECKPOINT_ID = Long.MAX_VALUE;
private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];

private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
@@ -446,7 +447,7 @@ public void processElement(StreamRecord<FlinkWriteResult> element) {
@Override
public void endInput() throws IOException {
// Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
long currentCheckpointId = Long.MAX_VALUE;
long currentCheckpointId = END_INPUT_CHECKPOINT_ID;
dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
writeResultsSinceLastSnapshot.clear();

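Side note on the constant introduced above: END_INPUT_CHECKPOINT_ID simply names the Long.MAX_VALUE sentinel that endInput() was already using. A minimal sketch of why a MAX_VALUE key flushes everything at end of input, assuming pending manifests are kept in a sorted map keyed by checkpoint id (the class and helper names here are illustrative, not the actual committer):

import java.util.NavigableMap;
import java.util.TreeMap;

class EndInputSketch {
  // Sentinel checkpoint id used once the bounded input is exhausted.
  private static final long END_INPUT_CHECKPOINT_ID = Long.MAX_VALUE;

  // Pending manifest bytes, keyed by the checkpoint that produced them.
  private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = new TreeMap<>();

  void endInput(byte[] manifestForRemainingData) {
    // Buffer whatever is still in flight under the sentinel id ...
    dataFilesPerCheckpoint.put(END_INPUT_CHECKPOINT_ID, manifestForRemainingData);
    // ... then commit everything up to and including that id.
    commitUpTo(END_INPUT_CHECKPOINT_ID);
  }

  private void commitUpTo(long checkpointId) {
    // MAX_VALUE is larger than any real checkpoint id, so headMap(.., true) covers
    // every pending checkpoint and nothing is left uncommitted at end of input.
    NavigableMap<Long, byte[]> pending = dataFilesPerCheckpoint.headMap(checkpointId, true);
    // (commit the 'pending' manifests to the table here)
    pending.clear();
  }
}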
@@ -66,7 +66,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
@@ -83,13 +82,6 @@ private SimpleDataUtil() {}
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get()));

public static final Schema SCHEMA_WITH_PRIMARY_KEY =
new Schema(
Lists.newArrayList(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get())),
Sets.newHashSet(1));

public static final TableSchema FLINK_SCHEMA =
TableSchema.builder().field("id", DataTypes.INT()).field("data", DataTypes.STRING()).build();

@@ -318,10 +310,6 @@ public static void assertTableRecords(Table table, List<Record> expected, String
StructLikeSet actualSet = StructLikeSet.create(type);

for (Record record : iterable) {
if (!table.schema().identifierFieldNames().isEmpty()) {
Assert.assertFalse("Should not have the identical record", actualSet.contains(record));
}

actualSet.add(record);
}

@@ -63,7 +63,6 @@
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TestBase;
import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -93,21 +92,15 @@ public class TestIcebergFilesCommitter extends TestBase {
@Parameter(index = 2)
private String branch;

@Parameter(index = 3)
private boolean hasPrimaryKey;

@Parameters(name = "formatVersion = {0}, fileFormat = {1}, branch = {2}, hasPrimaryKey = {3}")
@Parameters(name = "formatVersion = {0}, fileFormat = {1}, branch = {2}")
protected static List<Object> parameters() {
return Arrays.asList(
new Object[] {1, FileFormat.AVRO, "main", false},
new Object[] {2, FileFormat.AVRO, "test-branch", false},
new Object[] {1, FileFormat.PARQUET, "main", false},
new Object[] {2, FileFormat.PARQUET, "test-branch", false},
new Object[] {1, FileFormat.ORC, "main", false},
new Object[] {2, FileFormat.ORC, "test-branch", false},
new Object[] {2, FileFormat.AVRO, "main", true},
new Object[] {2, FileFormat.PARQUET, "test-branch", true},
new Object[] {2, FileFormat.ORC, "main", true});
new Object[] {1, FileFormat.AVRO, "main"},
new Object[] {2, FileFormat.AVRO, "test-branch"},
new Object[] {1, FileFormat.PARQUET, "main"},
new Object[] {2, FileFormat.PARQUET, "test-branch"},
new Object[] {1, FileFormat.ORC, "main"},
new Object[] {2, FileFormat.ORC, "test-branch"});
}

@Override
@@ -119,9 +112,8 @@ public void setupTable() throws IOException {
this.metadataDir = new File(tableDir, "metadata");
assertThat(tableDir.delete()).isTrue();

Schema schema = hasPrimaryKey ? SimpleDataUtil.SCHEMA_WITH_PRIMARY_KEY : SimpleDataUtil.SCHEMA;
// Construct the iceberg table.
table = create(schema, PartitionSpec.unpartitioned());
table = create(SimpleDataUtil.SCHEMA, PartitionSpec.unpartitioned());

table
.updateProperties()
@@ -506,7 +498,6 @@ public void testRecoveryFromSnapshotWithoutCompletedNotification() throws Except

@TestTemplate
public void testStartAnotherJobToWriteSameTable() throws Exception {
assumeThat(hasPrimaryKey).as("The test case only for non-primary table.").isEqualTo(false);

long checkpointId = 0;
long timestamp = 0;
@@ -922,19 +913,34 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
}

@TestTemplate
public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
public void testCommitMultipleCheckpointsForV2Table() throws Exception {
// This test needs to meet the following conditions:
// 1. The first checkpoint does not complete normally, and the next snapshot commits both
// checkpoints in a single snapshot. That is, prepareSnapshotPreBarrier is triggered twice,
// but snapshotState() is only triggered once.
// 2. Both checkpoints contain data with the same primary key, so each checkpoint produces
// a data file and an eq-delete file.

assumeThat(formatVersion)
.as("Only support equality-delete in format v2 or later.")
.isGreaterThan(1);

assumeThat(hasPrimaryKey).as("The test case only for primary table.").isEqualTo(true);

long timestamp = 0;
long checkpoint = 10;

JobID jobId = new JobID();
OperatorID operatorId;
FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();

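// Note: the int[] below is the set of equality field ids (only the "id" column here), the
// second table.schema() is the row schema for eq-delete files, and the trailing null leaves
// the pos-delete row schema unset.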
FileAppenderFactory<RowData> appenderFactory =
new FlinkAppenderFactory(
table,
table.schema(),
FlinkSchemaUtil.convert(table.schema()),
table.properties(),
table.spec(),
new int[] {table.schema().findField("id").fieldId()},
table.schema(),
null);

try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
createStreamSink(jobId)) {
@@ -944,9 +950,11 @@ public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {

assertMaxCommittedCheckpointId(jobId, operatorId, -1L);

RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
RowData insert1 = null;
RowData insert2 = null;
for (int i = 1; i <= 3; i++) {
insert1 = SimpleDataUtil.createInsert(1, "aaa" + i);
insert2 = SimpleDataUtil.createInsert(2, "bbb" + i);
DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2));
DeleteFile deleteFile =
writeEqDeleteFile(
@@ -958,10 +966,7 @@
++timestamp);
}

// The first snapshotState.
harness.snapshot(checkpoint, ++timestamp);

// Notify the first snapshot to complete.
harness.notifyOfCompletedCheckpoint(checkpoint);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert2), branch);
assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
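For context, the scenario described in the comment block of testCommitMultipleCheckpointsForV2Table boils down to: several prepareSnapshotPreBarrier calls buffer write results per checkpoint, but a single snapshotState/commit must then cover all of them in checkpoint order. A toy model of that bookkeeping, assuming nothing about the Flink harness beyond what the diff shows (all names below are illustrative, not the actual operator):

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class MultiCheckpointCommitSketch {
  // Write results received since the last checkpoint barrier.
  private final List<String> buffered = new ArrayList<>();
  // Sealed write results, keyed by the checkpoint that produced them.
  private final NavigableMap<Long, List<String>> perCheckpoint = new TreeMap<>();

  void write(String result) {
    buffered.add(result);
  }

  void prepareSnapshotPreBarrier(long checkpointId) {
    // Seal everything written since the previous barrier under this checkpoint id.
    perCheckpoint.put(checkpointId, new ArrayList<>(buffered));
    buffered.clear();
  }

  List<String> snapshotAndCommit(long checkpointId) {
    // A single commit covering every checkpoint up to 'checkpointId', in checkpoint order,
    // so eq-deletes from a later checkpoint still land after earlier data for the same key.
    List<String> committed = new ArrayList<>();
    perCheckpoint.headMap(checkpointId, true).values().forEach(committed::addAll);
    perCheckpoint.headMap(checkpointId, true).clear();
    return committed;
  }

  public static void main(String[] args) {
    MultiCheckpointCommitSketch sketch = new MultiCheckpointCommitSketch();
    sketch.write("data-file-1 + eq-delete-1");
    sketch.prepareSnapshotPreBarrier(1);    // first barrier, no snapshotState yet
    sketch.write("data-file-2 + eq-delete-2");
    sketch.prepareSnapshotPreBarrier(2);    // second barrier
    System.out.println(sketch.snapshotAndCommit(2));  // one commit covers both checkpoints
  }
}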
