diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java index 351ba54e5c7c..5718f4b93825 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java @@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.connector.source.SourceReader; @@ -37,6 +39,9 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; @@ -74,6 +79,7 @@ import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer; import org.apache.iceberg.flink.source.split.SerializableComparator; import org.apache.iceberg.flink.source.split.SplitComparators; +import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.ThreadPools; @@ -97,6 +103,11 @@ public class IcebergSource implements Source emitter; private final String tableName; + // Cache the splits discovered by planSplitsForBatch, which can be called twice, and from two + // different threads: (1) source/stream construction by the main thread, (2) enumerator + // creation. Hence the field needs to be volatile. + private volatile List batchSplits; + IcebergSource( TableLoader tableLoader, ScanContext scanContext, @@ -132,16 +143,26 @@ private String planningThreadName() { return tableName + "-" + UUID.randomUUID(); } + /** + * Cache the enumerated splits for batch execution to avoid planning twice, since two code + * paths obtain splits: (1) parallelism inference and (2) enumerator creation.
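+ * <p>A minimal sketch of the resulting call pattern (illustrative only, not part of the patch): + * <pre>{@code + * // the first call plans the splits and populates the cache + * List<IcebergSourceSplit> planned = planSplitsForBatch(planningThreadName()); + * // a later call, e.g. during enumerator creation, returns the cached list without re-planning + * List<IcebergSourceSplit> cached = planSplitsForBatch(planningThreadName()); + * }</pre>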
+ */ private List planSplitsForBatch(String threadName) { + if (batchSplits != null) { + return batchSplits; + } + ExecutorService workerPool = ThreadPools.newWorkerPool(threadName, scanContext.planParallelism()); try (TableLoader loader = tableLoader.clone()) { loader.open(); - List splits = + this.batchSplits = FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool); LOG.info( - "Discovered {} splits from table {} during job initialization", splits.size(), tableName); - return splits; + "Discovered {} splits from table {} during job initialization", + batchSplits.size(), + tableName); + return batchSplits; } catch (IOException e) { throw new UncheckedIOException("Failed to close table loader", e); } finally { @@ -207,12 +228,35 @@ private SplitEnumerator createEnumer // Only do scan planning if nothing is restored from checkpoint state List splits = planSplitsForBatch(planningThreadName()); assigner.onDiscoveredSplits(splits); + // clear the cached splits after enumerator creation as they won't be needed anymore + this.batchSplits = null; } return new StaticIcebergEnumerator(enumContext, assigner); } } + private boolean shouldInferParallelism() { + return !scanContext.isStreaming(); + } + + private int inferParallelism(ReadableConfig flinkConf, StreamExecutionEnvironment env) { + int parallelism = + SourceUtil.inferParallelism( + flinkConf, + scanContext.limit(), + () -> { + List splits = planSplitsForBatch(planningThreadName()); + return splits.size(); + }); + + if (env.getMaxParallelism() > 0) { + parallelism = Math.min(parallelism, env.getMaxParallelism()); + } + + return parallelism; + } + /** * Create a source builder. * @@ -571,6 +615,41 @@ public IcebergSource build() { emitter); } + /** + * Build the {@link IcebergSource} and create a {@link DataStream} from the source. Watermark + * strategy is set to {@link WatermarkStrategy#noWatermarks()}. + * + * @return data stream from the Iceberg source + */ + public DataStream buildStream(StreamExecutionEnvironment env) { + // buildStream should only be called with RowData or Converter paths. + Preconditions.checkState( + readerFunction == null, + "Cannot set reader function when building a data stream from the source"); + IcebergSource source = build(); + TypeInformation outputTypeInfo = + outputTypeInfo(converter, table.schema(), source.scanContext.project()); + DataStreamSource stream = + env.fromSource(source, WatermarkStrategy.noWatermarks(), source.name(), outputTypeInfo); + if (source.shouldInferParallelism()) { + stream = stream.setParallelism(source.inferParallelism(flinkConfig, env)); + } + + return stream; + } + + private static TypeInformation outputTypeInfo( + RowDataConverter converter, Schema tableSchema, Schema projected) { + if (converter != null) { + return converter.getProducedType(); + } else { + // output type is RowData + Schema readSchema = projected != null ? 
projected : tableSchema; + return (TypeInformation) + FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(readSchema)); + } + } + private ReaderFunction readerFunction(ScanContext context) { if (table instanceof BaseMetadataTable) { MetaDataReaderFunction rowDataReaderFunction = diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java index 610657e8d47b..65adce77d9f9 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java @@ -23,11 +23,8 @@ import java.util.Map; import java.util.Optional; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; @@ -128,26 +125,18 @@ private DataStream createDataStream(StreamExecutionEnvironment execEnv) .build(); } - private DataStreamSource createFLIP27Stream(StreamExecutionEnvironment env) { + private DataStream createFLIP27Stream(StreamExecutionEnvironment env) { SplitAssignerType assignerType = readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE); - IcebergSource source = - IcebergSource.forRowData() - .tableLoader(loader) - .assignerFactory(assignerType.factory()) - .properties(properties) - .project(getProjectedSchema()) - .limit(limit) - .filters(filters) - .flinkConfig(readableConfig) - .build(); - DataStreamSource stream = - env.fromSource( - source, - WatermarkStrategy.noWatermarks(), - source.name(), - TypeInformation.of(RowData.class)); - return stream; + return IcebergSource.forRowData() + .tableLoader(loader) + .assignerFactory(assignerType.factory()) + .properties(properties) + .project(getProjectedSchema()) + .limit(limit) + .filters(filters) + .flinkConfig(readableConfig) + .buildStream(env); } private TableSchema getProjectedSchema() { diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java index b7447d15c05a..db8647f054ae 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java @@ -24,8 +24,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -130,11 +128,8 @@ protected List run( sourceBuilder.properties(options); DataStream stream = - env.fromSource( - sourceBuilder.build(), - WatermarkStrategy.noWatermarks(), - "testBasicRead", - TypeInformation.of(RowData.class)) + sourceBuilder + .buildStream(env) .map( new RowDataToRowMapper( 
FlinkSchemaUtil.convert( diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java index 0f41c5af4c95..d3713e296014 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java @@ -37,7 +37,7 @@ public class TestIcebergSourceBoundedSql extends TestIcebergSourceBounded { @BeforeEach public void before() throws IOException { Configuration tableConf = getTableEnv().getConfig().getConfiguration(); - tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true); + tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true); SqlHelpers.sql( getTableEnv(), "create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java new file mode 100644 index 000000000000..2908cb927269 --- /dev/null +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.source; + +import static org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.file.Path; +import java.util.List; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.testutils.InternalMiniClusterExtension; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.FlinkConfigOptions; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.data.RowDataToRowMapper; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +public class TestIcebergSourceInferParallelism { + private static final int NUM_TMS = 2; + private static final int SLOTS_PER_TM = 2; + private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM; + private static final int MAX_INFERRED_PARALLELISM = 3; + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(NUM_TMS) + .setNumberSlotsPerTaskManager(SLOTS_PER_TM) + .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) + .build()); + + @RegisterExtension + protected static final HadoopCatalogExtension CATALOG_EXTENSION = + new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE); + + @TempDir private Path tmpDir; + + private Table table; + private GenericAppenderHelper dataAppender; + + @BeforeEach + public void before() throws IOException { + this.table = + CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA); + this.dataAppender = new GenericAppenderHelper(table, FileFormat.PARQUET, tmpDir); + } + + @AfterEach + public void after() { + CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER); + } + + @Test + public void testEmptyTable() throws Exception { + // Inferred parallelism should be at least 1 even if the table is empty + test(1, 0); + } + + @Test + public void testTableWithFilesLessThanMaxInferredParallelism() throws Exception { + // Append files to the table + for (int i = 0; i < 2; ++i) { + List batch = RandomGenericData.generate(table.schema(), 1, 0); + dataAppender.appendToTable(batch); + } + + // Inferred parallelism should equal the number of splits (2) + test(2, 2); + } + + @Test + 
public void testTableWithFilesMoreThanMaxInferredParallelism() throws Exception { + // Append files to the table + for (int i = 0; i < MAX_INFERRED_PARALLELISM + 1; ++i) { + List batch = RandomGenericData.generate(table.schema(), 1, 0); + dataAppender.appendToTable(batch); + } + + // Inferred parallelism should be capped at MAX_INFERRED_PARALLELISM + test(MAX_INFERRED_PARALLELISM, MAX_INFERRED_PARALLELISM + 1); + } + + private void test(int expectedParallelism, int expectedRecords) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + Configuration config = new Configuration(); + config.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, true); + config.set( + FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX, + MAX_INFERRED_PARALLELISM); + + DataStream dataStream = + IcebergSource.forRowData() + .tableLoader(CATALOG_EXTENSION.tableLoader()) + .table(table) + .flinkConfig(config) + // force one file per split + .splitSize(1L) + .buildStream(env) + .map(new RowDataToRowMapper(FlinkSchemaUtil.convert(table.schema()))); + + DataStream.Collector collector = new DataStream.Collector<>(); + dataStream.collectAsync(collector); + JobClient jobClient = env.executeAsync(); + try (CloseableIterator iterator = collector.getOutput()) { + List result = Lists.newArrayList(); + while (iterator.hasNext()) { + result.add(iterator.next()); + } + + assertThat(result).hasSize(expectedRecords); + verifySourceParallelism( + expectedParallelism, miniCluster().getExecutionGraph(jobClient.getJobID()).get()); + } + } + + /** + * This approach is borrowed from Flink {@code FileSourceTextLinesITCase} to read the source + * parallelism from the execution graph. + */ + private static void verifySourceParallelism( + int expectedParallelism, AccessExecutionGraph executionGraph) { + AccessExecutionJobVertex sourceVertex = + executionGraph.getVerticesTopologically().iterator().next(); + assertThat(sourceVertex.getParallelism()).isEqualTo(expectedParallelism); + } + + /** + * Use reflection to get the {@code InternalMiniClusterExtension} and {@code MiniCluster} so that + * the execution graph and source parallelism can be inspected. There does not appear to be + * another way via public APIs.
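+ * <p>Used by the test above as (sketch): + * <pre>{@code + * miniCluster().getExecutionGraph(jobClient.getJobID()).get() + * }</pre>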
+ */ + private static MiniCluster miniCluster() throws Exception { + Field privateField = + MiniClusterExtension.class.getDeclaredField("internalMiniClusterExtension"); + privateField.setAccessible(true); + InternalMiniClusterExtension internalExtension = + (InternalMiniClusterExtension) privateField.get(MINI_CLUSTER_EXTENSION); + return internalExtension.getMiniCluster(); + } +} diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java index 75f0a785a8c5..548940a842ce 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java @@ -53,7 +53,12 @@ public class TestIcebergSourceSql extends TestSqlBase { public void before() throws IOException { TableEnvironment tableEnvironment = getTableEnv(); Configuration tableConf = tableEnvironment.getConfig().getConfiguration(); - tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true); + tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true); + // Disable parallelism inference to avoid interfering with the watermark tests, + // which check that split assignment is ordered by the watermark column. + // The tests assume a default parallelism of 1 with a single reader task + // in order to check the order of read records. + tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false); tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", "1"); SqlHelpers.sql( diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java index 51f9025b4159..564e8139e6cc 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java @@ -48,14 +48,20 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; +/** + * There is an infinite sleep in the test. Add a timeout to the test to avoid a stuck situation in + * case anything goes wrong unexpectedly.
+ */ +@Timeout(value = 60) public class TestIcebergSpeculativeExecutionSupport extends TestBase { private static final int NUM_TASK_MANAGERS = 1; private static final int NUM_TASK_SLOTS = 3; @RegisterExtension - public static MiniClusterExtension miniClusterResource = + public static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new MiniClusterExtension( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(NUM_TASK_MANAGERS) @@ -144,9 +150,9 @@ public void testSpeculativeExecution() throws Exception { private static class TestingMap extends RichMapFunction { @Override public Row map(Row row) throws Exception { - // Put the subtasks with the first attempt to sleep to trigger speculative - // execution - if (getRuntimeContext().getAttemptNumber() <= 0) { + // Simulate slow subtask 0 with attempt 0 + if (getRuntimeContext().getIndexOfThisSubtask() == 0 + && getRuntimeContext().getAttemptNumber() <= 0) { Thread.sleep(Integer.MAX_VALUE); } @@ -169,6 +175,7 @@ private static Configuration configure() { // Use FLIP-27 source configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true); + configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false); // for speculative execution configuration.set(BatchExecutionOptions.SPECULATIVE_ENABLED, true); diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java index 351ba54e5c7c..5718f4b93825 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java @@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.connector.source.SourceReader; @@ -37,6 +39,9 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; @@ -74,6 +79,7 @@ import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer; import org.apache.iceberg.flink.source.split.SerializableComparator; import org.apache.iceberg.flink.source.split.SplitComparators; +import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.ThreadPools; @@ -97,6 +103,11 @@ public class IcebergSource implements Source emitter; private final String tableName; + // Cache the splits discovered by planSplitsForBatch, which can be called twice, and from two + // different threads: (1) source/stream construction by the main thread, (2) enumerator + // creation. Hence the field needs to be volatile.
+ private volatile List batchSplits; + IcebergSource( TableLoader tableLoader, ScanContext scanContext, @@ -132,16 +143,26 @@ private String planningThreadName() { return tableName + "-" + UUID.randomUUID(); } + /** + * Cache the enumerated splits for batch execution to avoid planning twice, since two code + * paths obtain splits: (1) parallelism inference and (2) enumerator creation. + */ private List planSplitsForBatch(String threadName) { + if (batchSplits != null) { + return batchSplits; + } + ExecutorService workerPool = ThreadPools.newWorkerPool(threadName, scanContext.planParallelism()); try (TableLoader loader = tableLoader.clone()) { loader.open(); - List splits = + this.batchSplits = FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool); LOG.info( - "Discovered {} splits from table {} during job initialization", splits.size(), tableName); - return splits; + "Discovered {} splits from table {} during job initialization", + batchSplits.size(), + tableName); + return batchSplits; } catch (IOException e) { throw new UncheckedIOException("Failed to close table loader", e); } finally { @@ -207,12 +228,35 @@ private SplitEnumerator createEnumer // Only do scan planning if nothing is restored from checkpoint state List splits = planSplitsForBatch(planningThreadName()); assigner.onDiscoveredSplits(splits); + // clear the cached splits after enumerator creation as they won't be needed anymore + this.batchSplits = null; } return new StaticIcebergEnumerator(enumContext, assigner); } } + private boolean shouldInferParallelism() { + return !scanContext.isStreaming(); + } + + private int inferParallelism(ReadableConfig flinkConf, StreamExecutionEnvironment env) { + int parallelism = + SourceUtil.inferParallelism( + flinkConf, + scanContext.limit(), + () -> { + List splits = planSplitsForBatch(planningThreadName()); + return splits.size(); + }); + + if (env.getMaxParallelism() > 0) { + parallelism = Math.min(parallelism, env.getMaxParallelism()); + } + + return parallelism; + } + /** * Create a source builder. * @@ -571,6 +615,41 @@ public IcebergSource build() { emitter); } + /** + * Build the {@link IcebergSource} and create a {@link DataStream} from the source. Watermark + * strategy is set to {@link WatermarkStrategy#noWatermarks()}. + * + * @return data stream from the Iceberg source + */ + public DataStream buildStream(StreamExecutionEnvironment env) { + // buildStream should only be called with RowData or Converter paths. + Preconditions.checkState( + readerFunction == null, + "Cannot set reader function when building a data stream from the source"); + IcebergSource source = build(); + TypeInformation outputTypeInfo = + outputTypeInfo(converter, table.schema(), source.scanContext.project()); + DataStreamSource stream = + env.fromSource(source, WatermarkStrategy.noWatermarks(), source.name(), outputTypeInfo); + if (source.shouldInferParallelism()) { + stream = stream.setParallelism(source.inferParallelism(flinkConfig, env)); + } + + return stream; + } + + private static TypeInformation outputTypeInfo( + RowDataConverter converter, Schema tableSchema, Schema projected) { + if (converter != null) { + return converter.getProducedType(); + } else { + // output type is RowData + Schema readSchema = projected != null ? 
projected : tableSchema; + return (TypeInformation) + FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(readSchema)); + } + } + private ReaderFunction readerFunction(ScanContext context) { if (table instanceof BaseMetadataTable) { MetaDataReaderFunction rowDataReaderFunction = diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java index 610657e8d47b..65adce77d9f9 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java @@ -23,11 +23,8 @@ import java.util.Map; import java.util.Optional; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; @@ -128,26 +125,18 @@ private DataStream createDataStream(StreamExecutionEnvironment execEnv) .build(); } - private DataStreamSource createFLIP27Stream(StreamExecutionEnvironment env) { + private DataStream createFLIP27Stream(StreamExecutionEnvironment env) { SplitAssignerType assignerType = readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE); - IcebergSource source = - IcebergSource.forRowData() - .tableLoader(loader) - .assignerFactory(assignerType.factory()) - .properties(properties) - .project(getProjectedSchema()) - .limit(limit) - .filters(filters) - .flinkConfig(readableConfig) - .build(); - DataStreamSource stream = - env.fromSource( - source, - WatermarkStrategy.noWatermarks(), - source.name(), - TypeInformation.of(RowData.class)); - return stream; + return IcebergSource.forRowData() + .tableLoader(loader) + .assignerFactory(assignerType.factory()) + .properties(properties) + .project(getProjectedSchema()) + .limit(limit) + .filters(filters) + .flinkConfig(readableConfig) + .buildStream(env); } private TableSchema getProjectedSchema() { diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java index b7447d15c05a..db8647f054ae 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java @@ -24,8 +24,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -130,11 +128,8 @@ protected List run( sourceBuilder.properties(options); DataStream stream = - env.fromSource( - sourceBuilder.build(), - WatermarkStrategy.noWatermarks(), - "testBasicRead", - TypeInformation.of(RowData.class)) + sourceBuilder + .buildStream(env) .map( new RowDataToRowMapper( 
FlinkSchemaUtil.convert( diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java index 0f41c5af4c95..d3713e296014 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java @@ -37,7 +37,7 @@ public class TestIcebergSourceBoundedSql extends TestIcebergSourceBounded { @BeforeEach public void before() throws IOException { Configuration tableConf = getTableEnv().getConfig().getConfiguration(); - tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true); + tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true); SqlHelpers.sql( getTableEnv(), "create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java new file mode 100644 index 000000000000..2908cb927269 --- /dev/null +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.source; + +import static org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.file.Path; +import java.util.List; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.testutils.InternalMiniClusterExtension; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.FlinkConfigOptions; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.data.RowDataToRowMapper; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +public class TestIcebergSourceInferParallelism { + private static final int NUM_TMS = 2; + private static final int SLOTS_PER_TM = 2; + private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM; + private static final int MAX_INFERRED_PARALLELISM = 3; + + @RegisterExtension + private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(NUM_TMS) + .setNumberSlotsPerTaskManager(SLOTS_PER_TM) + .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) + .build()); + + @RegisterExtension + protected static final HadoopCatalogExtension CATALOG_EXTENSION = + new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE); + + @TempDir private Path tmpDir; + + private Table table; + private GenericAppenderHelper dataAppender; + + @BeforeEach + public void before() throws IOException { + this.table = + CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA); + this.dataAppender = new GenericAppenderHelper(table, FileFormat.PARQUET, tmpDir); + } + + @AfterEach + public void after() { + CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER); + } + + @Test + public void testEmptyTable() throws Exception { + // Inferred parallelism should be at least 1 even if the table is empty + test(1, 0); + } + + @Test + public void testTableWithFilesLessThanMaxInferredParallelism() throws Exception { + // Append files to the table + for (int i = 0; i < 2; ++i) { + List batch = RandomGenericData.generate(table.schema(), 1, 0); + dataAppender.appendToTable(batch); + } + + // Inferred parallelism should equal the number of splits (2) + test(2, 2); + } + + @Test + 
public void testTableWithFilesMoreThanMaxInferredParallelism() throws Exception { + // Append files to the table + for (int i = 0; i < MAX_INFERRED_PARALLELISM + 1; ++i) { + List batch = RandomGenericData.generate(table.schema(), 1, 0); + dataAppender.appendToTable(batch); + } + + // Inferred parallelism should be capped at MAX_INFERRED_PARALLELISM + test(MAX_INFERRED_PARALLELISM, MAX_INFERRED_PARALLELISM + 1); + } + + private void test(int expectedParallelism, int expectedRecords) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + Configuration config = new Configuration(); + config.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, true); + config.set( + FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX, + MAX_INFERRED_PARALLELISM); + + DataStream dataStream = + IcebergSource.forRowData() + .tableLoader(CATALOG_EXTENSION.tableLoader()) + .table(table) + .flinkConfig(config) + // force one file per split + .splitSize(1L) + .buildStream(env) + .map(new RowDataToRowMapper(FlinkSchemaUtil.convert(table.schema()))); + + DataStream.Collector collector = new DataStream.Collector<>(); + dataStream.collectAsync(collector); + JobClient jobClient = env.executeAsync(); + try (CloseableIterator iterator = collector.getOutput()) { + List result = Lists.newArrayList(); + while (iterator.hasNext()) { + result.add(iterator.next()); + } + + assertThat(result).hasSize(expectedRecords); + verifySourceParallelism( + expectedParallelism, miniCluster().getExecutionGraph(jobClient.getJobID()).get()); + } + } + + /** + * This approach is borrowed from Flink {@code FileSourceTextLinesITCase} to read the source + * parallelism from the execution graph. + */ + private static void verifySourceParallelism( + int expectedParallelism, AccessExecutionGraph executionGraph) { + AccessExecutionJobVertex sourceVertex = + executionGraph.getVerticesTopologically().iterator().next(); + assertThat(sourceVertex.getParallelism()).isEqualTo(expectedParallelism); + } + + /** + * Use reflection to get the {@code InternalMiniClusterExtension} and {@code MiniCluster} so that + * the execution graph and source parallelism can be inspected. There does not appear to be + * another way via public APIs.
+ */ + private static MiniCluster miniCluster() throws Exception { + Field privateField = + MiniClusterExtension.class.getDeclaredField("internalMiniClusterExtension"); + privateField.setAccessible(true); + InternalMiniClusterExtension internalExtension = + (InternalMiniClusterExtension) privateField.get(MINI_CLUSTER_EXTENSION); + return internalExtension.getMiniCluster(); + } +} diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java index 75f0a785a8c5..548940a842ce 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java @@ -53,7 +53,12 @@ public class TestIcebergSourceSql extends TestSqlBase { public void before() throws IOException { TableEnvironment tableEnvironment = getTableEnv(); Configuration tableConf = tableEnvironment.getConfig().getConfiguration(); - tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true); + tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true); + // Disable parallelism inference to avoid interfering with the watermark tests, + // which check that split assignment is ordered by the watermark column. + // The tests assume a default parallelism of 1 with a single reader task + // in order to check the order of read records. + tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false); tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", "1"); SqlHelpers.sql( diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java index 41b023b93617..05a08c24d8d0 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.BatchExecutionOptions; @@ -48,8 +49,14 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; +/** + * There is an infinite sleep in the test. Add a timeout to the test to avoid a stuck situation in + * case anything goes wrong unexpectedly.
+ */ +@Timeout(value = 60) public class TestIcebergSpeculativeExecutionSupport extends TestBase { private static final int NUM_TASK_MANAGERS = 1; private static final int NUM_TASK_SLOTS = 3; @@ -144,9 +151,9 @@ public void testSpeculativeExecution() throws Exception { private static class TestingMap extends RichMapFunction { @Override public Row map(Row row) throws Exception { - // Put the subtasks with the first attempt to sleep to trigger speculative - // execution - if (getRuntimeContext().getTaskInfo().getAttemptNumber() <= 0) { + // Simulate slow subtask 0 with attempt 0 + TaskInfo taskInfo = getRuntimeContext().getTaskInfo(); + if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) { Thread.sleep(Integer.MAX_VALUE); } @@ -169,6 +176,7 @@ private static Configuration configure() { // Use FLIP-27 source configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true); + configuration.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false); // for speculative execution configuration.set(BatchExecutionOptions.SPECULATIVE_ENABLED, true);
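A minimal usage sketch of the buildStream() API introduced above, assuming a batch (bounded) scan. The warehouse path and job name are illustrative assumptions, not part of the patch. buildStream() applies WatermarkStrategy.noWatermarks() and, for non-streaming scans, sets the inferred source parallelism:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.source.IcebergSource;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // hypothetical Hadoop-catalog table location
    TableLoader loader = TableLoader.fromHadoopTable("hdfs://namenode:8020/warehouse/db/tbl");
    DataStream<RowData> stream =
        IcebergSource.forRowData()
            .tableLoader(loader)
            .buildStream(env); // no watermarks; parallelism inferred for batch scans
    stream.print();
    env.execute("Iceberg batch read"); // job name is illustrative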