diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkRangeDistributionBucketing.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkRangeDistributionBucketing.java
new file mode 100644
index 000000000000..a5f24e09a60b
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkRangeDistributionBucketing.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.apache.iceberg.expressions.Expressions.bucket;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/**
+ * Test range distribution with a bucketing partition column. Compared to hash distribution, range
+ * distribution is more general and can handle a bucketing column while achieving even distribution
+ * of traffic to the writer tasks.
+ *
+ * <ul>
+ *   <li>keyBy on low cardinality (e.g. 60) may not achieve balanced data distribution.
+ *   <li>number of buckets (e.g. 60) is not divisible by the writer parallelism (e.g. 40).
+ *   <li>number of buckets (e.g. 60) is smaller than the writer parallelism (e.g. 120).
+ * </ul>
+ */
+@Timeout(value = 30)
+public class TestFlinkIcebergSinkRangeDistributionBucketing {
+  private static final Configuration DISABLE_CLASSLOADER_CHECK_CONFIG =
+      new Configuration()
+          // disable classloader check as Avro may cache class/object in the serializers.
+          .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
+  // max supported parallelism is 16 (= 4 x 4)
+  @RegisterExtension
+  public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+      new MiniClusterExtension(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(4)
+              .setNumberSlotsPerTaskManager(4)
+              .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+              .build());
+
+  @RegisterExtension
+  private static final HadoopCatalogExtension CATALOG_EXTENSION =
+      new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  private static final int NUM_BUCKETS = 4;
+  private static final int NUM_OF_CHECKPOINTS = 4;
+  private static final int ROW_COUNT_PER_CHECKPOINT = 200;
+  private static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.optional(1, "ts", Types.TimestampType.withoutZone()),
+          Types.NestedField.optional(2, "uuid", Types.UUIDType.get()),
+          Types.NestedField.optional(3, "data", Types.StringType.get()));
+  private static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).hour("ts").bucket("uuid", NUM_BUCKETS).build();
+  private static final RowType ROW_TYPE = FlinkSchemaUtil.convert(SCHEMA);
+
+  private TableLoader tableLoader;
+  private Table table;
+
+  @BeforeEach
+  public void before() throws IOException {
+    this.tableLoader = CATALOG_EXTENSION.tableLoader();
+    this.table =
+        CATALOG_EXTENSION
+            .catalog()
+            .createTable(
+                TestFixtures.TABLE_IDENTIFIER,
+                SCHEMA,
+                SPEC,
+                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()));
+
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+        .commit();
+
+    // Assuming ts is on ingestion/processing time, a writer only writes to 1 or 2 hourly
+    // partitions concurrently. Only sort on the bucket column to avoid each writer task
+    // writing to 60 buckets/files concurrently.
+    table.replaceSortOrder().asc(bucket("uuid", NUM_BUCKETS)).commit();
+  }
+
+  @AfterEach
+  public void after() throws Exception {
+    CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
+  }
+
+  /** number of buckets 4 matches writer parallelism of 4 */
+  @Test
+  public void testBucketNumberEqualsToWriterParallelism() throws Exception {
+    testParallelism(4);
+  }
+
+  /** number of buckets 4 is less than writer parallelism of 6 */
+  @Test
+  public void testBucketNumberLessThanWriterParallelismNotDivisible() throws Exception {
+    testParallelism(6);
+  }
+
+  /** number of buckets 4 is less than writer parallelism of 8 */
+  @Test
+  public void testBucketNumberLessThanWriterParallelismDivisible() throws Exception {
+    testParallelism(8);
+  }
+
+  /** number of buckets 4 is greater than writer parallelism of 3 */
+  @Test
+  public void testBucketNumberHigherThanWriterParallelismNotDivisible() throws Exception {
+    testParallelism(3);
+  }
+
+  /** number of buckets 4 is greater than writer parallelism of 2 */
+  @Test
+  public void testBucketNumberHigherThanWriterParallelismDivisible() throws Exception {
+    testParallelism(2);
+  }
+
+  private void testParallelism(int parallelism) throws Exception {
+    try (StreamExecutionEnvironment env =
+        StreamExecutionEnvironment.getExecutionEnvironment(
+                MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100)
+            .setParallelism(parallelism)
+            .setMaxParallelism(parallelism)) {
+
+      DataGeneratorSource<RowData> generatorSource =
+          new DataGeneratorSource<>(
+              new RowGenerator(),
+              ROW_COUNT_PER_CHECKPOINT * NUM_OF_CHECKPOINTS,
+              RateLimiterStrategy.perCheckpoint(ROW_COUNT_PER_CHECKPOINT),
+              FlinkCompatibilityUtil.toTypeInfo(ROW_TYPE));
+      DataStream<RowData> dataStream =
+          env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator");
+
+      FlinkSink.forRowData(dataStream)
+          .table(table)
+          .tableLoader(tableLoader)
+          .writeParallelism(parallelism)
+          .append();
+      env.execute(getClass().getSimpleName());
+
+      table.refresh();
+      // snapshots are ordered from the oldest to the newest
+      List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+      // only keep the snapshots with added data files
+      snapshots =
+          snapshots.stream()
+              .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+              .collect(Collectors.toList());
+
+      // Source rate limit per checkpoint cycle may not be super precise.
+      // There could be more checkpoint cycles and commits than planned.
+      assertThat(snapshots).hasSizeGreaterThanOrEqualTo(NUM_OF_CHECKPOINTS);
+
+      // It takes 2 checkpoint cycles for statistics collection and application
+      // of the globally aggregated statistics in the range partitioner.
+      // The last two checkpoints should have range shuffle applied.
+      List<Snapshot> rangePartitionedCycles =
+          snapshots.subList(snapshots.size() - 2, snapshots.size());
+
+      for (Snapshot snapshot : rangePartitionedCycles) {
+        List<DataFile> addedDataFiles =
+            Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+        assertThat(addedDataFiles)
+            .hasSizeLessThanOrEqualTo(maxAddedDataFilesPerCheckpoint(parallelism));
+      }
+    }
+  }
+
+  /**
+   * Traffic is not perfectly balanced across all buckets in the small sample size. Range
+   * distribution of the bucket id may cross subtask boundaries. Hence the number of committed data
+   * files per checkpoint may be larger than the writer parallelism or the number of buckets, but it
+   * should not be more than the sum of those two. Without range distribution, the number of data
+   * files per commit can be 4x of parallelism (as the number of buckets is 4).
+   */
+  private int maxAddedDataFilesPerCheckpoint(int parallelism) {
+    return NUM_BUCKETS + parallelism;
+  }
+
+  private static class RowGenerator implements GeneratorFunction<Long, RowData> {
+    // use a constant timestamp so that all rows go to the same hourly partition
+    private final long ts = System.currentTimeMillis();
+
+    @Override
+    public RowData map(Long index) throws Exception {
+      // a random uuid should result in a relatively balanced distribution across buckets
+      UUID uuid = UUID.randomUUID();
+      ByteBuffer uuidByteBuffer = ByteBuffer.allocate(16);
+      uuidByteBuffer.putLong(uuid.getMostSignificantBits());
+      uuidByteBuffer.putLong(uuid.getLeastSignificantBits());
+      return GenericRowData.of(
+          TimestampData.fromEpochMillis(ts),
+          uuidByteBuffer.array(),
+          StringData.fromString("row-" + index));
+    }
+  }
+}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkRangeDistributionBucketing.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkRangeDistributionBucketing.java
new file mode 100644
index 000000000000..a5f24e09a60b
--- /dev/null
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkRangeDistributionBucketing.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import static org.apache.iceberg.expressions.Expressions.bucket;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.MiniFlinkClusterExtension;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/**
+ * Test range distribution with a bucketing partition column. Compared to hash distribution, range
+ * distribution is more general and can handle a bucketing column while achieving even distribution
+ * of traffic to the writer tasks.
+ *
+ * <ul>
+ *   <li>keyBy on low cardinality (e.g. 60) may not achieve balanced data distribution.
+ *   <li>number of buckets (e.g. 60) is not divisible by the writer parallelism (e.g. 40).
+ *   <li>number of buckets (e.g. 60) is smaller than the writer parallelism (e.g. 120).
+ * </ul>
+ */
+@Timeout(value = 30)
+public class TestFlinkIcebergSinkRangeDistributionBucketing {
+  private static final Configuration DISABLE_CLASSLOADER_CHECK_CONFIG =
+      new Configuration()
+          // disable classloader check as Avro may cache class/object in the serializers.
+          .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+
+  // max supported parallelism is 16 (= 4 x 4)
+  @RegisterExtension
+  public static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+      new MiniClusterExtension(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(4)
+              .setNumberSlotsPerTaskManager(4)
+              .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+              .build());
+
+  @RegisterExtension
+  private static final HadoopCatalogExtension CATALOG_EXTENSION =
+      new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  private static final int NUM_BUCKETS = 4;
+  private static final int NUM_OF_CHECKPOINTS = 4;
+  private static final int ROW_COUNT_PER_CHECKPOINT = 200;
+  private static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.optional(1, "ts", Types.TimestampType.withoutZone()),
+          Types.NestedField.optional(2, "uuid", Types.UUIDType.get()),
+          Types.NestedField.optional(3, "data", Types.StringType.get()));
+  private static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).hour("ts").bucket("uuid", NUM_BUCKETS).build();
+  private static final RowType ROW_TYPE = FlinkSchemaUtil.convert(SCHEMA);
+
+  private TableLoader tableLoader;
+  private Table table;
+
+  @BeforeEach
+  public void before() throws IOException {
+    this.tableLoader = CATALOG_EXTENSION.tableLoader();
+    this.table =
+        CATALOG_EXTENSION
+            .catalog()
+            .createTable(
+                TestFixtures.TABLE_IDENTIFIER,
+                SCHEMA,
+                SPEC,
+                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()));
+
+    table
+        .updateProperties()
+        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
+        .commit();
+
+    // Assuming ts is on ingestion/processing time, a writer only writes to 1 or 2 hourly
+    // partitions concurrently. Only sort on the bucket column to avoid each writer task
+    // writing to 60 buckets/files concurrently.
+    table.replaceSortOrder().asc(bucket("uuid", NUM_BUCKETS)).commit();
+  }
+
+  @AfterEach
+  public void after() throws Exception {
+    CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
+  }
+
+  /** number of buckets 4 matches writer parallelism of 4 */
+  @Test
+  public void testBucketNumberEqualsToWriterParallelism() throws Exception {
+    testParallelism(4);
+  }
+
+  /** number of buckets 4 is less than writer parallelism of 6 */
+  @Test
+  public void testBucketNumberLessThanWriterParallelismNotDivisible() throws Exception {
+    testParallelism(6);
+  }
+
+  /** number of buckets 4 is less than writer parallelism of 8 */
+  @Test
+  public void testBucketNumberLessThanWriterParallelismDivisible() throws Exception {
+    testParallelism(8);
+  }
+
+  /** number of buckets 4 is greater than writer parallelism of 3 */
+  @Test
+  public void testBucketNumberHigherThanWriterParallelismNotDivisible() throws Exception {
+    testParallelism(3);
+  }
+
+  /** number of buckets 4 is greater than writer parallelism of 2 */
+  @Test
+  public void testBucketNumberHigherThanWriterParallelismDivisible() throws Exception {
+    testParallelism(2);
+  }
+
+  private void testParallelism(int parallelism) throws Exception {
+    try (StreamExecutionEnvironment env =
+        StreamExecutionEnvironment.getExecutionEnvironment(
+                MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100)
+            .setParallelism(parallelism)
+            .setMaxParallelism(parallelism)) {
+
+      DataGeneratorSource<RowData> generatorSource =
+          new DataGeneratorSource<>(
+              new RowGenerator(),
+              ROW_COUNT_PER_CHECKPOINT * NUM_OF_CHECKPOINTS,
+              RateLimiterStrategy.perCheckpoint(ROW_COUNT_PER_CHECKPOINT),
+              FlinkCompatibilityUtil.toTypeInfo(ROW_TYPE));
+      DataStream<RowData> dataStream =
+          env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator");
+
+      FlinkSink.forRowData(dataStream)
+          .table(table)
+          .tableLoader(tableLoader)
+          .writeParallelism(parallelism)
+          .append();
+      env.execute(getClass().getSimpleName());
+
+      table.refresh();
+      // snapshots are ordered from the oldest to the newest
+      List<Snapshot> snapshots = Lists.newArrayList(table.snapshots().iterator());
+      // only keep the snapshots with added data files
+      snapshots =
+          snapshots.stream()
+              .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext())
+              .collect(Collectors.toList());
+
+      // Source rate limit per checkpoint cycle may not be super precise.
+      // There could be more checkpoint cycles and commits than planned.
+      assertThat(snapshots).hasSizeGreaterThanOrEqualTo(NUM_OF_CHECKPOINTS);
+
+      // It takes 2 checkpoint cycles for statistics collection and application
+      // of the globally aggregated statistics in the range partitioner.
+      // The last two checkpoints should have range shuffle applied.
+      List<Snapshot> rangePartitionedCycles =
+          snapshots.subList(snapshots.size() - 2, snapshots.size());
+
+      for (Snapshot snapshot : rangePartitionedCycles) {
+        List<DataFile> addedDataFiles =
+            Lists.newArrayList(snapshot.addedDataFiles(table.io()).iterator());
+        assertThat(addedDataFiles)
+            .hasSizeLessThanOrEqualTo(maxAddedDataFilesPerCheckpoint(parallelism));
+      }
+    }
+  }
+
+  /**
+   * Traffic is not perfectly balanced across all buckets in the small sample size. Range
+   * distribution of the bucket id may cross subtask boundaries. Hence the number of committed data
+   * files per checkpoint may be larger than the writer parallelism or the number of buckets, but it
+   * should not be more than the sum of those two. Without range distribution, the number of data
+   * files per commit can be 4x of parallelism (as the number of buckets is 4).
+   */
+  private int maxAddedDataFilesPerCheckpoint(int parallelism) {
+    return NUM_BUCKETS + parallelism;
+  }
+
+  private static class RowGenerator implements GeneratorFunction<Long, RowData> {
+    // use a constant timestamp so that all rows go to the same hourly partition
+    private final long ts = System.currentTimeMillis();
+
+    @Override
+    public RowData map(Long index) throws Exception {
+      // a random uuid should result in a relatively balanced distribution across buckets
+      UUID uuid = UUID.randomUUID();
+      ByteBuffer uuidByteBuffer = ByteBuffer.allocate(16);
+      uuidByteBuffer.putLong(uuid.getMostSignificantBits());
+      uuidByteBuffer.putLong(uuid.getLeastSignificantBits());
+      return GenericRowData.of(
+          TimestampData.fromEpochMillis(ts),
+          uuidByteBuffer.array(),
+          StringData.fromString("row-" + index));
    }
  }
}
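For context, the configuration pattern the new test exercises is, in essence, the following. This is a minimal sketch drawn from the test body; the names `table`, `tableLoader`, `input`, and the parallelism value are illustrative placeholders, not part of this change.

    // Assumes an existing Iceberg Table `table`, its TableLoader `tableLoader`,
    // and a DataStream<RowData> `input`; bucket(...) is the static import from
    // org.apache.iceberg.expressions.Expressions.
    table
        .updateProperties()
        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
        .commit();

    // Sort only on the bucket transform so each writer task receives a narrow range of
    // bucket ids and keeps the number of open files per checkpoint small.
    table.replaceSortOrder().asc(bucket("uuid", 4)).commit();

    FlinkSink.forRowData(input)
        .table(table)
        .tableLoader(tableLoader)
        .writeParallelism(parallelism) // the test covers parallelism 2, 3, 4, 6, and 8
        .append();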