diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentryConfig.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/config/SentryConfig.java similarity index 97% rename from seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentryConfig.java rename to seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/config/SentryConfig.java index ab79b8d9436..2297342b562 100644 --- a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentryConfig.java +++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/config/SentryConfig.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.sentry.sink; +package org.apache.seatunnel.connectors.seatunnel.sentry.config; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/exception/SentryConnectorException.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/exception/SentryConnectorException.java new file mode 100644 index 00000000000..27078c2408d --- /dev/null +++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/exception/SentryConnectorException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.sentry.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class SentryConnectorException extends SeaTunnelRuntimeException { + + public SentryConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } + + public SentryConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { + super(seaTunnelErrorCode, errorMessage, cause); + } + + public SentryConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { + super(seaTunnelErrorCode, cause); + } +} diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java index 8c8eb542237..230aae43acd 100644 --- a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java +++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySink.java @@ -18,14 +18,17 @@ package org.apache.seatunnel.connectors.seatunnel.sentry.sink; import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.sink.SeaTunnelSink; -import org.apache.seatunnel.api.sink.SinkWriter.Context; +import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentryConfig; +import org.apache.seatunnel.connectors.seatunnel.sentry.exception.SentryConnectorException; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -37,10 +40,11 @@ * @description: SentrySink class */ @AutoService(SeaTunnelSink.class) -public class SentrySink extends AbstractSimpleSink { +public class SentrySink extends AbstractSimpleSink { private SeaTunnelRowType seaTunnelRowType; private Config pluginConfig; + @Override public String getPluginName() { return SentryConfig.SENTRY; @@ -49,8 +53,13 @@ public String getPluginName() { @Override public void prepare(Config pluginConfig) throws PrepareFailException { if (!pluginConfig.hasPath(SentryConfig.DSN.key())) { - throw new PrepareFailException(getPluginName(), PluginType.SINK, - String.format("Config must include column : %s", SentryConfig.DSN)); + throw new SentryConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format("PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SINK, + String.format("Config must include column : %s", + SentryConfig.DSN) + ) + ); } this.pluginConfig = pluginConfig; @@ -67,7 +76,7 @@ public SeaTunnelDataType getConsumedType() { } @Override - public AbstractSinkWriter createWriter(Context context) throws IOException { - return new SentrySinkWriter(seaTunnelRowType, context, pluginConfig); + public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new SentrySinkWriter(seaTunnelRowType, pluginConfig); } } diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkFactory.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkFactory.java index 7fe6275f9f1..a3d6a9772f5 100644 --- a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkFactory.java +++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkFactory.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentryConfig; import com.google.auto.service.AutoService; diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkState.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkState.java deleted file mode 100644 index b5ad101c326..00000000000 --- a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkState.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.sentry.sink; - -import java.io.Serializable; - -public class SentrySinkState implements Serializable { -} diff --git a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java index 9580dee2b83..d29c05c7bc6 100644 --- a/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java +++ b/seatunnel-connectors-v2/connector-sentry/src/main/java/org/apache/seatunnel/connectors/seatunnel/sentry/sink/SentrySinkWriter.java @@ -17,10 +17,10 @@ package org.apache.seatunnel.connectors.seatunnel.sentry.sink; -import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentryConfig; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -33,32 +33,32 @@ * @description: SentrySinkWriter class */ -public class SentrySinkWriter extends AbstractSinkWriter { +public class SentrySinkWriter extends AbstractSinkWriter { private SeaTunnelRowType seaTunnelRowType; + public SentrySinkWriter(SeaTunnelRowType seaTunnelRowType, - SinkWriter.Context context, Config pluginConfig) { SentryOptions options = new SentryOptions(); options.setDsn(pluginConfig.getString(SentryConfig.DSN.key())); - if (pluginConfig.hasPath(SentryConfig.ENV.key())){ + if (pluginConfig.hasPath(SentryConfig.ENV.key())) { options.setEnvironment(pluginConfig.getString(SentryConfig.ENV.key())); } - if (pluginConfig.hasPath(SentryConfig.RELEASE.key())){ + if (pluginConfig.hasPath(SentryConfig.RELEASE.key())) { options.setRelease(pluginConfig.getString(SentryConfig.RELEASE.key())); } - if (pluginConfig.hasPath(SentryConfig.CACHE_DIRPATH.key())){ + if (pluginConfig.hasPath(SentryConfig.CACHE_DIRPATH.key())) { options.setCacheDirPath(pluginConfig.getString(SentryConfig.CACHE_DIRPATH.key())); } - if (pluginConfig.hasPath(SentryConfig.MAX_CACHEITEMS.key())){ + if (pluginConfig.hasPath(SentryConfig.MAX_CACHEITEMS.key())) { options.setMaxCacheItems(pluginConfig.getInt(SentryConfig.MAX_CACHEITEMS.key())); } - if (pluginConfig.hasPath(SentryConfig.MAX_QUEUESIZE.key())){ + if (pluginConfig.hasPath(SentryConfig.MAX_QUEUESIZE.key())) { options.setMaxQueueSize(pluginConfig.getInt(SentryConfig.MAX_QUEUESIZE.key())); } - if (pluginConfig.hasPath(SentryConfig.FLUSH_TIMEOUTMILLIS.key())){ + if (pluginConfig.hasPath(SentryConfig.FLUSH_TIMEOUTMILLIS.key())) { options.setFlushTimeoutMillis(pluginConfig.getLong(SentryConfig.FLUSH_TIMEOUTMILLIS.key())); } - if (pluginConfig.hasPath(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION.key())){ + if (pluginConfig.hasPath(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION.key())) { options.setEnableExternalConfiguration(pluginConfig.getBoolean(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION.key())); } Sentry.init(options);