diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java index f36f405f344e..90e26a86d005 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java @@ -103,6 +103,8 @@ public class IcebergSource implements Source emitter; private final String tableName; + private volatile List batchSplits; + IcebergSource( TableLoader tableLoader, ScanContext scanContext, @@ -138,16 +140,26 @@ private String planningThreadName() { return tableName + "-" + UUID.randomUUID(); } + /** + * Cache the enumerated splits for batch execution to avoid double planning as there are two code + * paths obtaining splits: (1) infer parallelism (2) enumerator creation. + */ private List planSplitsForBatch(String threadName) { + if (batchSplits != null) { + return batchSplits; + } + ExecutorService workerPool = ThreadPools.newWorkerPool(threadName, scanContext.planParallelism()); try (TableLoader loader = tableLoader.clone()) { loader.open(); - List splits = + this.batchSplits = FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool); LOG.info( - "Discovered {} splits from table {} during job initialization", splits.size(), tableName); - return splits; + "Discovered {} splits from table {} during job initialization", + batchSplits.size(), + tableName); + return batchSplits; } catch (IOException e) { throw new UncheckedIOException("Failed to close table loader", e); } finally { @@ -213,6 +225,8 @@ private SplitEnumerator createEnumer // Only do scan planning if nothing is restored from checkpoint state List splits = planSplitsForBatch(planningThreadName()); assigner.onDiscoveredSplits(splits); + // clear the cached splits after enumerator creation as they won't be needed anymore + this.batchSplits = null; } return new StaticIcebergEnumerator(enumContext, assigner); diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java index acd5d1e957e4..548940a842ce 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java @@ -54,7 +54,10 @@ public void before() throws IOException { TableEnvironment tableEnvironment = getTableEnv(); Configuration tableConf = tableEnvironment.getConfig().getConfiguration(); tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true); - // disable inferring parallelism to avoid interfering watermark tests + // Disable inferring parallelism to avoid interfering watermark tests + // that check split assignment is ordered by the watermark column. + // The tests assumes default parallelism of 1 with single reader task + // in order to check the order of read records. tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false); tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", "1");