Skip to content

Commit

Permalink
[Improve][Connector-V2][Sentry] Unified exception for sentry sink con…
Browse files Browse the repository at this point in the history
…nector (#3513)
  • Loading branch information
wuchunfu authored Nov 23, 2022
1 parent 545595c commit 94b472b
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.sentry.sink;
package org.apache.seatunnel.connectors.seatunnel.sentry.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.sentry.exception;

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

public class SentryConnectorException extends SeaTunnelRuntimeException {

public SentryConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
super(seaTunnelErrorCode, errorMessage);
}

public SentryConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
super(seaTunnelErrorCode, errorMessage, cause);
}

public SentryConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
super(seaTunnelErrorCode, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
package org.apache.seatunnel.connectors.seatunnel.sentry.sink;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter.Context;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentryConfig;
import org.apache.seatunnel.connectors.seatunnel.sentry.exception.SentryConnectorException;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

Expand All @@ -37,10 +40,11 @@
* @description: SentrySink class
*/
@AutoService(SeaTunnelSink.class)
public class SentrySink extends AbstractSimpleSink<SeaTunnelRow, SentrySinkState> {
public class SentrySink extends AbstractSimpleSink<SeaTunnelRow, Void> {

private SeaTunnelRowType seaTunnelRowType;
private Config pluginConfig;

@Override
public String getPluginName() {
return SentryConfig.SENTRY;
Expand All @@ -49,8 +53,13 @@ public String getPluginName() {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
if (!pluginConfig.hasPath(SentryConfig.DSN.key())) {
throw new PrepareFailException(getPluginName(), PluginType.SINK,
String.format("Config must include column : %s", SentryConfig.DSN));
throw new SentryConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK,
String.format("Config must include column : %s",
SentryConfig.DSN)
)
);
}

this.pluginConfig = pluginConfig;
Expand All @@ -67,7 +76,7 @@ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
}

@Override
public AbstractSinkWriter<SeaTunnelRow, SentrySinkState> createWriter(Context context) throws IOException {
return new SentrySinkWriter(seaTunnelRowType, context, pluginConfig);
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException {
return new SentrySinkWriter(seaTunnelRowType, pluginConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentryConfig;

import com.google.auto.service.AutoService;

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.seatunnel.connectors.seatunnel.sentry.sink;

import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.sentry.config.SentryConfig;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

Expand All @@ -33,32 +33,32 @@
* @description: SentrySinkWriter class
*/

public class SentrySinkWriter extends AbstractSinkWriter<SeaTunnelRow, SentrySinkState> {
public class SentrySinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
private SeaTunnelRowType seaTunnelRowType;

public SentrySinkWriter(SeaTunnelRowType seaTunnelRowType,
SinkWriter.Context context,
Config pluginConfig) {
SentryOptions options = new SentryOptions();
options.setDsn(pluginConfig.getString(SentryConfig.DSN.key()));
if (pluginConfig.hasPath(SentryConfig.ENV.key())){
if (pluginConfig.hasPath(SentryConfig.ENV.key())) {
options.setEnvironment(pluginConfig.getString(SentryConfig.ENV.key()));
}
if (pluginConfig.hasPath(SentryConfig.RELEASE.key())){
if (pluginConfig.hasPath(SentryConfig.RELEASE.key())) {
options.setRelease(pluginConfig.getString(SentryConfig.RELEASE.key()));
}
if (pluginConfig.hasPath(SentryConfig.CACHE_DIRPATH.key())){
if (pluginConfig.hasPath(SentryConfig.CACHE_DIRPATH.key())) {
options.setCacheDirPath(pluginConfig.getString(SentryConfig.CACHE_DIRPATH.key()));
}
if (pluginConfig.hasPath(SentryConfig.MAX_CACHEITEMS.key())){
if (pluginConfig.hasPath(SentryConfig.MAX_CACHEITEMS.key())) {
options.setMaxCacheItems(pluginConfig.getInt(SentryConfig.MAX_CACHEITEMS.key()));
}
if (pluginConfig.hasPath(SentryConfig.MAX_QUEUESIZE.key())){
if (pluginConfig.hasPath(SentryConfig.MAX_QUEUESIZE.key())) {
options.setMaxQueueSize(pluginConfig.getInt(SentryConfig.MAX_QUEUESIZE.key()));
}
if (pluginConfig.hasPath(SentryConfig.FLUSH_TIMEOUTMILLIS.key())){
if (pluginConfig.hasPath(SentryConfig.FLUSH_TIMEOUTMILLIS.key())) {
options.setFlushTimeoutMillis(pluginConfig.getLong(SentryConfig.FLUSH_TIMEOUTMILLIS.key()));
}
if (pluginConfig.hasPath(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION.key())){
if (pluginConfig.hasPath(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION.key())) {
options.setEnableExternalConfiguration(pluginConfig.getBoolean(SentryConfig.ENABLE_EXTERNAL_CONFIGURATION.key()));
}
Sentry.init(options);
Expand Down

0 comments on commit 94b472b

Please sign in to comment.