diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableAggregatedCommitInfo.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableAggregatedCommitInfo.java similarity index 93% rename from seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableAggregatedCommitInfo.java rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableAggregatedCommitInfo.java index 5d378140e94..585a8f4e068 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableAggregatedCommitInfo.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableAggregatedCommitInfo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.common.multitablesink; +package org.apache.seatunnel.api.sink.multitablesink; import lombok.AllArgsConstructor; import lombok.Getter; diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableCommitInfo.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java similarity index 93% rename from seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableCommitInfo.java rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java index 21faf0c7edc..8b12fa07c5b 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableCommitInfo.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableCommitInfo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.common.multitablesink; +package org.apache.seatunnel.api.sink.multitablesink; import lombok.AllArgsConstructor; import lombok.Getter; diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java similarity index 97% rename from seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java index 7abb176117d..bb04283ca68 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSink.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.common.multitablesink; +package org.apache.seatunnel.api.sink.multitablesink; import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.serialization.DefaultSerializer; @@ -28,6 +28,8 @@ import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import lombok.Getter; + import java.io.IOException; import java.util.Collection; import java.util.HashMap; @@ -44,7 +46,7 @@ public class MultiTableSink MultiTableCommitInfo, MultiTableAggregatedCommitInfo> { - private final Map sinks; + @Getter private final Map sinks; private final int replicaNum; public MultiTableSink(MultiTableFactoryContext context) { diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java similarity index 99% rename from seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java index 31dd91f1eec..6ed04d871bf 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.common.multitablesink; +package org.apache.seatunnel.api.sink.multitablesink; import org.apache.seatunnel.api.sink.MultiTableResourceManager; import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkCommitter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkCommitter.java similarity index 98% rename from seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkCommitter.java rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkCommitter.java index ed52fafb002..113e269fd07 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkCommitter.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkCommitter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.common.multitablesink; +package org.apache.seatunnel.api.sink.multitablesink; import org.apache.seatunnel.api.sink.SinkCommitter; diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkFactory.java similarity index 96% rename from seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkFactory.java rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkFactory.java index 00e1e1ab133..08db91b7c8e 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkFactory.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkFactory.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.common.multitablesink; +package org.apache.seatunnel.api.sink.multitablesink; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.table.connector.TableSink; diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java similarity index 99% rename from seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java index 12163676d7d..3c73435fafb 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkWriter.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.common.multitablesink; +package org.apache.seatunnel.api.sink.multitablesink; import org.apache.seatunnel.api.sink.MultiTableResourceManager; import org.apache.seatunnel.api.sink.SinkWriter; diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableState.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableState.java similarity index 93% rename from seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableState.java rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableState.java index 43f5d8bd996..ac7db893ba0 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableState.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableState.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.common.multitablesink; +package org.apache.seatunnel.api.sink.multitablesink; import lombok.AllArgsConstructor; import lombok.Getter; diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableWriterRunnable.java similarity index 97% rename from seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableWriterRunnable.java index ce22e0e2e20..3026dc778b8 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableWriterRunnable.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.common.multitablesink; +package org.apache.seatunnel.api.sink.multitablesink; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelRow; diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkContextProxy.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java similarity index 95% rename from seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkContextProxy.java rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java index f7691ddedff..3a97bb27bc9 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkContextProxy.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkContextProxy.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.common.multitablesink; +package org.apache.seatunnel.api.sink.multitablesink; import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.event.EventListener; diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkIdentifier.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkIdentifier.java similarity index 94% rename from seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkIdentifier.java rename to seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkIdentifier.java index 18f7484853d..50eac7c0d9d 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/SinkIdentifier.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/SinkIdentifier.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.common.multitablesink; +package org.apache.seatunnel.api.sink.multitablesink; import lombok.EqualsAndHashCode; import lombok.Getter; diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java index 668ff2a43c8..79c0c18706f 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.env.ParsingMode; import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.multitablesink.MultiTableSinkFactory; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceOptions; import org.apache.seatunnel.api.source.SourceSplit; @@ -151,7 +152,7 @@ SeaTunnelSink createMultiTableSi ClassLoader classLoader) { try { TableSinkFactory factory = - discoverFactory(classLoader, TableSinkFactory.class, "MultiTableSink"); + new MultiTableSinkFactory(); MultiTableFactoryContext context = new MultiTableFactoryContext(options, classLoader, sinks); ConfigValidator.of(context.getOptions()).validate(factory.optionRule()); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index e9928a018a1..aa74460b056 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -26,9 +26,9 @@ import org.apache.seatunnel.api.sink.SaveModeHandler; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SupportSaveMode; +import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; import org.apache.seatunnel.common.utils.ExceptionUtils; -import org.apache.seatunnel.common.utils.ReflectionUtils; import org.apache.seatunnel.common.utils.RetryUtils; import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException; @@ -375,13 +375,8 @@ public static void handleSaveMode(SeaTunnelSink sink) { throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e); } } - } else if (sink.getClass() - .getName() - .equals( - "org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSink")) { - // TODO we should not use class name to judge the sink type - Map sinks = - (Map) ReflectionUtils.getField(sink, "sinks").get(); + } else if (sink instanceof MultiTableSink) { + Map sinks = ((MultiTableSink) sink).getSinks(); for (SeaTunnelSink seaTunnelSink : sinks.values()) { handleSaveMode(seaTunnelSink); }