Skip to content

Commit

Permalink
cache splits for batch mode to avoid double planning
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenzwu committed Aug 6, 2024
1 parent 057fcf2 commit aaf3765
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
private final SerializableRecordEmitter<T> emitter;
private final String tableName;

private volatile List<IcebergSourceSplit> batchSplits;

IcebergSource(
TableLoader tableLoader,
ScanContext scanContext,
Expand Down Expand Up @@ -138,16 +140,26 @@ private String planningThreadName() {
return tableName + "-" + UUID.randomUUID();
}

/**
* Cache the enumerated splits for batch execution to avoid double planning as there are two code
* paths obtaining splits: (1) infer parallelism (2) enumerator creation.
*/
private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
if (batchSplits != null) {
return batchSplits;
}

ExecutorService workerPool =
ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
try (TableLoader loader = tableLoader.clone()) {
loader.open();
List<IcebergSourceSplit> splits =
this.batchSplits =
FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool);
LOG.info(
"Discovered {} splits from table {} during job initialization", splits.size(), tableName);
return splits;
"Discovered {} splits from table {} during job initialization",
batchSplits.size(),
tableName);
return batchSplits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table loader", e);
} finally {
Expand Down Expand Up @@ -213,6 +225,8 @@ private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumer
// Only do scan planning if nothing is restored from checkpoint state
List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
assigner.onDiscoveredSplits(splits);
// clear the cached splits after enumerator creation as they won't be needed anymore
this.batchSplits = null;
}

return new StaticIcebergEnumerator(enumContext, assigner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ public void before() throws IOException {
TableEnvironment tableEnvironment = getTableEnv();
Configuration tableConf = tableEnvironment.getConfig().getConfiguration();
tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
// disable inferring parallelism to avoid interfering watermark tests
// Disable inferring parallelism to avoid interfering watermark tests
// that check split assignment is ordered by the watermark column.
// The tests assume a default parallelism of 1 with a single reader task
// in order to check the order of read records.
tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);

tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", "1");
Expand Down

0 comments on commit aaf3765

Please sign in to comment.