Skip to content

Commit

Permalink
[Improve][Connector-V2][Iotdb] Unified exception for iotdb source & s…
Browse files Browse the repository at this point in the history
…ink connector (#3557)

* [Improve][Connector-V2][Iotdb] Unified exception for iotdb source & sink connector

* [Improve][Connector-V2][Iotdb] Unified exception for iotdb source & sink connector

* [Improve][Connector-V2][Iotdb] Unified exception for iotdb source & sink connector
  • Loading branch information
wuchunfu authored Nov 26, 2022
1 parent 48ab7c9 commit 7353fed
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 49 deletions.
10 changes: 9 additions & 1 deletion docs/en/connector-v2/Error-Quick-Reference-Manual.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,12 @@ This document records some common error codes and corresponding solutions of Sea
| KUDU-04 | Upsert data to Kudu failed | When users encounter this error code, it means that Kudu has some problems, please check it whether is work |
| KUDU-05 | Insert data to Kudu failed | When users encounter this error code, it means that Kudu has some problems, please check it whether is work |
| KUDU-06 | Initialize the Kudu client failed | When users encounter this error code, it is usually there are some problems with initializing the Kudu client, please check your configuration whether correct and connector is work |
| KUDU-07 | Generate Kudu Parameters in the preparation phase failed | When users encounter this error code, it means that there are some problems on Kudu parameters generation, please check your configuration |
| KUDU-07 | Generate Kudu Parameters in the preparation phase failed | When users encounter this error code, it means that there are some problems on Kudu parameters generation, please check your configuration |

## IotDB Connector Error Codes

| code | description | solution |
|----------|----------------------------------------------------------|--------------------------------------------------------------------------------------------------------------|
| IOTDB-01 | Close IoTDB session failed | When the user encounters this error code, it indicates that closing the session failed. Please check |
| IOTDB-02 | Initialize IoTDB client failed | When the user encounters this error code, it indicates that the client initialization failed. Please check |
| IOTDB-03 | Close IoTDB client failed | When the user encounters this error code, it indicates that closing the client failed. Please check |
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.iotdb.exception;

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;

public enum IotdbConnectorErrorCode implements SeaTunnelErrorCode {

CLOSE_SESSION_FAILED("IOTDB-01", "Close IoTDB session failed"),
INITIALIZE_CLIENT_FAILED("IOTDB-02", "Initialize IoTDB client failed"),
CLOSE_CLIENT_FAILED("IOTDB-03", "Close IoTDB client failed");

private final String code;
private final String description;

IotdbConnectorErrorCode(String code, String description) {
this.code = code;
this.description = description;
}

@Override
public String getCode() {
return this.code;
}

@Override
public String getDescription() {
return this.description;
}

@Override
public String getErrorMessage() {
return SeaTunnelErrorCode.super.getErrorMessage();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.iotdb.exception;

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

public class IotdbConnectorException extends SeaTunnelRuntimeException {

public IotdbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
super(seaTunnelErrorCode, errorMessage);
}

public IotdbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
super(seaTunnelErrorCode, errorMessage, cause);
}

public IotdbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
super(seaTunnelErrorCode, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorException;

import lombok.AllArgsConstructor;
import org.apache.iotdb.tsfile.read.common.Field;
Expand All @@ -43,11 +45,12 @@ private SeaTunnelRow convert(RowRecord rowRecord) {
long timestamp = rowRecord.getTimestamp();
List<Field> fields = rowRecord.getFields();
if (fields.size() != (rowType.getTotalFields() - 1)) {
throw new IllegalStateException("Illegal SeaTunnelRowType: " + rowRecord);
throw new IotdbConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
"Illegal SeaTunnelRowType: " + rowRecord);
}

Object[] seaTunnelFields = new Object[rowType.getTotalFields()];
seaTunnelFields[0] = convertTimestamp(timestamp, rowType.getFieldType(0));
seaTunnelFields[0] = convertTimestamp(timestamp, rowType.getFieldType(0));
for (int i = 1; i < rowType.getTotalFields(); i++) {
Field field = fields.get(i - 1);
if (field == null || field.getDataType() == null) {
Expand All @@ -73,7 +76,8 @@ private Object convert(SeaTunnelDataType<?> seaTunnelFieldType,
case INT:
return int32.intValue();
default:
throw new UnsupportedOperationException("Unsupported data type: " + seaTunnelFieldType);
throw new IotdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unsupported data type: " + seaTunnelFieldType);
}
case INT64:
return field.getLongV();
Expand All @@ -86,7 +90,8 @@ private Object convert(SeaTunnelDataType<?> seaTunnelFieldType,
case BOOLEAN:
return field.getBoolV();
default:
throw new IllegalArgumentException("unknown TSData type: " + field.getDataType());
throw new IotdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unsupported data type: " + field.getDataType());
}
}

Expand All @@ -101,7 +106,8 @@ private Object convertTimestamp(long timestamp,
case BIGINT:
return timestamp;
default:
throw new UnsupportedOperationException("Unsupported data type: " + seaTunnelFieldType);
throw new IotdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unsupported data type: " + seaTunnelFieldType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorException;

import com.google.common.base.Strings;
import lombok.NonNull;
Expand Down Expand Up @@ -78,14 +80,15 @@ private Function<SeaTunnelRow, Long> createTimestampExtractor(SeaTunnelRowType s
case STRING:
return Long.parseLong((String) timestamp);
case TIMESTAMP:
return LocalDateTime.class.cast(timestamp)
return ((LocalDateTime) timestamp)
.atZone(ZoneOffset.UTC)
.toInstant()
.toEpochMilli();
case BIGINT:
return (Long) timestamp;
default:
throw new UnsupportedOperationException("Unsupported data type: " + timestampFieldType);
throw new IotdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unsupported data type: " + timestampFieldType);
}
};
}
Expand Down Expand Up @@ -167,7 +170,8 @@ private static TSDataType convert(SeaTunnelDataType dataType) {
case DOUBLE:
return TSDataType.DOUBLE;
default:
throw new UnsupportedOperationException("Unsupported dataType: " + dataType);
throw new IotdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unsupported data type: " + dataType);
}
}

Expand All @@ -191,7 +195,8 @@ private static Object convert(SeaTunnelDataType seaTunnelType,
case TEXT:
return value.toString();
default:
throw new UnsupportedOperationException("Unsupported dataType: " + tsDataType);
throw new IotdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unsupported data type: " + tsDataType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.USERNAME;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand All @@ -33,6 +34,7 @@
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorException;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

Expand All @@ -53,7 +55,11 @@ public String getPluginName() {
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, NODE_URLS.key(), USERNAME.key(), PASSWORD.key(), KEY_DEVICE.key());
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
throw new IotdbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK,
result.getMsg())
);
}
this.pluginConfig = pluginConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.seatunnel.connectors.seatunnel.iotdb.sink;

import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.iotdb.exception.IotdbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.iotdb.serialize.IoTDBRecord;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -59,9 +62,9 @@ private void tryInit() throws IOException {
}

Session.Builder sessionBuilder = new Session.Builder()
.nodeUrls(sinkConfig.getNodeUrls())
.username(sinkConfig.getUsername())
.password(sinkConfig.getPassword());
.nodeUrls(sinkConfig.getNodeUrls())
.username(sinkConfig.getUsername())
.password(sinkConfig.getPassword());
if (sinkConfig.getThriftDefaultBufferSize() != null) {
sessionBuilder.thriftDefaultBufferSize(sinkConfig.getThriftDefaultBufferSize());
}
Expand All @@ -83,12 +86,13 @@ private void tryInit() throws IOException {
}
} catch (IoTDBConnectionException e) {
log.error("Initialize IoTDB client failed.", e);
throw new IOException(e);
throw new IotdbConnectorException(IotdbConnectorErrorCode.INITIALIZE_CLIENT_FAILED,
"Initialize IoTDB client failed.", e);
}

if (sinkConfig.getBatchIntervalMs() != null) {
scheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("IoTDB-sink-output-%s").build());
new ThreadFactoryBuilder().setNameFormat("IoTDB-sink-output-%s").build());
scheduledFuture = scheduler.scheduleAtFixedRate(
() -> {
try {
Expand All @@ -97,9 +101,9 @@ private void tryInit() throws IOException {
flushException = e;
}
},
sinkConfig.getBatchIntervalMs(),
sinkConfig.getBatchIntervalMs(),
TimeUnit.MILLISECONDS);
sinkConfig.getBatchIntervalMs(),
sinkConfig.getBatchIntervalMs(),
TimeUnit.MILLISECONDS);
}
initialize = true;
}
Expand All @@ -110,7 +114,7 @@ public synchronized void write(IoTDBRecord record) throws IOException {

batchList.add(record);
if (sinkConfig.getBatchSize() > 0
&& batchList.size() >= sinkConfig.getBatchSize()) {
&& batchList.size() >= sinkConfig.getBatchSize()) {
flush();
}
}
Expand All @@ -129,7 +133,8 @@ public synchronized void close() throws IOException {
}
} catch (IoTDBConnectionException e) {
log.error("Close IoTDB client failed.", e);
throw new IOException("Close IoTDB client failed.", e);
throw new IotdbConnectorException(IotdbConnectorErrorCode.CLOSE_CLIENT_FAILED,
"Close IoTDB client failed.", e);
}
}

Expand All @@ -144,30 +149,31 @@ synchronized void flush() throws IOException {
try {
if (batchRecords.getTypesList().isEmpty()) {
session.insertRecords(batchRecords.getDeviceIds(),
batchRecords.getTimestamps(),
batchRecords.getMeasurementsList(),
batchRecords.getStringValuesList());
batchRecords.getTimestamps(),
batchRecords.getMeasurementsList(),
batchRecords.getStringValuesList());
} else {
session.insertRecords(batchRecords.getDeviceIds(),
batchRecords.getTimestamps(),
batchRecords.getMeasurementsList(),
batchRecords.getTypesList(),
batchRecords.getValuesList());
batchRecords.getTimestamps(),
batchRecords.getMeasurementsList(),
batchRecords.getTypesList(),
batchRecords.getValuesList());
}
} catch (IoTDBConnectionException | StatementExecutionException e) {
log.error("Writing records to IoTDB failed, retry times = {}", i, e);
if (i >= sinkConfig.getMaxRetries()) {
throw new IOException("Writing records to IoTDB failed.", e);
throw new IotdbConnectorException(CommonErrorCode.FLUSH_DATA_FAILED,
"Writing records to IoTDB failed.", e);
}

try {
long backoff = Math.min(sinkConfig.getRetryBackoffMultiplierMs() * i,
sinkConfig.getMaxRetryBackoffMs());
sinkConfig.getMaxRetryBackoffMs());
Thread.sleep(backoff);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException(
"Unable to flush; interrupted while doing another attempt.", e);
throw new IotdbConnectorException(CommonErrorCode.FLUSH_DATA_FAILED,
"Unable to flush; interrupted while doing another attempt.", e);
}
}
}
Expand All @@ -177,17 +183,18 @@ synchronized void flush() throws IOException {

private void checkFlushException() {
if (flushException != null) {
throw new RuntimeException("Writing records to IoTDB failed.", flushException);
throw new IotdbConnectorException(CommonErrorCode.FLUSH_DATA_FAILED,
"Writing records to IoTDB failed.", flushException);
}
}

@Getter
private class BatchRecords {
private List<String> deviceIds;
private List<Long> timestamps;
private List<List<String>> measurementsList;
private List<List<TSDataType>> typesList;
private List<List<Object>> valuesList;
private static class BatchRecords {
private final List<String> deviceIds;
private final List<Long> timestamps;
private final List<List<String>> measurementsList;
private final List<List<TSDataType>> typesList;
private final List<List<Object>> valuesList;

public BatchRecords(List<IoTDBRecord> batchList) {
int batchSize = batchList.size();
Expand Down
Loading

0 comments on commit 7353fed

Please sign in to comment.