Skip to content

Commit

Permalink
[Feature][Flink] Support multiple tables read and write
Browse files Browse the repository at this point in the history
  • Loading branch information
PeppaPage committed Sep 25, 2024
1 parent 61d1964 commit afbc563
Show file tree
Hide file tree
Showing 26 changed files with 329 additions and 180 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 45
timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down Expand Up @@ -654,7 +654,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 90
timeout-minutes: 150
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down Expand Up @@ -683,7 +683,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 90
timeout-minutes: 150
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down Expand Up @@ -828,7 +828,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 90
timeout-minutes: 150
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down
2 changes: 1 addition & 1 deletion docs/en/start-v2/locally/quick-start-flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ cd "apache-seatunnel-${version}"
./bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/v2.streaming.conf.template
```

Flink version between `1.15.x` and `1.16.x`
Flink version between `1.15.x` and `1.18.x`

```shell
cd "apache-seatunnel-${version}"
Expand Down
2 changes: 1 addition & 1 deletion docs/zh/start-v2/locally/quick-start-flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ cd "apache-seatunnel-${version}"
./bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/v2.streaming.conf.template
```

Flink版本`1.15.x``1.16.x`
Flink版本`1.15.x``1.18.x`

```shell
cd "apache-seatunnel-${version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,6 @@ public static SourceTableInfo createSource(
catalogTables =
CatalogTableUtil.convertDataTypeToCatalogTables(seaTunnelDataType, tableId);
}

// if (catalogTables.size() != 1) {
// throw new SeaTunnelException(
// String.format("Unsupported table number: %d on flink",
// catalogTables.size()));
// }
return new SourceTableInfo(source, catalogTables);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@
import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.enums.PluginType;
Expand All @@ -42,9 +46,11 @@
import org.apache.seatunnel.translation.flink.sink.FlinkSink;

import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.types.Row;

import lombok.extern.slf4j.Slf4j;

import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -54,6 +60,7 @@
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;

@SuppressWarnings({"unchecked", "rawtypes"})
@Slf4j
public class SinkExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {

Expand Down Expand Up @@ -99,35 +106,48 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
fromSourceTable(sinkConfig, upstreamDataStreams).orElse(input);
Optional<? extends Factory> factory = plugins.get(i);
boolean fallBack = !factory.isPresent() || isFallback(factory.get());
SeaTunnelSink sink;
Map<String, SeaTunnelSink> sinks = new HashMap<>();
if (fallBack) {
sink =
fallbackCreateSink(
sinkPluginDiscovery,
PluginIdentifier.of(
ENGINE_TYPE,
PLUGIN_TYPE,
sinkConfig.getString(PLUGIN_NAME.key())),
sinkConfig);
sink.setJobContext(jobContext);
// TODO support sink multi sink
SeaTunnelRowType sourceType =
stream.getCatalogTables().get(0).getSeaTunnelRowType();
sink.setTypeInfo(sourceType);
for (CatalogTable catalogTable : stream.getCatalogTables()) {
SeaTunnelSink fallBackSink =
fallbackCreateSink(
sinkPluginDiscovery,
PluginIdentifier.of(
ENGINE_TYPE,
PLUGIN_TYPE,
sinkConfig.getString(PLUGIN_NAME.key())),
sinkConfig);
fallBackSink.setJobContext(jobContext);
SeaTunnelRowType sourceType = catalogTable.getSeaTunnelRowType();
fallBackSink.setTypeInfo(sourceType);
handleSaveMode(fallBackSink);
TableIdentifier tableId = catalogTable.getTableId();
String tableIdName = tableId.toTablePath().toString();
sinks.put(tableIdName, fallBackSink);
}
} else {
// TODO support sink multi sink
TableSinkFactoryContext context =
TableSinkFactoryContext.replacePlaceholderAndCreate(
stream.getCatalogTables().get(0),
ReadonlyConfig.fromConfig(sinkConfig),
classLoader,
((TableSinkFactory) factory.get())
.excludeTablePlaceholderReplaceKeys());
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
sink = ((TableSinkFactory) factory.get()).createSink(context).createSink();
sink.setJobContext(jobContext);
for (CatalogTable catalogTable : stream.getCatalogTables()) {
SeaTunnelSink seaTunnelSink;
TableSinkFactoryContext context =
TableSinkFactoryContext.replacePlaceholderAndCreate(
catalogTable,
ReadonlyConfig.fromConfig(sinkConfig),
classLoader,
((TableSinkFactory) factory.get())
.excludeTablePlaceholderReplaceKeys());
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
seaTunnelSink =
((TableSinkFactory) factory.get()).createSink(context).createSink();
seaTunnelSink.setJobContext(jobContext);
handleSaveMode(seaTunnelSink);
TableIdentifier tableId = catalogTable.getTableId();
String tableIdName = tableId.toTablePath().toString();
sinks.put(tableIdName, seaTunnelSink);
}
}
handleSaveMode(sink);
SeaTunnelSink sink =
tryGenerateMultiTableSink(
sinks, ReadonlyConfig.fromConfig(sinkConfig), classLoader);
boolean sinkParallelism = sinkConfig.hasPath(CommonOptions.PARALLELISM.key());
boolean envParallelism = envConfig.hasPath(CommonOptions.PARALLELISM.key());
int parallelism =
Expand All @@ -136,18 +156,27 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
: envParallelism
? envConfig.getInt(CommonOptions.PARALLELISM.key())
: 1;
DataStreamSink<Row> dataStreamSink =
DataStreamSink<SeaTunnelRow> dataStreamSink =
stream.getDataStream()
.sinkTo(
new FlinkSink<>(
sink, stream.getCatalogTables().get(0), parallelism))
.sinkTo(new FlinkSink<>(sink, stream.getCatalogTables(), parallelism))
.name(String.format("%s-Sink", sink.getPluginName()));
dataStreamSink.setParallelism(parallelism);
}
// the sink is the last stream
return null;
}

// if not support multi table, rollback
public SeaTunnelSink tryGenerateMultiTableSink(
Map<String, SeaTunnelSink> sinks, ReadonlyConfig sinkConfig, ClassLoader classLoader) {
if (sinks.values().stream().anyMatch(sink -> !(sink instanceof SupportMultiTableSink))) {
log.info("Unsupported multi table sink api, rollback to sink template");
// choose the first sink
return sinks.values().iterator().next();
}
return FactoryUtil.createMultiTableSink(sinks, sinkConfig, classLoader);
}

public boolean isFallback(Factory factory) {
try {
((TableSinkFactory) factory).createSink(null);
Expand All @@ -170,10 +199,10 @@ public SeaTunnelSink fallbackCreateSink(
return source;
}

public void handleSaveMode(SeaTunnelSink sink) {
if (sink instanceof SupportSaveMode) {
Optional<SaveModeHandler> saveModeHandler =
((SupportSaveMode) sink).getSaveModeHandler();
public void handleSaveMode(SeaTunnelSink seaTunnelSink) {
if (seaTunnelSink instanceof SupportSaveMode) {
SupportSaveMode saveModeSink = (SupportSaveMode) seaTunnelSink;
Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
if (saveModeHandler.isPresent()) {
try (SaveModeHandler handler = saveModeHandler.get()) {
handler.open();
Expand All @@ -182,11 +211,6 @@ public void handleSaveMode(SeaTunnelSink sink) {
throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
}
}
} else if (sink instanceof MultiTableSink) {
Map<String, SeaTunnelSink> sinks = ((MultiTableSink) sink).getSinks();
for (SeaTunnelSink seaTunnelSink : sinks.values()) {
handleSaveMode(seaTunnelSink);
}
}
}
}
Loading

0 comments on commit afbc563

Please sign in to comment.