Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] Add default implement for SeaTunnelSource::getProducedType #5670

Merged
merged 4 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT extends S
*/
@Deprecated
default SeaTunnelDataType<T> getProducedType() {
throw new UnsupportedOperationException("getProducedType method has not been implemented.");
return (SeaTunnelDataType) getProducedCatalogTables().get(0).getSeaTunnelRowType();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package org.apache.seatunnel.connectors.cdc.base.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
Expand Down Expand Up @@ -118,20 +115,6 @@ protected IncrementalSource(
this.offsetFactory = createOffsetFactory(readonlyConfig);
}

@Override
public final void prepare(Config pluginConfig) throws PrepareFailException {
this.readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);

this.startupConfig = getStartupConfig(readonlyConfig);
this.stopConfig = getStopConfig(readonlyConfig);
this.stopMode = stopConfig.getStopMode();
this.incrementalParallelism = readonlyConfig.get(SourceOptions.INCREMENTAL_PARALLELISM);
this.configFactory = createSourceConfigFactory(readonlyConfig);
this.dataSourceDialect = createDataSourceDialect(readonlyConfig);
this.deserializationSchema = createDebeziumDeserializationSchema(readonlyConfig);
this.offsetFactory = createOffsetFactory(readonlyConfig);
}

protected StartupConfig getStartupConfig(ReadonlyConfig config) {
return new StartupConfig(
config.get(getStartupModeOption()),
Expand Down Expand Up @@ -178,11 +161,6 @@ public Boundedness getBoundedness() {
return stopMode == StopMode.NEVER ? Boundedness.UNBOUNDED : Boundedness.BOUNDED;
}

@Override
public SeaTunnelDataType<T> getProducedType() {
return deserializationSchema.getProducedType();
}

@SuppressWarnings("MagicNumber")
@Override
public SourceReader<T, SourceSplitBase> createReader(SourceReader.Context readerContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.fake.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
Expand All @@ -31,27 +28,19 @@
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;

import org.apache.commons.collections4.CollectionUtils;

import com.google.auto.service.AutoService;
import com.google.common.collect.Lists;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

@AutoService(SeaTunnelSource.class)
public class FakeSource
implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, FakeSourceState>,
SupportParallelism,
Expand Down Expand Up @@ -96,11 +85,6 @@ public List<CatalogTable> getProducedCatalogTables() {
.collect(Collectors.toList());
}

@Override
public SeaTunnelRowType getProducedType() {
return catalogTable.getSeaTunnelRowType();
}

@Override
public SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> createEnumerator(
SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) throws Exception {
Expand All @@ -126,21 +110,6 @@ public String getPluginName() {
return "FakeSource";
}

@Override
public void prepare(Config pluginConfig) {
CheckResult result =
CheckConfigUtil.checkAllExists(pluginConfig, TableSchemaOptions.SCHEMA.key());
if (!result.isSuccess()) {
throw new FakeConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
this.fakeConfig = FakeConfig.buildWithConfig(pluginConfig);
}

@Override
public void setJobContext(JobContext jobContext) {
this.jobContext = jobContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ private static boolean isFallback(Optional<Factory> factory) {
.equals(e.getMessage())) {
return true;
}
return true;
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ public boolean isFallback(Factory factory) {
.equals(e.getMessage())) {
return true;
}
return true;
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ public boolean isFallback(Factory factory) {
.equals(e.getMessage())) {
return true;
}
return true;
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ public boolean isFallback(Factory factory) {
.equals(e.getMessage())) {
return true;
}
return true;
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ public boolean isFallback(Factory factory) {
.equals(e.getMessage())) {
return true;
}
return true;
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public static LogicalDag getTestLogicalDag(JobContext jobContext) throws Malform
"fields", ImmutableMap.of("id", "int", "name", "string"))));
FakeSource fakeSource = new FakeSource(ReadonlyConfig.fromConfig(fakeSourceConfig));
fakeSource.setJobContext(jobContext);
fakeSource.prepare(fakeSourceConfig);

Action fake =
new SourceAction<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ private static void fillVirtualVertex(
Collections.singletonMap(
"fields", ImmutableMap.of("id", "int", "name", "string"))));
FakeSource fakeSource = new FakeSource(ReadonlyConfig.fromConfig(fakeSourceConfig));
fakeSource.prepare(fakeSourceConfig);
fakeSource.setJobContext(jobContext);

Action fake =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,6 @@ private static FakeSource createFakeSource() {
"schema",
Collections.singletonMap(
"fields", ImmutableMap.of("id", "int", "name", "string"))));
FakeSource fakeSource = new FakeSource(ReadonlyConfig.fromConfig(fakeSourceConfig));
fakeSource.prepare(fakeSourceConfig);
return fakeSource;
return new FakeSource(ReadonlyConfig.fromConfig(fakeSourceConfig));
}
}