diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java index 924c2e52443..916f076f57d 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java @@ -56,7 +56,7 @@ public interface SeaTunnelSource getProducedType() { - throw new UnsupportedOperationException("getProducedType method has not been implemented."); + return (SeaTunnelDataType) getProducedCatalogTables().get(0).getSeaTunnelRowType(); } /** diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java index 4c8ee235a97..11769ab87a2 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java @@ -17,9 +17,6 @@ package org.apache.seatunnel.connectors.cdc.base.source; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.ReadonlyConfig; @@ -118,20 +115,6 @@ protected IncrementalSource( this.offsetFactory = createOffsetFactory(readonlyConfig); } - @Override - public final void prepare(Config pluginConfig) throws PrepareFailException { - this.readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig); - - this.startupConfig = getStartupConfig(readonlyConfig); - this.stopConfig = getStopConfig(readonlyConfig); - this.stopMode = stopConfig.getStopMode(); - this.incrementalParallelism = readonlyConfig.get(SourceOptions.INCREMENTAL_PARALLELISM); - this.configFactory = createSourceConfigFactory(readonlyConfig); - this.dataSourceDialect = createDataSourceDialect(readonlyConfig); - this.deserializationSchema = createDebeziumDeserializationSchema(readonlyConfig); - this.offsetFactory = createOffsetFactory(readonlyConfig); - } - protected StartupConfig getStartupConfig(ReadonlyConfig config) { return new StartupConfig( config.get(getStartupModeOption()), @@ -178,11 +161,6 @@ public Boundedness getBoundedness() { return stopMode == StopMode.NEVER ? Boundedness.UNBOUNDED : Boundedness.BOUNDED; } - @Override - public SeaTunnelDataType getProducedType() { - return deserializationSchema.getProducedType(); - } - @SuppressWarnings("MagicNumber") @Override public SourceReader createReader(SourceReader.Context readerContext) diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java index e634347fdec..d3bf9c6430a 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java @@ -17,10 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - import org.apache.seatunnel.api.common.JobContext; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SeaTunnelSource; @@ -31,27 +28,19 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.catalog.TableIdentifier; -import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.JobMode; -import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig; -import org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException; import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState; import org.apache.commons.collections4.CollectionUtils; -import com.google.auto.service.AutoService; import com.google.common.collect.Lists; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; -@AutoService(SeaTunnelSource.class) public class FakeSource implements SeaTunnelSource, SupportParallelism, @@ -96,11 +85,6 @@ public List getProducedCatalogTables() { .collect(Collectors.toList()); } - @Override - public SeaTunnelRowType getProducedType() { - return catalogTable.getSeaTunnelRowType(); - } - @Override public SourceSplitEnumerator createEnumerator( SourceSplitEnumerator.Context enumeratorContext) throws Exception { @@ -126,21 +110,6 @@ public String getPluginName() { return "FakeSource"; } - @Override - public void prepare(Config pluginConfig) { - CheckResult result = - CheckConfigUtil.checkAllExists(pluginConfig, TableSchemaOptions.SCHEMA.key()); - if (!result.isSuccess()) { - throw new FakeConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SOURCE, result.getMsg())); - } - this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig); - this.fakeConfig = FakeConfig.buildWithConfig(pluginConfig); - } - @Override public void setJobContext(JobContext jobContext) { this.jobContext = jobContext; diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java index 0b1f34d3c1a..0dc4209a8b6 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java @@ -117,7 +117,6 @@ private static boolean isFallback(Optional factory) { .equals(e.getMessage())) { return true; } - return true; } return false; } diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index 86caa6939a5..4f02b679b53 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -138,7 +138,6 @@ public boolean isFallback(Factory factory) { .equals(e.getMessage())) { return true; } - return true; } return false; } diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index 2109ffe88fc..c796a99400d 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -141,7 +141,6 @@ public boolean isFallback(Factory factory) { .equals(e.getMessage())) { return true; } - return true; } return false; } diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index 5546890e832..156fd9e6995 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -157,7 +157,6 @@ public boolean isFallback(Factory factory) { .equals(e.getMessage())) { return true; } - return true; } return false; } diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java index b3d978b1cbc..64d9fcaf779 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java @@ -159,7 +159,6 @@ public boolean isFallback(Factory factory) { .equals(e.getMessage())) { return true; } - return true; } return false; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java index 0e47404b8e5..6de34c33667 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java @@ -63,7 +63,6 @@ public static LogicalDag getTestLogicalDag(JobContext jobContext) throws Malform "fields", ImmutableMap.of("id", "int", "name", "string")))); FakeSource fakeSource = new FakeSource(ReadonlyConfig.fromConfig(fakeSourceConfig)); fakeSource.setJobContext(jobContext); - fakeSource.prepare(fakeSourceConfig); Action fake = new SourceAction<>( diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java index 069c6a86268..88fb8d88ccd 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java @@ -115,7 +115,6 @@ private static void fillVirtualVertex( Collections.singletonMap( "fields", ImmutableMap.of("id", "int", "name", "string")))); FakeSource fakeSource = new FakeSource(ReadonlyConfig.fromConfig(fakeSourceConfig)); - fakeSource.prepare(fakeSourceConfig); fakeSource.setJobContext(jobContext); Action fake = diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java index 01e105d0870..495add1e90d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java @@ -165,8 +165,6 @@ private static FakeSource createFakeSource() { "schema", Collections.singletonMap( "fields", ImmutableMap.of("id", "int", "name", "string")))); - FakeSource fakeSource = new FakeSource(ReadonlyConfig.fromConfig(fakeSourceConfig)); - fakeSource.prepare(fakeSourceConfig); - return fakeSource; + return new FakeSource(ReadonlyConfig.fromConfig(fakeSourceConfig)); } }