Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Flink] Support multiple tables read and write #7713

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 45
timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down Expand Up @@ -654,7 +654,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 90
timeout-minutes: 150
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down Expand Up @@ -683,7 +683,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 90
timeout-minutes: 150
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down Expand Up @@ -828,7 +828,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 90
timeout-minutes: 150
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
Expand Down
2 changes: 1 addition & 1 deletion docs/en/start-v2/locally/quick-start-flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ cd "apache-seatunnel-${version}"
./bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/v2.streaming.conf.template
```

Flink version between `1.15.x` and `1.16.x`
Flink version between `1.15.x` and `1.18.x`

```shell
cd "apache-seatunnel-${version}"
Expand Down
2 changes: 1 addition & 1 deletion docs/zh/start-v2/locally/quick-start-flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ cd "apache-seatunnel-${version}"
./bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/v2.streaming.conf.template
```

Flink版本`1.15.x`到`1.16.x`
Flink版本`1.15.x`到`1.18.x`

```shell
cd "apache-seatunnel-${version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,6 @@ public static SourceTableInfo createSource(
catalogTables =
CatalogTableUtil.convertDataTypeToCatalogTables(seaTunnelDataType, tableId);
}

// if (catalogTables.size() != 1) {
// throw new SeaTunnelException(
// String.format("Unsupported table number: %d on flink",
// catalogTables.size()));
// }
return new SourceTableInfo(source, catalogTables);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@
import org.apache.seatunnel.api.sink.SaveModeExecuteWrapper;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.enums.PluginType;
Expand All @@ -42,9 +46,11 @@
import org.apache.seatunnel.translation.flink.sink.FlinkSink;

import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.types.Row;

import lombok.extern.slf4j.Slf4j;

import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -54,6 +60,7 @@
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;

@SuppressWarnings({"unchecked", "rawtypes"})
@Slf4j
public class SinkExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {

Expand Down Expand Up @@ -99,35 +106,48 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
fromSourceTable(sinkConfig, upstreamDataStreams).orElse(input);
Optional<? extends Factory> factory = plugins.get(i);
boolean fallBack = !factory.isPresent() || isFallback(factory.get());
SeaTunnelSink sink;
Map<String, SeaTunnelSink> sinks = new HashMap<>();
if (fallBack) {
sink =
fallbackCreateSink(
sinkPluginDiscovery,
PluginIdentifier.of(
ENGINE_TYPE,
PLUGIN_TYPE,
sinkConfig.getString(PLUGIN_NAME.key())),
sinkConfig);
sink.setJobContext(jobContext);
// TODO support sink multi sink
SeaTunnelRowType sourceType =
stream.getCatalogTables().get(0).getSeaTunnelRowType();
sink.setTypeInfo(sourceType);
for (CatalogTable catalogTable : stream.getCatalogTables()) {
SeaTunnelSink fallBackSink =
fallbackCreateSink(
sinkPluginDiscovery,
PluginIdentifier.of(
ENGINE_TYPE,
PLUGIN_TYPE,
sinkConfig.getString(PLUGIN_NAME.key())),
sinkConfig);
fallBackSink.setJobContext(jobContext);
SeaTunnelRowType sourceType = catalogTable.getSeaTunnelRowType();
fallBackSink.setTypeInfo(sourceType);
handleSaveMode(fallBackSink);
TableIdentifier tableId = catalogTable.getTableId();
String tableIdName = tableId.toTablePath().toString();
sinks.put(tableIdName, fallBackSink);
}
} else {
// TODO support sink multi sink
TableSinkFactoryContext context =
TableSinkFactoryContext.replacePlaceholderAndCreate(
stream.getCatalogTables().get(0),
ReadonlyConfig.fromConfig(sinkConfig),
classLoader,
((TableSinkFactory) factory.get())
.excludeTablePlaceholderReplaceKeys());
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
sink = ((TableSinkFactory) factory.get()).createSink(context).createSink();
sink.setJobContext(jobContext);
for (CatalogTable catalogTable : stream.getCatalogTables()) {
SeaTunnelSink seaTunnelSink;
TableSinkFactoryContext context =
TableSinkFactoryContext.replacePlaceholderAndCreate(
catalogTable,
ReadonlyConfig.fromConfig(sinkConfig),
classLoader,
((TableSinkFactory) factory.get())
.excludeTablePlaceholderReplaceKeys());
ConfigValidator.of(context.getOptions()).validate(factory.get().optionRule());
seaTunnelSink =
((TableSinkFactory) factory.get()).createSink(context).createSink();
seaTunnelSink.setJobContext(jobContext);
handleSaveMode(seaTunnelSink);
TableIdentifier tableId = catalogTable.getTableId();
String tableIdName = tableId.toTablePath().toString();
sinks.put(tableIdName, seaTunnelSink);
}
}
handleSaveMode(sink);
SeaTunnelSink sink =
tryGenerateMultiTableSink(
sinks, ReadonlyConfig.fromConfig(sinkConfig), classLoader);
boolean sinkParallelism = sinkConfig.hasPath(CommonOptions.PARALLELISM.key());
boolean envParallelism = envConfig.hasPath(CommonOptions.PARALLELISM.key());
int parallelism =
Expand All @@ -136,18 +156,27 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
: envParallelism
? envConfig.getInt(CommonOptions.PARALLELISM.key())
: 1;
DataStreamSink<Row> dataStreamSink =
DataStreamSink<SeaTunnelRow> dataStreamSink =
stream.getDataStream()
.sinkTo(
new FlinkSink<>(
sink, stream.getCatalogTables().get(0), parallelism))
.sinkTo(new FlinkSink<>(sink, stream.getCatalogTables(), parallelism))
.name(String.format("%s-Sink", sink.getPluginName()));
dataStreamSink.setParallelism(parallelism);
}
// the sink is the last stream
return null;
}

/**
 * Attempts to combine the per-table sinks into a single multi-table sink.
 *
 * <p>When every sink in {@code sinks} implements {@link SupportMultiTableSink}, the map is
 * handed to {@code FactoryUtil.createMultiTableSink} together with the sink config and class
 * loader. As soon as one sink is found that does not implement it, the method rolls back and
 * returns the first sink yielded by the map's value iterator instead.
 *
 * @param sinks sinks keyed by table path string
 * @param sinkConfig read-only configuration of the sink plugin
 * @param classLoader class loader forwarded to the multi-table sink factory
 * @return the multi-table sink, or the first individual sink on rollback
 */
public SeaTunnelSink tryGenerateMultiTableSink(
Map<String, SeaTunnelSink> sinks, ReadonlyConfig sinkConfig, ClassLoader classLoader) {
for (SeaTunnelSink candidate : sinks.values()) {
if (candidate instanceof SupportMultiTableSink) {
continue;
}
// at least one sink lacks multi-table support: rollback
log.info("Unsupported multi table sink api, rollback to sink template");
// choose the first sink
return sinks.values().iterator().next();
}
return FactoryUtil.createMultiTableSink(sinks, sinkConfig, classLoader);
}

public boolean isFallback(Factory factory) {
try {
((TableSinkFactory) factory).createSink(null);
Expand All @@ -170,10 +199,10 @@ public SeaTunnelSink fallbackCreateSink(
return source;
}

public void handleSaveMode(SeaTunnelSink sink) {
if (sink instanceof SupportSaveMode) {
Optional<SaveModeHandler> saveModeHandler =
((SupportSaveMode) sink).getSaveModeHandler();
public void handleSaveMode(SeaTunnelSink seaTunnelSink) {
if (seaTunnelSink instanceof SupportSaveMode) {
SupportSaveMode saveModeSink = (SupportSaveMode) seaTunnelSink;
Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
if (saveModeHandler.isPresent()) {
try (SaveModeHandler handler = saveModeHandler.get()) {
handler.open();
Expand All @@ -182,11 +211,6 @@ public void handleSaveMode(SeaTunnelSink sink) {
throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
}
}
} else if (sink instanceof MultiTableSink) {
Map<String, SeaTunnelSink> sinks = ((MultiTableSink) sink).getSinks();
for (SeaTunnelSink seaTunnelSink : sinks.values()) {
handleSaveMode(seaTunnelSink);
}
}
}
}
Loading
Loading