Commit d66a3df
Flink: backport PR apache#10832 of inferring parallelism in FLIP-27 source
stevenzwu committed Aug 26, 2024
1 parent 2ed61a1 commit d66a3df
Showing 14 changed files with 586 additions and 73 deletions.
@@ -28,6 +28,8 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
@@ -37,6 +39,9 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
@@ -74,6 +79,7 @@
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ThreadPools;
@@ -97,6 +103,11 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
private final SerializableRecordEmitter<T> emitter;
private final String tableName;

// Cache the splits discovered by planSplitsForBatch, which can be called twice from two
// different threads: (1) source/stream construction on the main thread, (2) enumerator
// creation. Hence the volatile modifier.
private volatile List<IcebergSourceSplit> batchSplits;

IcebergSource(
TableLoader tableLoader,
ScanContext scanContext,
@@ -132,16 +143,26 @@ private String planningThreadName() {
return tableName + "-" + UUID.randomUUID();
}

/**
* Cache the enumerated splits for batch execution to avoid double planning as there are two code
* paths obtaining splits: (1) infer parallelism (2) enumerator creation.
*/
private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
if (batchSplits != null) {
return batchSplits;
}

ExecutorService workerPool =
ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
try (TableLoader loader = tableLoader.clone()) {
loader.open();
List<IcebergSourceSplit> splits =
this.batchSplits =
FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool);
LOG.info(
"Discovered {} splits from table {} during job initialization", splits.size(), tableName);
return splits;
"Discovered {} splits from table {} during job initialization",
batchSplits.size(),
tableName);
return batchSplits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table loader", e);
} finally {
@@ -207,12 +228,35 @@ private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumer
// Only do scan planning if nothing is restored from checkpoint state
List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
assigner.onDiscoveredSplits(splits);
// clear the cached splits after enumerator creation as they won't be needed anymore
this.batchSplits = null;
}

return new StaticIcebergEnumerator(enumContext, assigner);
}
}

private boolean shouldInferParallelism() {
return !scanContext.isStreaming();
}

private int inferParallelism(ReadableConfig flinkConf, StreamExecutionEnvironment env) {
int parallelism =
SourceUtil.inferParallelism(
flinkConf,
scanContext.limit(),
() -> {
List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
return splits.size();
});

if (env.getMaxParallelism() > 0) {
parallelism = Math.min(parallelism, env.getMaxParallelism());
}

return parallelism;
}

/**
* Create a source builder.
*
@@ -571,6 +615,41 @@ public IcebergSource<T> build() {
emitter);
}

/**
* Build the {@link IcebergSource} and create a {@link DataStream} from the source. Watermark
* strategy is set to {@link WatermarkStrategy#noWatermarks()}.
*
* @return data stream from the Iceberg source
*/
public DataStream<T> buildStream(StreamExecutionEnvironment env) {
// buildStream should only be called with RowData or Converter paths.
Preconditions.checkState(
readerFunction == null,
"Cannot set reader function when building a data stream from the source");
IcebergSource<T> source = build();
TypeInformation<T> outputTypeInfo =
outputTypeInfo(converter, table.schema(), source.scanContext.project());
DataStreamSource<T> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), source.name(), outputTypeInfo);
if (source.shouldInferParallelism()) {
stream = stream.setParallelism(source.inferParallelism(flinkConfig, env));
}

return stream;
}

private static <T> TypeInformation<T> outputTypeInfo(
RowDataConverter<T> converter, Schema tableSchema, Schema projected) {
if (converter != null) {
return converter.getProducedType();
} else {
// output type is RowData
Schema readSchema = projected != null ? projected : tableSchema;
return (TypeInformation<T>)
FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(readSchema));
}
}

private ReaderFunction<T> readerFunction(ScanContext context) {
if (table instanceof BaseMetadataTable) {
MetaDataReaderFunction rowDataReaderFunction =
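For context on the new inferParallelism path above: the inferred value starts from the number of splits planned by planSplitsForBatch (cached in batchSplits so planning runs only once) and is then capped. A minimal sketch of that capping, under the assumption that inference starts from the split count; this is an illustration, not the actual SourceUtil.inferParallelism implementation, which also consults Flink configuration options not modeled here.

final class ParallelismCapSketch {
  // Simplified restatement of the bounds applied to the inferred source parallelism.
  static int boundedParallelism(int splitCount, long limit, int envMaxParallelism) {
    // Assumption for this sketch: start from one subtask per planned split.
    int parallelism = Math.max(1, splitCount);
    if (limit > 0) {
      // Assumed behavior of the limit hint: more subtasks than remaining rows are pointless.
      parallelism = (int) Math.min(parallelism, limit);
    }
    if (envMaxParallelism > 0) {
      // Mirrors the Math.min(parallelism, env.getMaxParallelism()) cap in inferParallelism above.
      parallelism = Math.min(parallelism, envMaxParallelism);
    }
    return parallelism;
  }
}

For example, 40 planned splits with a limit of 10 rows and a max parallelism of 8 yield min(40, 10, 8) = 8.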
(next changed file)
@@ -23,11 +23,8 @@
import java.util.Map;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
@@ -128,26 +125,18 @@ private DataStream<RowData> createDataStream(StreamExecutionEnvironment execEnv)
.build();
}

private DataStreamSource<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
private DataStream<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
SplitAssignerType assignerType =
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
IcebergSource<RowData> source =
IcebergSource.forRowData()
.tableLoader(loader)
.assignerFactory(assignerType.factory())
.properties(properties)
.project(getProjectedSchema())
.limit(limit)
.filters(filters)
.flinkConfig(readableConfig)
.build();
DataStreamSource stream =
env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
source.name(),
TypeInformation.of(RowData.class));
return stream;
return IcebergSource.forRowData()
.tableLoader(loader)
.assignerFactory(assignerType.factory())
.properties(properties)
.project(getProjectedSchema())
.limit(limit)
.filters(filters)
.flinkConfig(readableConfig)
.buildStream(env);
}

private TableSchema getProjectedSchema() {
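With createFLIP27Stream delegating to buildStream, a DataStream API job can use the same entry point directly. A minimal usage sketch for a bounded read; the table location, class name, and print sink are placeholder assumptions, while the builder calls, the noWatermarks() strategy, and the batch-only parallelism inference come from the diffs above.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;

public class BoundedIcebergReadSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Placeholder table location; any TableLoader works here.
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl");

    // buildStream applies WatermarkStrategy.noWatermarks() and, for non-streaming scans, can set
    // the source parallelism inferred from the planned splits (capped by the env max parallelism).
    DataStream<RowData> stream =
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            .assignerFactory(new SimpleSplitAssignerFactory())
            .flinkConfig(new Configuration())
            .buildStream(env);

    stream.print();
    env.execute("bounded-iceberg-read");
  }
}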
(next changed file)
@@ -24,8 +24,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -130,11 +128,8 @@ protected List<Row> run(
sourceBuilder.properties(options);

DataStream<Row> stream =
env.fromSource(
sourceBuilder.build(),
WatermarkStrategy.noWatermarks(),
"testBasicRead",
TypeInformation.of(RowData.class))
sourceBuilder
.buildStream(env)
.map(
new RowDataToRowMapper(
FlinkSchemaUtil.convert(
(next changed file)
@@ -37,7 +37,7 @@ public class TestIcebergSourceBoundedSql extends TestIcebergSourceBounded {
@BeforeEach
public void before() throws IOException {
Configuration tableConf = getTableEnv().getConfig().getConfiguration();
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true);
tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
SqlHelpers.sql(
getTableEnv(),
"create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')",
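The last hunk swaps the untyped string-key setBoolean call for the typed ConfigOption overload. A small sketch of the two forms; the option constant is the one from the diff, while the surrounding class and method are illustrative only.

import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.flink.FlinkConfigOptions;

final class FlinkConfigSketch {
  static void enableFlip27Source(Configuration tableConf) {
    // Typed form used after this change: the ConfigOption supplies the key, type, and default.
    tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);

    // Equivalent untyped form replaced above: resolves the same key but bypasses the option's type.
    tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true);
  }
}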