From 7353fed6d6a973d3029c3f5520fc4e3370e53bca Mon Sep 17 00:00:00 2001 From: ChunFu Wu <319355703@qq.com> Date: Sat, 26 Nov 2022 17:03:58 +0800 Subject: [PATCH] [Improve][Connector-V2][Iotdb] Unified exception for iotdb source & sink connector (#3557) * [Improve][Connector-V2][Iotdb] Unified exception for iotdb source & sink connector * [Improve][Connector-V2][Iotdb] Unified exception for iotdb source & sink connector * [Improve][Connector-V2][Iotdb] Unified exception for iotdb source & sink connector --- .../Error-Quick-Reference-Manual.md | 10 ++- .../exception/IotdbConnectorErrorCode.java | 50 +++++++++++++++ .../exception/IotdbConnectorException.java | 36 +++++++++++ .../DefaultSeaTunnelRowDeserializer.java | 16 +++-- .../DefaultSeaTunnelRowSerializer.java | 13 ++-- .../seatunnel/iotdb/sink/IoTDBSink.java | 8 ++- .../seatunnel/iotdb/sink/IoTDBSinkClient.java | 63 ++++++++++--------- .../seatunnel/iotdb/source/IoTDBSource.java | 10 ++- .../iotdb/source/IoTDBSourceReader.java | 11 ++-- .../iotdb/source/IoTDBSourceSplit.java | 4 +- .../source/IoTDBSourceSplitEnumerator.java | 8 ++- 11 files changed, 180 insertions(+), 49 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/exception/IotdbConnectorErrorCode.java create mode 100644 seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/exception/IotdbConnectorException.java diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md index 22b22a32386..568215a3598 100644 --- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md +++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md @@ -107,4 +107,12 @@ This document records some common error codes and corresponding solutions of Sea | KUDU-04 | Upsert data to Kudu failed | When users encounter this error code, it means that Kudu has some problems, please check it whether is work | | KUDU-05 | Insert data to Kudu failed | When users encounter this error code, it means that Kudu has some problems, please check it whether is work | | KUDU-06 | Initialize the Kudu client failed | When users encounter this error code, it is usually there are some problems with initializing the Kudu client, please check your configuration whether correct and connector is work | -| KUDU-07 | Generate Kudu Parameters in the preparation phase failed | When users encounter this error code, it means that there are some problems on Kudu parameters generation, please check your configuration | \ No newline at end of file +| KUDU-07 | Generate Kudu Parameters in the preparation phase failed | When users encounter this error code, it means that there are some problems on Kudu parameters generation, please check your configuration | + +## IotDB Connector Error Codes + +| code | description | solution | +|----------|----------------------------------------------------------|--------------------------------------------------------------------------------------------------------------| +| IOTDB-01 | Close IoTDB session failed | When the user encounters this error code, it indicates that closing the session failed. Please check | +| IOTDB-02 | Initialize IoTDB client failed | When the user encounters this error code, it indicates that the client initialization failed. Please check | +| IOTDB-03 | Close IoTDB client failed | When the user encounters this error code, it indicates that closing the client failed. Please check | diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/exception/IotdbConnectorErrorCode.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/exception/IotdbConnectorErrorCode.java new file mode 100644 index 00000000000..53333331d11 --- /dev/null +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/exception/IotdbConnectorErrorCode.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.iotdb.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; + +public enum IotdbConnectorErrorCode implements SeaTunnelErrorCode { + + CLOSE_SESSION_FAILED("IOTDB-01", "Close IoTDB session failed"), + INITIALIZE_CLIENT_FAILED("IOTDB-02", "Initialize IoTDB client failed"), + CLOSE_CLIENT_FAILED("IOTDB-03", "Close IoTDB client failed"); + + private final String code; + private final String description; + + IotdbConnectorErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String getErrorMessage() { + return SeaTunnelErrorCode.super.getErrorMessage(); + } +} diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/exception/IotdbConnectorException.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/exception/IotdbConnectorException.java new file mode 100644 index 00000000000..c1dc70d038a --- /dev/null +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/exception/IotdbConnectorException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.iotdb.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class IotdbConnectorException extends SeaTunnelRuntimeException { + + public IotdbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } + + public IotdbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { + super(seaTunnelErrorCode, errorMessage, cause); + } + + public IotdbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { + super(seaTunnelErrorCode, cause); + } +} diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowDeserializer.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowDeserializer.java index 4ab773cbcd7..486e74f493a 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowDeserializer.java +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowDeserializer.java @@ -20,6 +20,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorException; import lombok.AllArgsConstructor; import org.apache.iotdb.tsfile.read.common.Field; @@ -43,11 +45,12 @@ private SeaTunnelRow convert(RowRecord rowRecord) { long timestamp = rowRecord.getTimestamp(); List fields = rowRecord.getFields(); if (fields.size() != (rowType.getTotalFields() - 1)) { - throw new IllegalStateException("Illegal SeaTunnelRowType: " + rowRecord); + throw new IotdbConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, + "Illegal SeaTunnelRowType: " + rowRecord); } Object[] seaTunnelFields = new Object[rowType.getTotalFields()]; - seaTunnelFields[0] = convertTimestamp(timestamp, rowType.getFieldType(0)); + seaTunnelFields[0] = convertTimestamp(timestamp, rowType.getFieldType(0)); for (int i = 1; i < rowType.getTotalFields(); i++) { Field field = fields.get(i - 1); if (field == null || field.getDataType() == null) { @@ -73,7 +76,8 @@ private Object convert(SeaTunnelDataType seaTunnelFieldType, case INT: return int32.intValue(); default: - throw new UnsupportedOperationException("Unsupported data type: " + seaTunnelFieldType); + throw new IotdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unsupported data type: " + seaTunnelFieldType); } case INT64: return field.getLongV(); @@ -86,7 +90,8 @@ private Object convert(SeaTunnelDataType seaTunnelFieldType, case BOOLEAN: return field.getBoolV(); default: - throw new IllegalArgumentException("unknown TSData type: " + field.getDataType()); + throw new IotdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unsupported data type: " + field.getDataType()); } } @@ -101,7 +106,8 @@ private Object convertTimestamp(long timestamp, case BIGINT: return timestamp; default: - throw new UnsupportedOperationException("Unsupported data type: " + seaTunnelFieldType); + throw new IotdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unsupported data type: " + seaTunnelFieldType); } } } diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowSerializer.java index d7cf508f853..201f844c38e 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowSerializer.java +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowSerializer.java @@ -20,6 +20,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorException; import com.google.common.base.Strings; import lombok.NonNull; @@ -78,14 +80,15 @@ private Function createTimestampExtractor(SeaTunnelRowType s case STRING: return Long.parseLong((String) timestamp); case TIMESTAMP: - return LocalDateTime.class.cast(timestamp) + return ((LocalDateTime) timestamp) .atZone(ZoneOffset.UTC) .toInstant() .toEpochMilli(); case BIGINT: return (Long) timestamp; default: - throw new UnsupportedOperationException("Unsupported data type: " + timestampFieldType); + throw new IotdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unsupported data type: " + timestampFieldType); } }; } @@ -167,7 +170,8 @@ private static TSDataType convert(SeaTunnelDataType dataType) { case DOUBLE: return TSDataType.DOUBLE; default: - throw new UnsupportedOperationException("Unsupported dataType: " + dataType); + throw new IotdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unsupported data type: " + dataType); } } @@ -191,7 +195,8 @@ private static Object convert(SeaTunnelDataType seaTunnelType, case TEXT: return value.toString(); default: - throw new UnsupportedOperationException("Unsupported dataType: " + tsDataType); + throw new IotdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unsupported data type: " + tsDataType); } } } diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java index aa813b95e6c..ce060c3ebda 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java @@ -23,6 +23,7 @@ import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.USERNAME; import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; @@ -33,6 +34,7 @@ import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorException; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -53,7 +55,11 @@ public String getPluginName() { public void prepare(Config pluginConfig) throws PrepareFailException { CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, NODE_URLS.key(), USERNAME.key(), PASSWORD.key(), KEY_DEVICE.key()); if (!result.isSuccess()) { - throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg()); + throw new IotdbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format("PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SINK, + result.getMsg()) + ); } this.pluginConfig = pluginConfig; } diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkClient.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkClient.java index 91b3461daab..0a1313c120f 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkClient.java +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkClient.java @@ -17,7 +17,10 @@ package org.apache.seatunnel.connectors.seatunnel.iotdb.sink; +import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig; +import org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorException; import org.apache.seatunnel.connectors.seatunnel.iotdb.serialize.IoTDBRecord; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -59,9 +62,9 @@ private void tryInit() throws IOException { } Session.Builder sessionBuilder = new Session.Builder() - .nodeUrls(sinkConfig.getNodeUrls()) - .username(sinkConfig.getUsername()) - .password(sinkConfig.getPassword()); + .nodeUrls(sinkConfig.getNodeUrls()) + .username(sinkConfig.getUsername()) + .password(sinkConfig.getPassword()); if (sinkConfig.getThriftDefaultBufferSize() != null) { sessionBuilder.thriftDefaultBufferSize(sinkConfig.getThriftDefaultBufferSize()); } @@ -83,12 +86,13 @@ private void tryInit() throws IOException { } } catch (IoTDBConnectionException e) { log.error("Initialize IoTDB client failed.", e); - throw new IOException(e); + throw new IotdbConnectorException(IotdbConnectorErrorCode.INITIALIZE_CLIENT_FAILED, + "Initialize IoTDB client failed.", e); } if (sinkConfig.getBatchIntervalMs() != null) { scheduler = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("IoTDB-sink-output-%s").build()); + new ThreadFactoryBuilder().setNameFormat("IoTDB-sink-output-%s").build()); scheduledFuture = scheduler.scheduleAtFixedRate( () -> { try { @@ -97,9 +101,9 @@ private void tryInit() throws IOException { flushException = e; } }, - sinkConfig.getBatchIntervalMs(), - sinkConfig.getBatchIntervalMs(), - TimeUnit.MILLISECONDS); + sinkConfig.getBatchIntervalMs(), + sinkConfig.getBatchIntervalMs(), + TimeUnit.MILLISECONDS); } initialize = true; } @@ -110,7 +114,7 @@ public synchronized void write(IoTDBRecord record) throws IOException { batchList.add(record); if (sinkConfig.getBatchSize() > 0 - && batchList.size() >= sinkConfig.getBatchSize()) { + && batchList.size() >= sinkConfig.getBatchSize()) { flush(); } } @@ -129,7 +133,8 @@ public synchronized void close() throws IOException { } } catch (IoTDBConnectionException e) { log.error("Close IoTDB client failed.", e); - throw new IOException("Close IoTDB client failed.", e); + throw new IotdbConnectorException(IotdbConnectorErrorCode.CLOSE_CLIENT_FAILED, + "Close IoTDB client failed.", e); } } @@ -144,30 +149,31 @@ synchronized void flush() throws IOException { try { if (batchRecords.getTypesList().isEmpty()) { session.insertRecords(batchRecords.getDeviceIds(), - batchRecords.getTimestamps(), - batchRecords.getMeasurementsList(), - batchRecords.getStringValuesList()); + batchRecords.getTimestamps(), + batchRecords.getMeasurementsList(), + batchRecords.getStringValuesList()); } else { session.insertRecords(batchRecords.getDeviceIds(), - batchRecords.getTimestamps(), - batchRecords.getMeasurementsList(), - batchRecords.getTypesList(), - batchRecords.getValuesList()); + batchRecords.getTimestamps(), + batchRecords.getMeasurementsList(), + batchRecords.getTypesList(), + batchRecords.getValuesList()); } } catch (IoTDBConnectionException | StatementExecutionException e) { log.error("Writing records to IoTDB failed, retry times = {}", i, e); if (i >= sinkConfig.getMaxRetries()) { - throw new IOException("Writing records to IoTDB failed.", e); + throw new IotdbConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, + "Writing records to IoTDB failed.", e); } try { long backoff = Math.min(sinkConfig.getRetryBackoffMultiplierMs() * i, - sinkConfig.getMaxRetryBackoffMs()); + sinkConfig.getMaxRetryBackoffMs()); Thread.sleep(backoff); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); - throw new IOException( - "Unable to flush; interrupted while doing another attempt.", e); + throw new IotdbConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, + "Unable to flush; interrupted while doing another attempt.", e); } } } @@ -177,17 +183,18 @@ synchronized void flush() throws IOException { private void checkFlushException() { if (flushException != null) { - throw new RuntimeException("Writing records to IoTDB failed.", flushException); + throw new IotdbConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, + "Writing records to IoTDB failed.", flushException); } } @Getter - private class BatchRecords { - private List deviceIds; - private List timestamps; - private List> measurementsList; - private List> typesList; - private List> valuesList; + private static class BatchRecords { + private final List deviceIds; + private final List timestamps; + private final List> measurementsList; + private final List> typesList; + private final List> valuesList; public BatchRecords(List batchList) { int batchSize = batchList.size(); diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java index 003a67fd149..9e66302ce84 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceReader; @@ -34,6 +35,7 @@ import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; +import org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorException; import org.apache.seatunnel.connectors.seatunnel.iotdb.state.IoTDBSourceState; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -50,7 +52,7 @@ public class IoTDBSource implements SeaTunnelSource configParams = new HashMap(); + private final Map configParams = new HashMap<>(); @Override public String getPluginName() { @@ -64,7 +66,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { result = CheckConfigUtil.checkAllExists(pluginConfig, NODE_URLS.key()); if (!result.isSuccess()) { - throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "host and port and node urls are both empty"); + throw new IotdbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format("PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SOURCE, + result.getMsg()) + ); } } SeaTunnelSchema seatunnelSchema = SeaTunnelSchema.buildWithConfig(pluginConfig); diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java index 28510361a2b..1e29fe9dcf1 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java @@ -34,6 +34,8 @@ import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorException; import org.apache.seatunnel.connectors.seatunnel.iotdb.serialize.DefaultSeaTunnelRowDeserializer; import org.apache.seatunnel.connectors.seatunnel.iotdb.serialize.SeaTunnelRowDeserializer; @@ -91,7 +93,8 @@ public void close() throws IOException { session.close(); } } catch (IoTDBConnectionException e) { - throw new IOException("close IoTDB session failed", e); + throw new IotdbConnectorException(IotdbConnectorErrorCode.CLOSE_SESSION_FAILED, + "Close IoTDB session failed", e); } } @@ -127,9 +130,9 @@ private Session buildSession(Map conf) { Session.Builder sessionBuilder = new Session.Builder(); if (conf.containsKey(HOST.key())) { sessionBuilder - .host((String) conf.get(HOST.key())) - .port(Integer.parseInt(conf.get(PORT.key()).toString())) - .build(); + .host((String) conf.get(HOST.key())) + .port(Integer.parseInt(conf.get(PORT.key()).toString())) + .build(); } else { String nodeUrlsString = (String) conf.get(NODE_URLS.key()); List nodes = Stream.of(nodeUrlsString.split(NODES_SPLIT)).collect(Collectors.toList()); diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplit.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplit.java index 8b38f3abde7..b521ec18b28 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplit.java +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplit.java @@ -26,12 +26,12 @@ public class IoTDBSourceSplit implements SourceSplit { private static final long serialVersionUID = -1L; - private String splitId; + private final String splitId; /** * final query statement */ - private String query; + private final String query; @Override public String splitId() { diff --git a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java index 5820d1d4b18..ebd225efd33 100644 --- a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java @@ -27,6 +27,8 @@ import static org.apache.iotdb.tsfile.common.constant.QueryConstant.RESERVED_TIME; import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorException; import org.apache.seatunnel.connectors.seatunnel.iotdb.state.IoTDBSourceState; import com.google.common.base.Strings; @@ -133,7 +135,8 @@ private Set getIotDBSplit() { } sqls = sqlBase.split("(?i)" + SQL_WHERE); if (sqls.length > SQL_WHERE_SPLIT_LENGTH) { - throw new IllegalArgumentException("sql should not contain more than one where"); + throw new IotdbConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, + "sql should not contain more than one where"); } if (sqls.length > 1) { sqlBase = sqls[0]; @@ -241,6 +244,7 @@ public void close() { @Override public void handleSplitRequest(int subtaskId) { - throw new UnsupportedOperationException("Unsupported handleSplitRequest: " + subtaskId); + throw new IotdbConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION, + String.format("Unsupported handleSplitRequest: %d", subtaskId)); } }