Flink: backport PR #10832 of inferring parallelism in FLIP-27 source #11009

Merged 1 commit on Aug 27, 2024
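The backport adds a buildStream(StreamExecutionEnvironment) method to the IcebergSource builder: it builds the FLIP-27 source, attaches it with WatermarkStrategy.noWatermarks(), and, for batch (non-streaming) reads, infers the source parallelism from the number of planned splits, capped by the environment's max parallelism. A minimal usage sketch, not part of the diff; the class name and warehouse path are hypothetical:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;

public class BoundedIcebergRead {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Hypothetical warehouse location; any TableLoader works here.
    TableLoader loader = TableLoader.fromHadoopTable("hdfs://namenode:8020/warehouse/db/events");
    DataStream<RowData> stream =
        IcebergSource.forRowData()
            .tableLoader(loader)
            .assignerFactory(new SimpleSplitAssignerFactory())
            // Bounded read: buildStream() infers parallelism from the discovered split count.
            .buildStream(env);
    stream.print();
    env.execute("flip27-bounded-iceberg-read");
  }
}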
@@ -28,6 +28,8 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
@@ -37,6 +39,9 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
@@ -74,6 +79,7 @@
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ThreadPools;
@@ -97,6 +103,11 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
private final SerializableRecordEmitter<T> emitter;
private final String tableName;

// Cache the splits discovered by planSplitsForBatch, which can be called twice from two
// different threads: (1) source/stream construction by the main thread, and (2) enumerator
// creation. Hence the volatile modifier.
private volatile List<IcebergSourceSplit> batchSplits;

IcebergSource(
TableLoader tableLoader,
ScanContext scanContext,
@@ -132,16 +143,26 @@ private String planningThreadName() {
return tableName + "-" + UUID.randomUUID();
}

/**
* Cache the enumerated splits for batch execution to avoid double planning, since two code
* paths obtain splits: (1) parallelism inference and (2) enumerator creation.
*/
private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
if (batchSplits != null) {
return batchSplits;
}

ExecutorService workerPool =
ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
try (TableLoader loader = tableLoader.clone()) {
loader.open();
List<IcebergSourceSplit> splits =
this.batchSplits =
FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool);
LOG.info(
"Discovered {} splits from table {} during job initialization", splits.size(), tableName);
return splits;
"Discovered {} splits from table {} during job initialization",
batchSplits.size(),
tableName);
return batchSplits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table loader", e);
} finally {
@@ -207,12 +228,35 @@ private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumer
// Only do scan planning if nothing is restored from checkpoint state
List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
assigner.onDiscoveredSplits(splits);
// clear the cached splits after enumerator creation as they won't be needed anymore
this.batchSplits = null;
}

return new StaticIcebergEnumerator(enumContext, assigner);
}
}

private boolean shouldInferParallelism() {
return !scanContext.isStreaming();
}

private int inferParallelism(ReadableConfig flinkConf, StreamExecutionEnvironment env) {
int parallelism =
SourceUtil.inferParallelism(
flinkConf,
scanContext.limit(),
() -> {
List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
return splits.size();
});

if (env.getMaxParallelism() > 0) {
parallelism = Math.min(parallelism, env.getMaxParallelism());
}

return parallelism;
}

/**
* Create a source builder.
*
@@ -571,6 +615,41 @@ public IcebergSource<T> build() {
emitter);
}

/**
* Build the {@link IcebergSource} and create a {@link DataStream} from the source. The
* watermark strategy is set to {@link WatermarkStrategy#noWatermarks()}.
*
* @return data stream from the Iceberg source
*/
public DataStream<T> buildStream(StreamExecutionEnvironment env) {
// buildStream should only be called with RowData or Converter paths.
Preconditions.checkState(
readerFunction == null,
"Cannot set reader function when building a data stream from the source");
IcebergSource<T> source = build();
TypeInformation<T> outputTypeInfo =
outputTypeInfo(converter, table.schema(), source.scanContext.project());
DataStreamSource<T> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), source.name(), outputTypeInfo);
if (source.shouldInferParallelism()) {
stream = stream.setParallelism(source.inferParallelism(flinkConfig, env));
}

return stream;
}

private static <T> TypeInformation<T> outputTypeInfo(
RowDataConverter<T> converter, Schema tableSchema, Schema projected) {
if (converter != null) {
return converter.getProducedType();
} else {
// output type is RowData
Schema readSchema = projected != null ? projected : tableSchema;
return (TypeInformation<T>)
FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(readSchema));
}
}

private ReaderFunction<T> readerFunction(ScanContext context) {
if (table instanceof BaseMetadataTable) {
MetaDataReaderFunction rowDataReaderFunction =
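For context beyond this diff: the inferred value comes from SourceUtil.inferParallelism, which is driven by the Flink configuration passed via flinkConfig(...). A hedged sketch of tuning it, reusing env and loader from the sketch above; the option constants TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM and TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX in FlinkConfigOptions are assumptions for illustration, not taken from the diff:

// Sketch only: cap or disable parallelism inference through the Flink configuration.
Configuration conf = new Configuration();
conf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, true);   // assumed: enable inference
conf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX, 64); // assumed: upper bound
env.setMaxParallelism(128); // buildStream() additionally caps the result at env.getMaxParallelism()
DataStream<RowData> stream =
    IcebergSource.forRowData()
        .tableLoader(loader)
        .flinkConfig(conf)
        .buildStream(env);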
@@ -23,11 +23,8 @@
import java.util.Map;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
@@ -128,26 +125,18 @@ private DataStream<RowData> createDataStream(StreamExecutionEnvironment execEnv)
.build();
}

private DataStreamSource<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
private DataStream<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
SplitAssignerType assignerType =
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
IcebergSource<RowData> source =
IcebergSource.forRowData()
.tableLoader(loader)
.assignerFactory(assignerType.factory())
.properties(properties)
.project(getProjectedSchema())
.limit(limit)
.filters(filters)
.flinkConfig(readableConfig)
.build();
DataStreamSource stream =
env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
source.name(),
TypeInformation.of(RowData.class));
return stream;
return IcebergSource.forRowData()
.tableLoader(loader)
.assignerFactory(assignerType.factory())
.properties(properties)
.project(getProjectedSchema())
.limit(limit)
.filters(filters)
.flinkConfig(readableConfig)
.buildStream(env);
}

private TableSchema getProjectedSchema() {
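Since createFLIP27Stream now delegates to buildStream, Table API / SQL reads pick up parallelism inference as soon as the FLIP-27 source is enabled; the test change further below flips the same switch. A small sketch of enabling it programmatically (the batch-mode environment is an illustrative choice):

// Enable the FLIP-27 IcebergSource for Table API / SQL reads, mirroring the test setup below.
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tEnv.getConfig()
    .getConfiguration()
    .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);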
@@ -24,8 +24,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -130,11 +128,8 @@ protected List<Row> run(
sourceBuilder.properties(options);

DataStream<Row> stream =
env.fromSource(
sourceBuilder.build(),
WatermarkStrategy.noWatermarks(),
"testBasicRead",
TypeInformation.of(RowData.class))
sourceBuilder
.buildStream(env)
.map(
new RowDataToRowMapper(
FlinkSchemaUtil.convert(
@@ -37,7 +37,7 @@ public class TestIcebergSourceBoundedSql extends TestIcebergSourceBounded {
@BeforeEach
public void before() throws IOException {
Configuration tableConf = getTableEnv().getConfig().getConfiguration();
tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true);
tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
SqlHelpers.sql(
getTableEnv(),
"create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')",