From ee3a48aa7b43a177369181f1734f75a387148d58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Mon, 17 Oct 2022 11:31:06 +0800 Subject: [PATCH 01/16] [Feature][Connector-V2] Add influxDB connector sink --- .../influxdb/client/InfluxDBClient.java | 14 ++ .../seatunnel/influxdb/config/SinkConfig.java | 92 +++++++++ .../influxdb/serialize/DefaultSerializer.java | 154 +++++++++++++++ .../influxdb/serialize/Serializer.java | 26 +++ .../seatunnel/influxdb/sink/InfluxDBSink.java | 61 ++++++ .../influxdb/sink/InfluxDBSinkWriter.java | 183 ++++++++++++++++++ .../connector-influxdb-e2e/pom.xml | 21 ++ .../e2e/connector/influxdb/InfluxdbIT.java | 181 +++++++++++++++++ .../test/resources/influxdb-to-influxdb.conf | 51 +++++ .../src/test/resources/log4j.properties | 22 +++ .../seatunnel-connector-v2-e2e/pom.xml | 1 + 11 files changed, 806 insertions(+) create mode 100644 seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java create mode 100644 seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java create mode 100644 seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java create mode 100644 seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java create mode 100644 seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java create mode 100644 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/log4j.properties diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java index 8743d5aa2c5..20bf19dde59 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.client; import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig; import lombok.extern.slf4j.Slf4j; import okhttp3.HttpUrl; @@ -75,4 +76,17 @@ public Response intercept(Chain chain) throws IOException { log.info("connect influxdb successful. 
sever version :{}.", version);
         return influxDB;
     }
+
+    /**
+     * Applies sink-specific write settings to an already created client.
+     *
+     * <p>Only the retention policy is configured, and only when {@code rp} is set in the sink config.
+     *
+     * @param influxDB   client to configure
+     * @param sinkConfig sink configuration supplying the retention policy
+     */
+    public static void setWriteProperty(InfluxDB influxDB, SinkConfig sinkConfig) {
+        String rp = sinkConfig.getRp();
+        if (!StringUtils.isEmpty(rp)) {
+            influxDB.setRetentionPolicy(rp);
+        }
+    }
+
+    /**
+     * Creates a client for writing and applies the sink's write settings to it.
+     *
+     * @param sinkConfig sink configuration
+     * @return the configured client
+     * @throws ConnectException if the client cannot be created
+     */
+    public static InfluxDB getWriteClient(SinkConfig sinkConfig) throws ConnectException {
+        InfluxDB influxDB = getInfluxDB(sinkConfig);
+        // Configure the instance that is returned. Calling getInfluxDB() a second time here
+        // would configure a throw-away client and leave it unclosed.
+        setWriteProperty(influxDB, sinkConfig);
+        return influxDB;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
new file mode 100644
index 00000000000..c021c3e73a6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Settings of the InfluxDB sink, read from the plugin configuration on top of the
+ * connection settings handled by {@link InfluxDBConfig}.
+ *
+ * <p>{@code measurement} is mandatory; every other key is optional and keeps the default
+ * declared on its field when absent.
+ */
+@Setter
+@Getter
+@ToString
+public class SinkConfig extends InfluxDBConfig {
+    public SinkConfig(Config config) {
+        super(config);
+    }
+
+    private static final String KEY_TIME = "key_time";
+    private static final String KEY_TAGS = "key_tags";
+    private static final String KEY_MEASUREMENT = "measurement";
+
+    private static final String BATCH_SIZE = "batch_size";
+    private static final String BATCH_INTERVAL_MS = "batch_interval_ms";
+    private static final String MAX_RETRIES = "max_retries";
+    private static final String WRITE_TIMEOUT = "write_timeout";
+    private static final String RETRY_BACKOFF_MULTIPLIER_MS = "retry_backoff_multiplier_ms";
+    private static final String MAX_RETRY_BACKOFF_MS = "max_retry_backoff_ms";
+    private static final String RETENTION_POLICY = "rp";
+    private static final int DEFAULT_BATCH_SIZE = 1024;
+    private static final int DEFAULT_WRITE_TIMEOUT = 5;
+
+    private String rp;
+    private String measurement;
+    private int writeTimeout = DEFAULT_WRITE_TIMEOUT;
+    private String keyTime;
+    // Never null: an absent "key_tags" means "no tag columns", so consumers can iterate safely.
+    private List<String> keyTags = Collections.emptyList();
+    private int batchSize = DEFAULT_BATCH_SIZE;
+    // Null means "no periodic flush"; the writer only starts its scheduler when this is set.
+    private Integer batchIntervalMs;
+    private int maxRetries;
+    private int retryBackoffMultiplierMs;
+    private int maxRetryBackoffMs;
+
+    /**
+     * Builds a {@link SinkConfig} from the plugin configuration.
+     *
+     * @param config plugin configuration; must contain {@code measurement}
+     * @return the populated sink configuration
+     */
+    public static SinkConfig loadConfig(Config config) {
+        SinkConfig sinkConfig = new SinkConfig(config);
+
+        if (config.hasPath(KEY_TIME)) {
+            sinkConfig.setKeyTime(config.getString(KEY_TIME));
+        }
+        if (config.hasPath(KEY_TAGS)) {
+            sinkConfig.setKeyTags(config.getStringList(KEY_TAGS));
+        }
+        // "batch_size" was declared as a key but never read, so the default could not be overridden.
+        if (config.hasPath(BATCH_SIZE)) {
+            sinkConfig.setBatchSize(config.getInt(BATCH_SIZE));
+        }
+        if (config.hasPath(BATCH_INTERVAL_MS)) {
+            sinkConfig.setBatchIntervalMs(config.getInt(BATCH_INTERVAL_MS));
+        }
+        if (config.hasPath(MAX_RETRIES)) {
+            sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES));
+        }
+        if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS)) {
+            sinkConfig.setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS));
+        }
+        if (config.hasPath(MAX_RETRY_BACKOFF_MS)) {
+            sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS));
+        }
+        if (config.hasPath(WRITE_TIMEOUT)) {
+            sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT));
+        }
+        if (config.hasPath(RETENTION_POLICY)) {
+            sinkConfig.setRp(config.getString(RETENTION_POLICY));
+        }
+        // Read unconditionally: the measurement is required.
+        sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT));
+        return sinkConfig;
+    }
+
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java
new file mode 100644
index 00000000000..ac76ef8bc07
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import com.google.common.base.Strings;
+import org.influxdb.dto.Point;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Default {@link Serializer}: maps one {@link SeaTunnelRow} to one InfluxDB {@link Point}.
+ *
+ * <p>The configured timestamp column becomes the point time, the configured tag columns become
+ * tags, and every remaining column becomes a field.
+ */
+public class DefaultSerializer implements Serializer {
+    private SeaTunnelRowType seaTunnelRowType;
+
+    private final BiConsumer<SeaTunnelRow, Point.Builder> timestampExtractor;
+    private final BiConsumer<SeaTunnelRow, Point.Builder> fieldExtractor;
+    private final BiConsumer<SeaTunnelRow, Point.Builder> tagExtractor;
+    private String measurement;
+
+    private TimeUnit precision;
+
+    /**
+     * @param seaTunnelRowType schema of the incoming rows
+     * @param precision        unit of STRING/BIGINT timestamp values; {@code null} falls back to milliseconds
+     * @param tagKeys          columns written as tags; {@code null} is treated as "no tags"
+     * @param timestampKey     column holding the point time; null/empty means "use the current time"
+     * @param measurement      target measurement
+     */
+    public DefaultSerializer(SeaTunnelRowType seaTunnelRowType, TimeUnit precision, List<String> tagKeys,
+                             String timestampKey,
+                             String measurement) {
+        this.measurement = measurement;
+        this.seaTunnelRowType = seaTunnelRowType;
+        // Point.Builder#time needs a unit; callers may pass null, so pick a default up front.
+        // NOTE(review): milliseconds matches the current-time fallback below - confirm it is the wanted default.
+        this.precision = precision == null ? TimeUnit.MILLISECONDS : precision;
+        List<String> safeTagKeys = tagKeys == null ? Collections.<String>emptyList() : tagKeys;
+        this.timestampExtractor = createTimestampExtractor(seaTunnelRowType, timestampKey);
+        this.tagExtractor = createTagExtractor(seaTunnelRowType, safeTagKeys);
+        List<String> fieldKeys = getFieldKeys(seaTunnelRowType, timestampKey, safeTagKeys);
+        this.fieldExtractor = createFieldExtractor(seaTunnelRowType, fieldKeys);
+    }
+
+    @Override
+    public Point serialize(SeaTunnelRow seaTunnelRow) {
+        Point.Builder builder = Point.measurement(measurement);
+        timestampExtractor.accept(seaTunnelRow, builder);
+        tagExtractor.accept(seaTunnelRow, builder);
+        fieldExtractor.accept(seaTunnelRow, builder);
+        return builder.build();
+    }
+
+    /**
+     * Builds the extractor that copies every non-tag, non-timestamp column into the point as a field.
+     * Null values are skipped instead of failing with a NullPointerException.
+     */
+    private BiConsumer<SeaTunnelRow, Point.Builder> createFieldExtractor(SeaTunnelRowType seaTunnelRowType,
+                                                                          List<String> fieldKeys) {
+        return (row, builder) -> {
+            for (String field : fieldKeys) {
+                int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(field);
+                SeaTunnelDataType<?> dataType = seaTunnelRowType.getFieldType(indexOfSeaTunnelRow);
+                Object val = row.getField(indexOfSeaTunnelRow);
+                if (val == null) {
+                    continue;
+                }
+                switch (dataType.getSqlType()) {
+                    case BOOLEAN:
+                        builder.addField(field, (Boolean) val);
+                        break;
+                    case SMALLINT:
+                        builder.addField(field, (Short) val);
+                        break;
+                    case INT:
+                        builder.addField(field, ((Number) val).intValue());
+                        break;
+                    case BIGINT:
+                        // A BIGINT timestamp column never reaches this point (getFieldKeys removes it);
+                        // any other BIGINT column is written as a regular field.
+                        builder.addField(field, ((Number) val).longValue());
+                        break;
+                    case FLOAT:
+                        builder.addField(field, ((Number) val).floatValue());
+                        break;
+                    case DOUBLE:
+                        builder.addField(field, ((Number) val).doubleValue());
+                        break;
+                    case STRING:
+                        builder.addField(field, val.toString());
+                        break;
+                    default:
+                        throw new UnsupportedOperationException("Unsupported dataType: " + dataType);
+                }
+            }
+        };
+    }
+
+    /**
+     * Builds the extractor that sets the point time.
+     *
+     * <p>Without a time column, or when the column value is null, the current wall-clock time is used.
+     */
+    private BiConsumer<SeaTunnelRow, Point.Builder> createTimestampExtractor(SeaTunnelRowType seaTunnelRowType,
+                                                                              String timeKey) {
+        if (Strings.isNullOrEmpty(timeKey)) {
+            // The previous lambda computed the current time and discarded it, leaving the point without a time.
+            return (row, builder) -> builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        }
+
+        int timeFieldIndex = seaTunnelRowType.indexOf(timeKey);
+        return (row, builder) -> {
+            Object time = row.getField(timeFieldIndex);
+            if (time == null) {
+                builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+                // Without this return the switch below would run on a null value.
+                return;
+            }
+            SeaTunnelDataType<?> timestampFieldType = seaTunnelRowType.getFieldType(timeFieldIndex);
+            switch (timestampFieldType.getSqlType()) {
+                case STRING:
+                    builder.time(Long.parseLong((String) time), precision);
+                    break;
+                case TIMESTAMP:
+                    // toEpochMilli() always yields milliseconds, whatever precision is configured.
+                    builder.time(LocalDateTime.class.cast(time)
+                        .atZone(ZoneOffset.UTC)
+                        .toInstant()
+                        .toEpochMilli(), TimeUnit.MILLISECONDS);
+                    break;
+                case BIGINT:
+                    builder.time((Long) time, precision);
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Unsupported data type: " + timestampFieldType);
+            }
+        };
+    }
+
+    /**
+     * Builds the extractor that copies the tag columns into the point. Null tag values are skipped.
+     */
+    private BiConsumer<SeaTunnelRow, Point.Builder> createTagExtractor(SeaTunnelRowType seaTunnelRowType,
+                                                                        List<String> tagKeys) {
+        if (tagKeys == null || tagKeys.isEmpty()) {
+            return (row, builder) -> { };
+        }
+        return (row, builder) -> {
+            for (String tagKey : tagKeys) {
+                int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(tagKey);
+                Object tagValue = row.getField(indexOfSeaTunnelRow);
+                if (tagValue != null) {
+                    builder.tag(tagKey, tagValue.toString());
+                }
+            }
+        };
+    }
+
+    /**
+     * Returns the column names that are neither tags nor the timestamp column, in schema order.
+     */
+    private List<String> getFieldKeys(SeaTunnelRowType seaTunnelRowType,
+                                      String timestampKey,
+                                      List<String> tagKeys) {
+        return Stream.of(seaTunnelRowType.getFieldNames())
+            .filter(name -> tagKeys == null || !tagKeys.contains(name))
+            .filter(name -> !name.equals(timestampKey))
+            .collect(Collectors.toList());
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java
new file mode 100644
index 00000000000..b910efafd40
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.influxdb.dto.Point; + +/** + * Converts a {@link SeaTunnelRow} into an InfluxDB {@link Point}. + */ +public interface Serializer { + /** + * Serializes one row. + * + * @param seaTunnelRow row to convert + * @return the point built from the row + */ + Point serialize(SeaTunnelRow seaTunnelRow); +} diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java new file mode 100644 index 00000000000..de809e86347 --- /dev/null +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.io.IOException;
+
+/**
+ * InfluxDB sink entry point: keeps the plugin configuration and the upstream row type and
+ * hands both to a new {@link InfluxDBSinkWriter}.
+ */
+public class InfluxDBSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+    private Config pluginConfig;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    @Override
+    public String getPluginName() {
+        return "InfluxDB";
+    }
+
+    /**
+     * Stores the plugin configuration; it is parsed later, when the writer is created.
+     */
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        this.pluginConfig = pluginConfig;
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return seaTunnelRowType;
+    }
+
+    /**
+     * Creates the writer; the writer connects to InfluxDB in its constructor.
+     */
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException {
+        return new InfluxDBSinkWriter(pluginConfig, seaTunnelRowType);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
new file mode 100644
index 00000000000..cb8e6f3dddd
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig; +import org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.DefaultSerializer; +import org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.Serializer; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.influxdb.InfluxDB; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; + +import java.io.IOException; +import java.net.ConnectException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class InfluxDBSinkWriter extends AbstractSinkWriter { + 
+    private final Serializer serializer;
+    private InfluxDB influxDB;
+    private SinkConfig sinkConfig;
+    // Points buffered since the last successful flush; only touched from synchronized methods.
+    private final List<Point> batchList;
+
+    private ScheduledExecutorService scheduler;
+    private ScheduledFuture<?> scheduledFuture;
+    private volatile boolean initialize;
+    // Failure raised by the background flush; rethrown on the next write()/flush() call.
+    private volatile Exception flushException;
+
+    /**
+     * Parses the sink configuration, builds the row serializer and connects to InfluxDB.
+     *
+     * @throws ConnectException if the server cannot be reached
+     */
+    public InfluxDBSinkWriter(Config pluginConfig,
+                              SeaTunnelRowType seaTunnelRowType) throws ConnectException {
+        this.sinkConfig = SinkConfig.loadConfig(pluginConfig);
+        // A null precision makes Point.Builder#time fail as soon as a time column is configured.
+        // NOTE(review): milliseconds is an assumed default - confirm against the intended epoch setting.
+        this.serializer = new DefaultSerializer(
+            seaTunnelRowType, TimeUnit.MILLISECONDS, sinkConfig.getKeyTags(), sinkConfig.getKeyTime(),
+            sinkConfig.getMeasurement());
+        connect();
+        this.batchList = new ArrayList<>();
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        Point record = serializer.serialize(element);
+        write(record);
+    }
+
+    @SneakyThrows
+    @Override
+    public Optional<Void> prepareCommit() {
+        // Flush to storage before snapshot state is performed
+        flush();
+        return super.prepareCommit();
+    }
+
+    /**
+     * Stops the periodic flush, writes whatever is still buffered and closes the client.
+     * The client is closed even when the final flush fails.
+     */
+    @Override
+    public void close() throws IOException {
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+            scheduler.shutdown();
+        }
+
+        try {
+            flush();
+        } finally {
+            if (influxDB != null) {
+                influxDB.close();
+                influxDB = null;
+            }
+        }
+    }
+
+    /**
+     * Lazily connects and, when a batch interval is configured, starts the periodic flush task.
+     */
+    private void tryInit() throws IOException {
+        if (initialize) {
+            return;
+        }
+        connect();
+        if (sinkConfig.getBatchIntervalMs() != null) {
+            scheduler = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat("InfluxDB-sink-output-%s").build());
+            scheduledFuture = scheduler.scheduleAtFixedRate(
+                () -> {
+                    try {
+                        flush();
+                    } catch (Exception e) {
+                        // flush() can also throw unchecked exceptions; an exception escaping the task
+                        // would silently cancel all further scheduled runs, so record every failure.
+                        flushException = e;
+                    }
+                },
+                sinkConfig.getBatchIntervalMs(),
+                sinkConfig.getBatchIntervalMs(),
+                TimeUnit.MILLISECONDS);
+        }
+        initialize = true;
+    }
+
+    /**
+     * Buffers one point and flushes once the configured batch size is reached.
+     */
+    public synchronized void write(Point record) throws IOException {
+        tryInit();
+        checkFlushException();
+
+        batchList.add(record);
+        if (sinkConfig.getBatchSize() > 0
+            && batchList.size() >= 
sinkConfig.getBatchSize()) {
+            flush();
+        }
+    }
+
+    /**
+     * Writes all buffered points as one batch, retrying up to {@code max_retries} times.
+     *
+     * <p>The buffer is cleared only after a successful write.
+     *
+     * @throws IOException if every attempt fails or the thread is interrupted while backing off
+     */
+    public synchronized void flush() throws IOException {
+        checkFlushException();
+        if (batchList.isEmpty()) {
+            return;
+        }
+        // Build the batch once; re-adding the points on every attempt would duplicate them.
+        BatchPoints batchPoints = BatchPoints.builder().points(batchList).build();
+        for (int i = 0; i <= sinkConfig.getMaxRetries(); i++) {
+            try {
+                influxDB.write(batchPoints);
+                // Success: stop here, otherwise the batch is written again for every remaining retry slot.
+                break;
+            } catch (Exception e) {
+                log.error("Writing records to InfluxDB failed, retry times = {}", i, e);
+                if (i >= sinkConfig.getMaxRetries()) {
+                    throw new IOException("Writing records to InfluxDB failed.", e);
+                }
+
+                try {
+                    // Multiply as long so a large multiplier cannot overflow int.
+                    long backoff = Math.min((long) sinkConfig.getRetryBackoffMultiplierMs() * i,
+                        sinkConfig.getMaxRetryBackoffMs());
+                    Thread.sleep(backoff);
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException(
+                        "Unable to flush; interrupted while doing another attempt.", e);
+                }
+            }
+        }
+
+        batchList.clear();
+    }
+
+    /**
+     * Rethrows a failure recorded by the background flush task, if there is one.
+     */
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing records to InfluxDB failed.", flushException);
+        }
+    }
+
+    /**
+     * Creates the write client if there is none yet and verifies the server answers a ping.
+     *
+     * @throws ConnectException if the client cannot be created or the ping is not good
+     */
+    public void connect() throws ConnectException {
+        if (influxDB == null) {
+            influxDB = InfluxDBClient.getWriteClient(sinkConfig);
+            String version = influxDB.version();
+            if (!influxDB.ping().isGood()) {
+                String errorMessage =
+                    String.format(
+                        "connect influxdb failed, due to influxdb version info is unknown, the url is: {%s}",
+                        sinkConfig.getUrl());
+                throw new ConnectException(errorMessage);
+            }
+            log.info("connect influxdb successful. 
sever version :{}.", version); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml new file mode 100644 index 00000000000..272097fdcab --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml @@ -0,0 +1,21 @@ + + + org.apache.seatunnel + seatunnel-connector-v2-e2e + ${revision} + + 4.0.0 + + connector-influxdb-e2e + + + + + org.apache.seatunnel + connector-influxdb + ${project.version} + test + + + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java new file mode 100644 index 00000000000..3df9b5af84c --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.connector.influxdb; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import lombok.extern.slf4j.Slf4j; +import org.influxdb.InfluxDB; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import java.io.IOException; +import java.net.ConnectException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import scala.Tuple2; + +@Slf4j +public class InfluxdbIT extends TestSuiteBase implements TestResource { + private static final String IMAGE = "influxdb:1.8"; + private static final String HOST = "influxdb-host"; + private static final int PORT = 8764; + private static final String INFLUXDB_DATABASE = "test"; + private static final String INFLUXDB_MEASUREMENT = "test"; + + private static final String 
INFLUXDB_HOST = "localhost";
+
+    private static final int INFLUXDB_PORT = 8764;
+
+    private static final Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> TEST_DATASET = generateTestDataSet();
+
+    private static final String INFLUXDB_CONNECT_URL = String.format("http://%s:%s", INFLUXDB_HOST, INFLUXDB_PORT);
+
+    private GenericContainer<?> influxdbContainer;
+
+    private InfluxDB influxDB;
+
+    /**
+     * Starts the InfluxDB container, connects a client from the test JVM and seeds the source data.
+     */
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        // NOTE(review): the stock influxdb image serves HTTP on 8086, while PORT/INFLUXDB_PORT are 8764
+        // and the client URL does not use the container's mapped port - confirm the port wiring.
+        this.influxdbContainer = new GenericContainer<>(DockerImageName.parse(IMAGE))
+            .withNetwork(NETWORK)
+            .withNetworkAliases(HOST)
+            .withExposedPorts(PORT)
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
+            .waitingFor(new HostPortWaitStrategy()
+                .withStartupTimeout(Duration.ofMinutes(2)));
+        Startables.deepStart(Stream.of(influxdbContainer)).join();
+        log.info("Influxdb container started");
+        this.initializeInfluxDBClient();
+        this.initSourceData();
+    }
+
+    /**
+     * Creates the test database and writes the generated rows into the source measurement.
+     * Column 0 is the time, column 1 the tag, the remaining columns are fields.
+     */
+    private void initSourceData() {
+        influxDB.createDatabase(INFLUXDB_DATABASE);
+        BatchPoints batchPoints = BatchPoints
+            .database(INFLUXDB_DATABASE)
+            .build();
+        List<SeaTunnelRow> rows = TEST_DATASET._2();
+        SeaTunnelRowType rowType = TEST_DATASET._1();
+
+        for (int i = 0; i < rows.size(); i++) {
+            SeaTunnelRow row = rows.get(i);
+            Point point = Point.measurement(INFLUXDB_MEASUREMENT)
+                // The generated time is epoch milliseconds (Date#getTime), not nanoseconds.
+                .time((Long) row.getField(0), TimeUnit.MILLISECONDS)
+                .tag(rowType.getFieldName(1), (String) row.getField(1))
+                .addField(rowType.getFieldName(2), (String) row.getField(2))
+                .addField(rowType.getFieldName(3), (Double) row.getField(3))
+                .addField(rowType.getFieldName(4), (Long) row.getField(4))
+                .addField(rowType.getFieldName(5), (Float) row.getField(5))
+                .addField(rowType.getFieldName(6), (Integer) row.getField(6))
+                .addField(rowType.getFieldName(7), (Short) row.getField(7))
+                .addField(rowType.getFieldName(8), (Boolean) row.getField(8))
+                .build();
+            batchPoints.point(point);
+        }
+        influxDB.write(batchPoints);
+    }
+
+    /**
+     * Generates the row type and 100 rows used to seed the source measurement.
+     */
+    private static Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> generateTestDataSet() {
+        SeaTunnelRowType rowType = 
new SeaTunnelRowType(
+            new String[]{
+                "time",
+                "label",
+                "c_string",
+                "c_double",
+                "c_bigint",
+                "c_float",
+                "c_int",
+                "c_smallint",
+                "c_boolean"
+            },
+            new SeaTunnelDataType[]{
+                BasicType.LONG_TYPE,
+                BasicType.STRING_TYPE,
+                BasicType.STRING_TYPE,
+                BasicType.DOUBLE_TYPE,
+                BasicType.LONG_TYPE,
+                BasicType.FLOAT_TYPE,
+                BasicType.INT_TYPE,
+                BasicType.SHORT_TYPE,
+                BasicType.BOOLEAN_TYPE
+            }
+        );
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            // Values must follow the column order above: c_int (index 6) holds an Integer and
+            // c_smallint (index 7) a Short, because initSourceData() casts them to exactly those types.
+            SeaTunnelRow row = new SeaTunnelRow(
+                new Object[]{
+                    new Date().getTime(),
+                    String.format("label_%s", i),
+                    String.format("f1_%s", i),
+                    Double.parseDouble("1.1"),
+                    Long.parseLong("1"),
+                    Float.parseFloat("1.1"),
+                    Integer.valueOf(i),
+                    Short.parseShort("1"),
+                    i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE
+                });
+            rows.add(row);
+        }
+        return Tuple2.apply(rowType, rows);
+    }
+
+    /**
+     * Releases the client and the container; tolerates a start-up that failed half-way.
+     */
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (influxDB != null) {
+            influxDB.close();
+        }
+        if (influxdbContainer != null) {
+            influxdbContainer.stop();
+        }
+    }
+
+    /**
+     * Runs the InfluxDB-to-InfluxDB job and expects it to exit successfully.
+     * NOTE(review): the method name looks like a leftover from the Redis e2e test - consider renaming.
+     */
+    @TestTemplate
+    public void testRedis(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/influxdb-to-influxdb.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    /**
+     * Connects the test-side client using the fixed connect URL.
+     */
+    private void initializeInfluxDBClient() throws ConnectException {
+        InfluxDBConfig influxDBConfig = new InfluxDBConfig(INFLUXDB_CONNECT_URL);
+        influxDB = InfluxDBClient.getInfluxDB(influxDBConfig);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf
new file mode 100644
index 00000000000..1c15644f197
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "SeaTunnel" + keys = "key_test*" + data_type = key + } +} + +transform { +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "SeaTunnel" + key = "key_list" + data_type = list + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/log4j.properties new file mode 100644 index 00000000000..db5d9e51220 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index fbc2b214ac9..135b1995a3d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -27,6 +27,7 @@ connector-assert-e2e connector-jdbc-e2e connector-redis-e2e + connector-influxdb-e2e seatunnel-connector-v2-e2e From 59e1956bb8b2acb9f82cf6a5c0ab3812de81e054 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Wed, 19 Oct 2022 21:29:51 +0800 Subject: [PATCH 02/16] [Feature][Connector-V2] Add influxDB connector sink --- plugin-mapping.properties | 3 +- .../influxdb/config/InfluxDBConfig.java | 2 +- .../seatunnel/influxdb/config/SinkConfig.java | 6 ++- .../influxdb/config/TimePrecision.java | 49 +++++++++++++++++++ .../seatunnel/influxdb/sink/InfluxDBSink.java | 14 +++++- .../influxdb/sink/InfluxDBSinkWriter.java | 6 +-- .../e2e/connector/influxdb/InfluxdbIT.java | 12 ++--- .../test/resources/influxdb-to-influxdb.conf | 48 +++++++++++++----- 8 files changed, 113 insertions(+), 27 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java diff 
--git a/plugin-mapping.properties b/plugin-mapping.properties index 0b786891e48..82c9db47cf8 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -130,4 +130,5 @@ seatunnel.sink.Sentry = connector-sentry seatunnel.source.MongoDB = connector-mongodb seatunnel.sink.MongoDB = connector-mongodb seatunnel.source.Iceberg = connector-iceberg -seatunnel.source.influxdb = connector-influxdb \ No newline at end of file +seatunnel.source.influxdb = connector-influxdb +seatunnel.sink.influxdb = connector-influxdb \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java index 9a04e7d4aec..bf5e30e292a 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java @@ -45,7 +45,7 @@ public class InfluxDBConfig implements Serializable { private static final String DEFAULT_FORMAT = "MSGPACK"; - private static final String EPOCH = "epoch"; + protected static final String EPOCH = "epoch"; public static final String DEFAULT_PARTITIONS = "0"; private static final int DEFAULT_QUERY_TIMEOUT_SEC = 3; diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java index c021c3e73a6..1e230698bb8 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java +++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java @@ -35,7 +35,7 @@ public SinkConfig(Config config) { private static final String KEY_TIME = "key_time"; private static final String KEY_TAGS = "key_tags"; - private static final String KEY_MEASUREMENT = "measurement"; + public static final String KEY_MEASUREMENT = "measurement"; private static final String BATCH_SIZE = "batch_size"; private static final String BATCH_INTERVAL_MS = "batch_interval_ms"; @@ -57,6 +57,7 @@ public SinkConfig(Config config) { private int maxRetries; private int retryBackoffMultiplierMs; private int maxRetryBackoffMs; + private TimePrecision precision = TimePrecision.NS; public static SinkConfig loadConfig(Config config) { SinkConfig sinkConfig = new SinkConfig(config); @@ -85,6 +86,9 @@ public static SinkConfig loadConfig(Config config) { if (config.hasPath(RETENTION_POLICY)) { sinkConfig.setRp(config.getString(RETENTION_POLICY)); } + if (config.hasPath(EPOCH)) { + sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH))); + } sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT)); return sinkConfig; } diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java new file mode 100644 index 00000000000..18af2cdd6e0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.config; + +import java.util.concurrent.TimeUnit; + +public enum TimePrecision { + NS("NS", TimeUnit.NANOSECONDS), + U("U", TimeUnit.MICROSECONDS), + MS("MS", TimeUnit.MILLISECONDS), + S("S", TimeUnit.SECONDS), + M("M", TimeUnit.MINUTES), + H("H", TimeUnit.HOURS); + private String desc; + private TimeUnit precision; + + TimePrecision(String desc, TimeUnit precision) { + this.desc = desc; + this.precision = precision; + } + + public TimeUnit getTimeUnit() { + return this.precision; + } + + public static TimePrecision getPrecision(String desc) { + for (TimePrecision timePrecision : TimePrecision.values()) { + if (desc.equals(timePrecision.desc)) { + return timePrecision; + } + } + return TimePrecision.NS; + } +} diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java index de809e86347..e921ccf4047 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java @@ -17,11 +17,17 @@ package 
org.apache.seatunnel.connectors.seatunnel.influxdb.sink; +import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL; +import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT; + import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; @@ -40,8 +46,12 @@ public String getPluginName() { } @Override - public void prepare(Config pluginConfig) throws PrepareFailException { - this.pluginConfig = pluginConfig; + public void prepare(Config config) throws PrepareFailException { + CheckResult result = CheckConfigUtil.checkAllExists(config, URL, KEY_MEASUREMENT); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + } + this.pluginConfig = config; } @Override diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java index cb8e6f3dddd..5df5cf545c5 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java @@ -61,7 +61,7 @@ 
public InfluxDBSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) throws ConnectException { this.sinkConfig = SinkConfig.loadConfig(pluginConfig); this.serializer = new DefaultSerializer( - seaTunnelRowType, null, sinkConfig.getKeyTags(), sinkConfig.getKeyTime(), sinkConfig.getMeasurement()); + seaTunnelRowType, sinkConfig.getPrecision().getTimeUnit(), sinkConfig.getKeyTags(), sinkConfig.getKeyTime(), sinkConfig.getMeasurement()); connect(); this.batchList = new ArrayList<>(); } @@ -102,7 +102,7 @@ private void tryInit() throws IOException { connect(); if (sinkConfig.getBatchIntervalMs() != null) { scheduler = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("IoTDB-sink-output-%s").build()); + new ThreadFactoryBuilder().setNameFormat("influxDB-sink-output-%s").build()); scheduledFuture = scheduler.scheduleAtFixedRate( () -> { try { @@ -140,7 +140,7 @@ public synchronized void flush() throws IOException { batchPoints.points(batchList); influxDB.write(batchPoints.build()); } catch (Exception e) { - log.error("Writing records to InfluxDB failed, retry times = {}", i, e); + log.error("Writing records to influxdb failed, retry times = {}", i, e); if (i >= sinkConfig.getMaxRetries()) { throw new IOException("Writing records to InfluxDB failed.", e); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java index 3df9b5af84c..388f2bba298 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -58,19 +58,16 @@ public class InfluxdbIT extends 
TestSuiteBase implements TestResource { private static final String IMAGE = "influxdb:1.8"; private static final String HOST = "influxdb-host"; - private static final int PORT = 8764; + private static final int PORT = 8086; private static final String INFLUXDB_DATABASE = "test"; private static final String INFLUXDB_MEASUREMENT = "test"; - private static final String INFLUXDB_HOST = "localhost"; - - private static final int INFLUXDB_PORT = 8764; private static final Tuple2> TEST_DATASET = generateTestDataSet(); - private static final String INFLUXDB_CONNECT_URL = String.format("http://%s:%s", INFLUXDB_HOST, INFLUXDB_PORT); private GenericContainer influxdbContainer; + private String influxDBConnectUrl; private InfluxDB influxDB; @@ -85,6 +82,7 @@ public void startUp() throws Exception { .waitingFor(new HostPortWaitStrategy() .withStartupTimeout(Duration.ofMinutes(2))); Startables.deepStart(Stream.of(influxdbContainer)).join(); + influxDBConnectUrl = String.format("http://%s:%s", influxdbContainer.getHost(), influxdbContainer.getFirstMappedPort()); log.info("Influxdb container started"); this.initializeInfluxDBClient(); this.initSourceData(); @@ -152,8 +150,8 @@ private static Tuple2> generateTestDataSet( Double.parseDouble("1.1"), Long.parseLong("1"), Float.parseFloat("1.1"), - Short.parseShort("1"), Integer.valueOf(i), + Short.parseShort("1"), i % 2 == 0 ? 
Boolean.TRUE : Boolean.FALSE }); rows.add(row); @@ -175,7 +173,7 @@ public void testRedis(TestContainer container) throws IOException, InterruptedEx } private void initializeInfluxDBClient() throws ConnectException { - InfluxDBConfig influxDBConfig = new InfluxDBConfig(INFLUXDB_CONNECT_URL); + InfluxDBConfig influxDBConfig = new InfluxDBConfig(influxDBConnectUrl); influxDB = InfluxDBClient.getInfluxDB(influxDBConfig); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf index 1c15644f197..f883fcada2c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf @@ -28,12 +28,24 @@ env { } source { - Redis { - host = "redis-e2e" - port = 6379 - auth = "SeaTunnel" - keys = "key_test*" - data_type = key + InfluxDB { + url = "http://influxdb-host:8086" + sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test" + database = "test" + upper_bound = 100 + lower_bound = 1 + partition_num = 4 + split_column = "f5" + fields { + label = STRING + f1 = STRING + f2 = DOUBLE + f3 = BIGINT + f4 = FLOAT + f5 = INT + f6 = SMALLINT + f7 = BOOLEAN + } } } @@ -41,11 +53,23 @@ transform { } sink { - Redis { - host = "redis-e2e" - port = 6379 - auth = "SeaTunnel" - key = "key_list" - data_type = list + InfluxDB { + url = "http://influxdb-host:8086" + sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test" + database = "test" + upper_bound = 100 + lower_bound = 1 + partition_num = 4 + split_column = "f5" + fields { + label = STRING + f1 = STRING + f2 = DOUBLE + f3 = BIGINT + f4 = FLOAT + f5 = INT + f6 = SMALLINT + f7 = BOOLEAN + } } } \ No newline at end of file From 840fcc65799dba0629bbf3a816eafa10effbcc14 Mon Sep 17 00:00:00 2001 
From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Mon, 24 Oct 2022 13:14:30 +0800 Subject: [PATCH 03/16] [Feature][Connector-V2] Add influxDB connector sink --- docs/en/connector-v2/sink/InfluxDB.md | 116 ++++++++++++++++++ plugin-mapping.properties | 2 +- .../influxdb/client/InfluxDBClient.java | 1 + .../influxdb/config/InfluxDBConfig.java | 34 ----- .../influxdb/config/SourceConfig.java | 67 ++++++++++ .../influxdb/serialize/DefaultSerializer.java | 6 +- .../seatunnel/influxdb/sink/InfluxDBSink.java | 4 + .../influxdb/sink/InfluxDBSinkWriter.java | 2 +- .../influxdb/source/InfluxDBSource.java | 23 ++-- .../source/InfluxDBSourceSplitEnumerator.java | 12 +- .../e2e/connector/influxdb/InfluxdbIT.java | 27 +++- .../test/resources/influxdb-to-influxdb.conf | 33 ++--- 12 files changed, 244 insertions(+), 83 deletions(-) create mode 100644 docs/en/connector-v2/sink/InfluxDB.md create mode 100644 seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java diff --git a/docs/en/connector-v2/sink/InfluxDB.md b/docs/en/connector-v2/sink/InfluxDB.md new file mode 100644 index 00000000000..04e91171ce6 --- /dev/null +++ b/docs/en/connector-v2/sink/InfluxDB.md @@ -0,0 +1,116 @@ +# InfluxDB + +> InfluxDB sink connector + +## Description + +Write data to InfluxDB. 
+ +## Key features + +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [schema projection](../../concept/connector-v2-features.md) + + +## Options + +| name | type | required | default value | +|-----------------------------|----------|----------|-----------------------------| +| url | string | yes | - | +| fields | config | yes | - | +| database | string | yes | | +| measurement | string | yes | | +| username | string | no | - | +| password | string | no | - | +| key_time | string | no | processing | +| key_tags | array | no | exclude `field` & `key_time` | +| batch_size | int | no | 1024 | +| batch_interval_ms | int | no | - | +| max_retries | int | no | - | +| retry_backoff_multiplier_ms | int | no | - | +| connect_timeout_ms | long | no | 15000 | + +### url [string] +The URL used to connect to InfluxDB, e.g. +``` +http://influxdb-host:8086 +``` + +### sql [string] +The SQL query used to search data + +``` +select name,age from test +``` + +### fields [config] + +The fields of the InfluxDB when you select + +The field type is the SeaTunnel field type `org.apache.seatunnel.api.table.type.SqlType` + +e.g. + +``` +fields{ + name=STRING + age=INT + } +``` + +### database [string] + +The `influxDB` database + +### username [string] + +The username used to connect to `influxDB` + +### password [string] + +The password used to connect to `influxDB` + +### key_time [string] + +Specifies the name of the SeaTunnelRow field used as the `influxDB` timestamp. If not specified, the processing time is used as the timestamp + +### key_tags [array] + +Specifies the names of the SeaTunnelRow fields used as `influxDB` measurement tags. 
+If not specified, all fields are written as `influxDB` measurement fields + +### batch_size [int] + +For batch writing, when the number of buffered records reaches `batch_size` or the elapsed time reaches `batch_interval_ms`, the data will be flushed into the influxDB + +### batch_interval_ms [int] + +For batch writing, when the number of buffered records reaches `batch_size` or the elapsed time reaches `batch_interval_ms`, the data will be flushed into the influxDB + +### max_retries [int] + +The number of times to retry a failed flush + +### retry_backoff_multiplier_ms [int] + +Used as a multiplier for generating the next backoff delay + +### max_retry_backoff_ms [int] + +The amount of time to wait before attempting to retry a request to `influxDB` + +### connect_timeout_ms [long] +The timeout for connecting to InfluxDB, in milliseconds + +## Examples +```hocon +sink { + InfluxDB { + url = "http://influxdb-host:8086" + database = "test" + measurement = "sink" + batch_size = 1 + } +} + +``` \ No newline at end of file diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 82c9db47cf8..6980413c980 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -131,4 +131,4 @@ seatunnel.source.MongoDB = connector-mongodb seatunnel.sink.MongoDB = connector-mongodb seatunnel.source.Iceberg = connector-iceberg seatunnel.source.influxdb = connector-influxdb -seatunnel.sink.influxdb = connector-influxdb \ No newline at end of file +seatunnel.sink.InfluxDB = connector-influxdb \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java index 20bf19dde59..3ad3a99d53b 100644 --- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java @@ -86,6 +86,7 @@ public static void setWriteProperty(InfluxDB influxDB, SinkConfig sinkConfig) { public static InfluxDB getWriteClient(SinkConfig sinkConfig) throws ConnectException { InfluxDB influxDB = getInfluxDB(sinkConfig); + influxDB.setDatabase(sinkConfig.getDatabase()); setWriteProperty(getInfluxDB(sinkConfig), sinkConfig); return influxDB; } diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java index bf5e30e292a..1332c5cab57 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java @@ -23,7 +23,6 @@ import lombok.Data; import java.io.Serializable; -import java.util.List; @Data public class InfluxDBConfig implements Serializable { @@ -33,34 +32,16 @@ public class InfluxDBConfig implements Serializable { public static final String URL = "url"; private static final String CONNECT_TIMEOUT_MS = "connect_timeout_ms"; private static final String QUERY_TIMEOUT_SEC = "query_timeout_sec"; - - public static final String SQL = "sql"; - public static final String SQL_WHERE = "where"; - public static final String DATABASES = "database"; - public static final String SPLIT_COLUMN = "split_column"; - private static final String PARTITION_NUM = "partition_num"; - private static final String UPPER_BOUND = "upper_bound"; - private static final 
String LOWER_BOUND = "lower_bound"; - - private static final String DEFAULT_FORMAT = "MSGPACK"; protected static final String EPOCH = "epoch"; - - public static final String DEFAULT_PARTITIONS = "0"; private static final int DEFAULT_QUERY_TIMEOUT_SEC = 3; private static final long DEFAULT_CONNECT_TIMEOUT_MS = 15000; - private static final String DEFAULT_EPOCH = "n"; private String url; private String username; private String password; - private String sql; - private int partitionNum = 0; - private String splitKey; - private long lowerBound; - private long upperBound; private String database; private String format = DEFAULT_FORMAT; @@ -69,11 +50,8 @@ public class InfluxDBConfig implements Serializable { private String epoch = DEFAULT_EPOCH; - List columnsIndex; - public InfluxDBConfig(Config config) { this.url = config.getString(URL); - this.sql = config.getString(SQL); if (config.hasPath(USERNAME)) { this.username = config.getString(USERNAME); @@ -81,18 +59,6 @@ public InfluxDBConfig(Config config) { if (config.hasPath(PASSWORD)) { this.password = config.getString(PASSWORD); } - if (config.hasPath(PARTITION_NUM)) { - this.partitionNum = config.getInt(PARTITION_NUM); - } - if (config.hasPath(UPPER_BOUND)) { - this.upperBound = config.getInt(UPPER_BOUND); - } - if (config.hasPath(LOWER_BOUND)) { - this.lowerBound = config.getInt(LOWER_BOUND); - } - if (config.hasPath(SPLIT_COLUMN)) { - this.splitKey = config.getString(SPLIT_COLUMN); - } if (config.hasPath(DATABASES)) { this.database = config.getString(DATABASES); } diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java new file mode 100644 index 00000000000..d0c3fe65e2e --- /dev/null +++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.Getter; + +import java.util.List; + +@Getter +public class SourceConfig extends InfluxDBConfig{ + public static final String SQL = "sql"; + public static final String SQL_WHERE = "where"; + public static final String SPLIT_COLUMN = "split_column"; + private static final String PARTITION_NUM = "partition_num"; + private static final String UPPER_BOUND = "upper_bound"; + private static final String LOWER_BOUND = "lower_bound"; + public static final String DEFAULT_PARTITIONS = "0"; + private String sql; + private int partitionNum = 0; + private String splitKey; + private long lowerBound; + private long upperBound; + + List columnsIndex; + + public SourceConfig(Config config) { + super(config); + } + + public static SourceConfig loadConfig(Config config) { + SourceConfig sourceConfig = new SourceConfig(config); + + sourceConfig.sql = config.getString(SQL); + + if 
(config.hasPath(PARTITION_NUM)) { + sourceConfig.partitionNum = config.getInt(PARTITION_NUM); + } + if (config.hasPath(UPPER_BOUND)) { + sourceConfig.upperBound = config.getInt(UPPER_BOUND); + } + if (config.hasPath(LOWER_BOUND)) { + sourceConfig.lowerBound = config.getInt(LOWER_BOUND); + } + if (config.hasPath(SPLIT_COLUMN)) { + sourceConfig.splitKey = config.getString(SPLIT_COLUMN); + } + return sourceConfig; + } + +} diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java index ac76ef8bc07..028208b1b63 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java @@ -22,6 +22,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import com.google.common.base.Strings; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.influxdb.dto.Point; import java.time.LocalDateTime; @@ -147,8 +149,8 @@ private List getFieldKeys(SeaTunnelRowType seaTunnelRowType, String timestampKey, List tagKeys) { return Stream.of(seaTunnelRowType.getFieldNames()) - .filter(name -> !tagKeys.contains(name)) - .filter(name -> !name.equals(timestampKey)) + .filter(name -> CollectionUtils.isEmpty(tagKeys) || !tagKeys.contains(name)) + .filter(name -> StringUtils.isEmpty(timestampKey) || !name.equals(timestampKey)) .collect(Collectors.toList()); } } diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java index e921ccf4047..8d3eb7290ff 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java @@ -21,6 +21,7 @@ import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT; import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -33,8 +34,11 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import com.google.auto.service.AutoService; + import java.io.IOException; +@AutoService(SeaTunnelSink.class) public class InfluxDBSink extends AbstractSimpleSink { private Config pluginConfig; diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java index 5df5cf545c5..1c18d31882b 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java @@ -134,7 +134,7 @@ public synchronized void flush() throws IOException { if (batchList.isEmpty()) { return; } - BatchPoints.Builder batchPoints = BatchPoints.builder(); + BatchPoints.Builder batchPoints = BatchPoints.database(sinkConfig.getDatabase()); for (int i = 
0; i <= sinkConfig.getMaxRetries(); i++) { try { batchPoints.points(batchList); diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java index bc971476bda..804e804f560 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java @@ -17,8 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.source; -import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL; -import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL; +import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL; import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.source.Boundedness; @@ -33,7 +32,7 @@ import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient; -import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig; import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -53,7 +52,7 @@ @AutoService(SeaTunnelSource.class) public class InfluxDBSource implements SeaTunnelSource { private SeaTunnelRowType typeInfo; - private InfluxDBConfig influxDBConfig; + private SourceConfig sourceConfig; private List columnsIndexList; @@ -66,15 +65,15 @@ public String 
getPluginName() { @Override public void prepare(Config config) throws PrepareFailException { - CheckResult result = CheckConfigUtil.checkAllExists(config, URL, SQL); + CheckResult result = CheckConfigUtil.checkAllExists(config, SQL); if (!result.isSuccess()) { throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); } try { - this.influxDBConfig = new InfluxDBConfig(config); + this.sourceConfig = SourceConfig.loadConfig(config); SeaTunnelSchema seatunnelSchema = SeaTunnelSchema.buildWithConfig(config); this.typeInfo = seatunnelSchema.getSeaTunnelRowType(); - this.columnsIndexList = initColumnsIndex(InfluxDBClient.getInfluxDB(influxDBConfig)); + this.columnsIndexList = initColumnsIndex(InfluxDBClient.getInfluxDB(sourceConfig)); } catch (Exception e) { throw new PrepareFailException("InfluxDB", PluginType.SOURCE, e.toString()); } @@ -92,26 +91,26 @@ public SeaTunnelDataType getProducedType() { @Override public SourceReader createReader(SourceReader.Context readerContext) throws Exception { - return new InfluxdbSourceReader(influxDBConfig, readerContext, typeInfo, columnsIndexList); + return new InfluxdbSourceReader(sourceConfig, readerContext, typeInfo, columnsIndexList); } @Override public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws Exception { - return new InfluxDBSourceSplitEnumerator(enumeratorContext, influxDBConfig); + return new InfluxDBSourceSplitEnumerator(enumeratorContext, sourceConfig); } @Override public SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context enumeratorContext, InfluxDBSourceState checkpointState) throws Exception { - return new InfluxDBSourceSplitEnumerator(enumeratorContext, checkpointState, influxDBConfig); + return new InfluxDBSourceSplitEnumerator(enumeratorContext, checkpointState, sourceConfig); } private List initColumnsIndex(InfluxDB influxDB) { //query one row to get column info - String query = influxDBConfig.getSql() + QUERY_LIMIT; + 
String query = sourceConfig.getSql() + QUERY_LIMIT; List fieldNames = new ArrayList<>(); try { QueryResult queryResult = influxDB.query( - new Query(query, influxDBConfig.getDatabase())); + new Query(query, sourceConfig.getDatabase())); List serieList = queryResult.getResults().get(0).getSeries(); fieldNames.addAll(serieList.get(0).getColumns()); diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java index d22eba1166e..139a6e3ad5c 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java @@ -17,10 +17,10 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.source; -import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL_WHERE; +import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL_WHERE; import org.apache.seatunnel.api.source.SourceSplitEnumerator; -import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig; import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState; import lombok.extern.slf4j.Slf4j; @@ -37,17 +37,17 @@ @Slf4j public class InfluxDBSourceSplitEnumerator implements SourceSplitEnumerator { - final InfluxDBConfig config; + final SourceConfig config; private final Context context; private final Map> pendingSplit; private final Object stateLock = new Object(); private volatile boolean shouldEnumerate; - public 
InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context context, InfluxDBConfig config) { + public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context context, SourceConfig config) { this(context, null, config); } - public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context context, InfluxDBSourceState sourceState, InfluxDBConfig config) { + public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context context, InfluxDBSourceState sourceState, SourceConfig config) { this.context = context; this.config = config; this.pendingSplit = new HashMap<>(); @@ -113,7 +113,7 @@ private Set getInfluxDBSplit() { Set influxDBSourceSplits = new HashSet<>(); // no need numPartitions, use one partition if (config.getPartitionNum() == 0) { - influxDBSourceSplits.add(new InfluxDBSourceSplit(InfluxDBConfig.DEFAULT_PARTITIONS, sql)); + influxDBSourceSplits.add(new InfluxDBSourceSplit(SourceConfig.DEFAULT_PARTITIONS, sql)); return influxDBSourceSplits; } //calculate numRange base on (lowerBound upperBound partitionNum) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java index 388f2bba298..b817470b70f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -28,9 +28,12 @@ import org.apache.seatunnel.e2e.common.container.TestContainer; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import org.influxdb.InfluxDB; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; 
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -60,12 +63,12 @@ public class InfluxdbIT extends TestSuiteBase implements TestResource { private static final String HOST = "influxdb-host"; private static final int PORT = 8086; private static final String INFLUXDB_DATABASE = "test"; - private static final String INFLUXDB_MEASUREMENT = "test"; + private static final String INFLUXDB_SOURCE_MEASUREMENT = "source"; + private static final String INFLUXDB_SINK_MEASUREMENT = "sink"; private static final Tuple2> TEST_DATASET = generateTestDataSet(); - private GenericContainer influxdbContainer; private String influxDBConnectUrl; @@ -98,7 +101,7 @@ private void initSourceData() { for (int i = 0; i < rows.size(); i++) { SeaTunnelRow row = rows.get(i); - Point point = Point.measurement(INFLUXDB_MEASUREMENT) + Point point = Point.measurement(INFLUXDB_SOURCE_MEASUREMENT) .time((Long) row.getField(0), TimeUnit.NANOSECONDS) .tag(rowType.getFieldName(1), (String) row.getField(1)) .addField(rowType.getFieldName(2), (String) row.getField(2)) @@ -167,9 +170,25 @@ public void tearDown() throws Exception { } @TestTemplate - public void testRedis(TestContainer container) throws IOException, InterruptedException { + public void testInfluxdb(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = container.executeJob("/influxdb-to-influxdb.conf"); Assertions.assertEquals(0, execResult.getExitCode()); + String sourceSql = String.format("select * from %s", INFLUXDB_SOURCE_MEASUREMENT); + String sinkSql = String.format("select * from %s", INFLUXDB_SINK_MEASUREMENT); + QueryResult sourceQueryResult = influxDB.query(new Query(sourceSql, INFLUXDB_DATABASE)); + QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql, INFLUXDB_DATABASE)); + Assertions.assertEquals(sourceQueryResult.getResults().size(), sinkQueryResult.getResults().size()); + for (QueryResult.Result result 
: sourceQueryResult.getResults()) { + List serieList = result.getSeries(); + if (CollectionUtils.isNotEmpty(serieList)) { + for (QueryResult.Series series : serieList) { + for (List values : series.getValues()) { + + } + } + } + } + } private void initializeInfluxDBClient() throws ConnectException { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf index f883fcada2c..6ad0bbc2d3a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf @@ -30,7 +30,7 @@ env { source { InfluxDB { url = "http://influxdb-host:8086" - sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test" + sql = "select label, c_string, c_double, c_bigint, c_float, c_int, c_smallint, c_boolean from source" database = "test" upper_bound = 100 lower_bound = 1 @@ -38,13 +38,13 @@ source { split_column = "f5" fields { label = STRING - f1 = STRING - f2 = DOUBLE - f3 = BIGINT - f4 = FLOAT - f5 = INT - f6 = SMALLINT - f7 = BOOLEAN + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN } } } @@ -55,21 +55,8 @@ transform { sink { InfluxDB { url = "http://influxdb-host:8086" - sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test" database = "test" - upper_bound = 100 - lower_bound = 1 - partition_num = 4 - split_column = "f5" - fields { - label = STRING - f1 = STRING - f2 = DOUBLE - f3 = BIGINT - f4 = FLOAT - f5 = INT - f6 = SMALLINT - f7 = BOOLEAN - } + measurement = "sink" + batch_size = 1 } } \ No newline at end of file From 294f72141bab92c0e3e0357d4626f92b9572dded Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Mon, 24 Oct 2022 16:31:58 +0800 
Subject: [PATCH 04/16] fix doc style --- docs/en/connector-v2/sink/InfluxDB.md | 34 ++++++--------------------- 1 file changed, 7 insertions(+), 27 deletions(-) diff --git a/docs/en/connector-v2/sink/InfluxDB.md b/docs/en/connector-v2/sink/InfluxDB.md index 04e91171ce6..1fd577efd95 100644 --- a/docs/en/connector-v2/sink/InfluxDB.md +++ b/docs/en/connector-v2/sink/InfluxDB.md @@ -11,18 +11,16 @@ Write data to InfluxDB. - [ ] [exactly-once](../../concept/connector-v2-features.md) - [ ] [schema projection](../../concept/connector-v2-features.md) - ## Options | name | type | required | default value | |-----------------------------|----------|----------|-----------------------------| | url | string | yes | - | -| fields | config | yes | - | | database | string | yes | | | measurement | string | yes | | | username | string | no | - | | password | string | no | - | -| keyTime | string | yes | processing | +| keyTime | string | yes | processing time | | keyTags | array | no | exclude `field` & `keyTime` | | batch_size | int | no | 1024 | | batch_interval_ms | int | no | - | @@ -36,39 +34,21 @@ the url to connect to influxDB e.g. http://influxdb-host:8086 ``` -### sql [string] -The query sql used to search data - -``` -select name,age from test -``` - -### fields [string] - -the fields of the InfluxDB when you select - -the field type is SeaTunnel field type `org.apache.seatunnel.api.table.type.SqlType` - -e.g. 
+### database [string] -``` -fields{ - name=STRING - age=INT - } -``` +The name of `influxDB` database -### database [string] +### measurement [string] -The `influxDB` database +The name of `influxDB` measurement ### username [string] -the username of the influxDB when you select +`influxDB` user username ### password [string] -the password of the influxDB when you select +`influxDB` user password ### keyTime [string] From 5405e4f698e37d7a41f61db58ea8af2bb9566fe9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Mon, 24 Oct 2022 19:39:27 +0800 Subject: [PATCH 05/16] [Feature][Connector-V2] Add influxDB connector sink --- docs/en/connector-v2/sink/InfluxDB.md | 36 ++++++++++--------- .../seatunnel/influxdb/config/SinkConfig.java | 3 +- .../influxdb/serialize/DefaultSerializer.java | 8 ++++- .../influxdb/sink/InfluxDBSinkWriter.java | 13 ++++--- seatunnel-dist/pom.xml | 6 ++++ .../connector-influxdb-e2e/pom.xml | 6 ++++ .../e2e/connector/influxdb/InfluxdbIT.java | 24 ++++++++----- .../test/resources/influxdb-to-influxdb.conf | 12 +++---- 8 files changed, 67 insertions(+), 41 deletions(-) diff --git a/docs/en/connector-v2/sink/InfluxDB.md b/docs/en/connector-v2/sink/InfluxDB.md index 1fd577efd95..d33fcff6fa4 100644 --- a/docs/en/connector-v2/sink/InfluxDB.md +++ b/docs/en/connector-v2/sink/InfluxDB.md @@ -13,20 +13,20 @@ Write data to InfluxDB. 
## Options -| name | type | required | default value | -|-----------------------------|----------|----------|-----------------------------| -| url | string | yes | - | -| database | string | yes | | -| measurement | string | yes | | -| username | string | no | - | -| password | string | no | - | -| keyTime | string | yes | processing time | -| keyTags | array | no | exclude `field` & `keyTime` | -| batch_size | int | no | 1024 | -| batch_interval_ms | int | no | - | -| max_retries | int | no | - | -| retry_backoff_multiplier_ms | int | no | - | -| connect_timeout_ms | long | no | 15000 | +| name | type | required | default value | +|-----------------------------|----------|----------|-------------------------------| +| url | string | yes | - | +| database | string | yes | | +| measurement | string | yes | | +| username | string | no | - | +| password | string | no | - | +| key_time | string | yes | processing time | +| key_tags | array | no | exclude `field` & `key_time` | +| batch_size | int | no | 1024 | +| batch_interval_ms | int | no | - | +| max_retries | int | no | - | +| retry_backoff_multiplier_ms | int | no | - | +| connect_timeout_ms | long | no | 15000 | ### url the url to connect to influxDB e.g. @@ -50,11 +50,11 @@ The name of `influxDB` measurement `influxDB` user password -### keyTime [string] +### key_time [string] -Specify field-name of the `influxDB` timestamp in SeaTunnelRow. If not specified, use processing-time as timestamp +Specify field-name of the `influxDB` measurement timestamp in SeaTunnelRow. If not specified, use processing-time as timestamp -### keyTags [array] +### key_tags [array] Specify field-name of the `influxDB` measurement tags in SeaTunnelRow. 
If not specified, include all fields with `influxDB` measurement field @@ -89,6 +89,8 @@ sink { url = "http://influxdb-host:8086" database = "test" measurement = "sink" + key_time = "time" + key_tags = ["label"] batch_size = 1 } } diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java index 1e230698bb8..c97d807fabf 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java @@ -46,6 +46,7 @@ public SinkConfig(Config config) { private static final String RETENTION_POLICY = "rp"; private static final int DEFAULT_BATCH_SIZE = 1024; private static final int DEFAULT_WRITE_TIMEOUT = 5; + private static final TimePrecision DEFAULT_TIME_PRECISION = TimePrecision.NS; private String rp; private String measurement; @@ -57,7 +58,7 @@ public SinkConfig(Config config) { private int maxRetries; private int retryBackoffMultiplierMs; private int maxRetryBackoffMs; - private TimePrecision precision = TimePrecision.NS; + private TimePrecision precision = DEFAULT_TIME_PRECISION; public static SinkConfig loadConfig(Config config) { SinkConfig sinkConfig = new SinkConfig(config); diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java index 028208b1b63..8cc458939de 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java 
+++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java @@ -104,8 +104,9 @@ private BiConsumer createFieldExtractor(SeaTunnelRo private BiConsumer createTimestampExtractor(SeaTunnelRowType seaTunnelRowType, String timeKey) { + //not config timeKey, use processing time if (Strings.isNullOrEmpty(timeKey)) { - return (row, builder) -> System.currentTimeMillis(); + return (row, builder) -> builder.time(System.currentTimeMillis(), precision); } int timeFieldIndex = seaTunnelRowType.indexOf(timeKey); @@ -136,6 +137,11 @@ private BiConsumer createTimestampExtractor(SeaTunn private BiConsumer createTagExtractor(SeaTunnelRowType seaTunnelRowType, List tagKeys) { + //not config tagKeys + if (CollectionUtils.isEmpty(tagKeys)) { + return (row, builder) -> {}; + } + return (row, builder) -> { for (int i = 0; i < tagKeys.size(); i++) { String tagKey = tagKeys.get(i); diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java index 1c18d31882b..a7e41a6d937 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java @@ -56,14 +56,17 @@ public class InfluxDBSinkWriter extends AbstractSinkWriter { private ScheduledFuture scheduledFuture; private volatile boolean initialize; private volatile Exception flushException; + private final Integer batchIntervalMs; public InfluxDBSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) throws ConnectException { this.sinkConfig = SinkConfig.loadConfig(pluginConfig); + this.batchIntervalMs 
= sinkConfig.getBatchIntervalMs(); this.serializer = new DefaultSerializer( seaTunnelRowType, sinkConfig.getPrecision().getTimeUnit(), sinkConfig.getKeyTags(), sinkConfig.getKeyTime(), sinkConfig.getMeasurement()); - connect(); this.batchList = new ArrayList<>(); + + connect(); } @Override @@ -99,8 +102,9 @@ private void tryInit() throws IOException { if (initialize) { return; } + initialize = true; connect(); - if (sinkConfig.getBatchIntervalMs() != null) { + if (batchIntervalMs != null) { scheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("influxDB-sink-output-%s").build()); scheduledFuture = scheduler.scheduleAtFixedRate( @@ -111,11 +115,10 @@ private void tryInit() throws IOException { flushException = e; } }, - sinkConfig.getBatchIntervalMs(), - sinkConfig.getBatchIntervalMs(), + batchIntervalMs, + batchIntervalMs, TimeUnit.MILLISECONDS); } - initialize = true; } public synchronized void write(Point record) throws IOException { diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index a0d0350d3eb..5ede53eb2ae 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -272,6 +272,12 @@ ${project.version} provided + + org.apache.seatunnel + connector-influxdb + ${project.version} + provided + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml index 272097fdcab..c5c3d3ddae3 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml @@ -17,5 +17,11 @@ ${project.version} test + + org.apache.seatunnel + connector-console + ${project.version} + test + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java index b817470b70f..0b7a59dbb0f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -43,15 +43,19 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.apache.commons.io.IOUtils; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerLoggerFactory; import java.io.IOException; +import java.io.InputStream; import java.net.ConnectException; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -173,22 +177,24 @@ public void tearDown() throws Exception { public void testInfluxdb(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = container.executeJob("/influxdb-to-influxdb.conf"); Assertions.assertEquals(0, execResult.getExitCode()); - String sourceSql = String.format("select * from %s", INFLUXDB_SOURCE_MEASUREMENT); - String sinkSql = String.format("select * from %s", INFLUXDB_SINK_MEASUREMENT); + String sourceSql = String.format("select * from %s order by time", INFLUXDB_SOURCE_MEASUREMENT); + String sinkSql = String.format("select * from %s order by time", INFLUXDB_SINK_MEASUREMENT); QueryResult sourceQueryResult = influxDB.query(new Query(sourceSql, INFLUXDB_DATABASE)); QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql, 
INFLUXDB_DATABASE)); + //assert data count Assertions.assertEquals(sourceQueryResult.getResults().size(), sinkQueryResult.getResults().size()); - for (QueryResult.Result result : sourceQueryResult.getResults()) { - List serieList = result.getSeries(); - if (CollectionUtils.isNotEmpty(serieList)) { - for (QueryResult.Series series : serieList) { - for (List values : series.getValues()) { - - } + //assert data values + List> sourceValues= sourceQueryResult.getResults().get(0).getSeries().get(0).getValues(); + List> sinkValues = sourceQueryResult.getResults().get(0).getSeries().get(0).getValues(); + for(Object sourceVal : sourceValues.get(0)) { + for(Object sinkVal : sinkValues.get(0)) { + if (!Objects.deepEquals(sourceVal, sinkVal)) { + Assertions.assertEquals(sourceVal, sourceVal); } } } + } private void initializeInfluxDBClient() throws ConnectException { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf index 6ad0bbc2d3a..a469a846c36 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf @@ -18,13 +18,6 @@ env { execution.parallelism = 1 job.mode = "BATCH" - - #spark config - spark.app.name = "SeaTunnel" - spark.executor.instances = 2 - spark.executor.cores = 1 - spark.executor.memory = "1g" - spark.master = local } source { @@ -35,7 +28,7 @@ source { upper_bound = 100 lower_bound = 1 partition_num = 4 - split_column = "f5" + split_column = "c_int" fields { label = STRING c_string = STRING @@ -45,6 +38,7 @@ source { c_int = INT c_smallint = SMALLINT c_boolean = BOOLEAN + time = BIGINT } } } @@ -57,6 +51,8 @@ sink { url = "http://influxdb-host:8086" database = "test" measurement = "sink" + key_time = "time" 
+ key_tags = ["label"] batch_size = 1 } } \ No newline at end of file From d07c574a72ad4a4dbb3a55b3af7fe14bbac99344 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Mon, 24 Oct 2022 22:10:54 +0800 Subject: [PATCH 06/16] fix code style --- .../seatunnel/e2e/connector/influxdb/InfluxdbIT.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java index 0b7a59dbb0f..e6cad0e6499 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -28,7 +28,6 @@ import org.apache.seatunnel.e2e.common.container.TestContainer; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; import org.influxdb.InfluxDB; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; @@ -43,14 +42,11 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; import org.testcontainers.lifecycle.Startables; -import org.testcontainers.shaded.org.apache.commons.io.IOUtils; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerLoggerFactory; import java.io.IOException; -import java.io.InputStream; import java.net.ConnectException; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.Date; @@ -184,17 +180,15 @@ public void testInfluxdb(TestContainer container) throws IOException, Interrupte //assert data count 
Assertions.assertEquals(sourceQueryResult.getResults().size(), sinkQueryResult.getResults().size()); //assert data values - List> sourceValues= sourceQueryResult.getResults().get(0).getSeries().get(0).getValues(); + List> sourceValues = sourceQueryResult.getResults().get(0).getSeries().get(0).getValues(); List> sinkValues = sourceQueryResult.getResults().get(0).getSeries().get(0).getValues(); - for(Object sourceVal : sourceValues.get(0)) { - for(Object sinkVal : sinkValues.get(0)) { + for (Object sourceVal : sourceValues.get(0)) { + for (Object sinkVal : sinkValues.get(0)) { if (!Objects.deepEquals(sourceVal, sinkVal)) { Assertions.assertEquals(sourceVal, sourceVal); } } } - - } private void initializeInfluxDBClient() throws ConnectException { From abd9962ef3ae3177ae0889f42eb5320da24922ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Mon, 24 Oct 2022 22:13:56 +0800 Subject: [PATCH 07/16] remove old e2e for influxdb --- .../connector-influxdb-flink-e2e/pom.xml | 43 ----- .../v2/influxdb/InfluxDBSourceToAssertIT.java | 115 ----------- .../influxdb/influxdb_source_to_assert.conf | 180 ----------------- .../src/test/resources/log4j.properties | 22 --- .../seatunnel-flink-connector-v2-e2e/pom.xml | 1 - .../connector-influxdb-spark-e2e/pom.xml | 50 ----- .../v2/influxdb/InfluxDBSourceToAssertIT.java | 115 ----------- .../influxdb/influxdb_source_to_assert.conf | 182 ------------------ .../src/test/resources/log4j.properties | 22 --- .../seatunnel-spark-connector-v2-e2e/pom.xml | 1 - 10 files changed, 731 deletions(-) delete mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/pom.xml delete mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/influxdb/InfluxDBSourceToAssertIT.java delete mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf 
delete mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/log4j.properties delete mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/pom.xml delete mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/influxdb/InfluxDBSourceToAssertIT.java delete mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf delete mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/log4j.properties diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/pom.xml deleted file mode 100644 index 83353aa6c33..00000000000 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/pom.xml +++ /dev/null @@ -1,43 +0,0 @@ - - - - - org.apache.seatunnel - seatunnel-flink-connector-v2-e2e - ${revision} - - 4.0.0 - - connector-influxdb-flink-e2e - - - - org.apache.seatunnel - connector-flink-e2e-base - ${project.version} - tests - test-jar - test - - - org.apache.seatunnel - connector-influxdb - ${project.version} - test - - - diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/influxdb/InfluxDBSourceToAssertIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/influxdb/InfluxDBSourceToAssertIT.java deleted file mode 100644 index 336d1743d7b..00000000000 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/influxdb/InfluxDBSourceToAssertIT.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.e2e.flink.v2.influxdb; - -import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient; -import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; -import org.apache.seatunnel.e2e.flink.FlinkContainer; - -import com.google.common.collect.Lists; -import lombok.extern.slf4j.Slf4j; -import org.influxdb.InfluxDB; -import org.influxdb.dto.BatchPoints; -import org.influxdb.dto.Point; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; - -import java.io.IOException; -import java.net.ConnectException; -import java.sql.SQLException; -import java.util.Date; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -@Slf4j -public class InfluxDBSourceToAssertIT extends FlinkContainer { - - private static final String INFLUXDB_DOCKER_IMAGE = "influxdb:1.8"; - private static final String 
INFLUXDB_CONTAINER_HOST = "influxdb-host"; - private static final String INFLUXDB_HOST = "localhost"; - - private static final int INFLUXDB_PORT = 8764; - private static final int INFLUXDB_CONTAINER_PORT = 8086; - private static final String INFLUXDB_CONNECT_URL = String.format("http://%s:%s", INFLUXDB_HOST, INFLUXDB_PORT); - private static final String INFLUXDB_DATABASE = "test"; - private static final String INFLUXDB_MEASUREMENT = "test"; - - private GenericContainer influxDBServer; - - private InfluxDB influxDB; - - @BeforeEach - public void startInfluxDBContainer() throws ClassNotFoundException, SQLException, ConnectException { - influxDBServer = new GenericContainer<>(INFLUXDB_DOCKER_IMAGE) - .withNetwork(NETWORK) - .withNetworkAliases(INFLUXDB_CONTAINER_HOST) - .withLogConsumer(new Slf4jLogConsumer(log)); - influxDBServer.setPortBindings(Lists.newArrayList( - String.format("%s:%s", INFLUXDB_PORT, INFLUXDB_CONTAINER_PORT))); - Startables.deepStart(Stream.of(influxDBServer)).join(); - log.info("influxdb container started"); - initializeInfluxDBClient(); - batchInsertData(); - } - - @Test - public void testInfluxDBSource() throws IOException, InterruptedException, SQLException { - Container.ExecResult execResult = executeSeaTunnelFlinkJob("/influxdb/influxdb_source_to_assert.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - } - - private void initializeInfluxDBClient() throws SQLException, ClassNotFoundException, ConnectException { - InfluxDBConfig influxDBConfig = new InfluxDBConfig(INFLUXDB_CONNECT_URL); - influxDB = InfluxDBClient.getInfluxDB(influxDBConfig); - } - - public void batchInsertData() { - influxDB.createDatabase(INFLUXDB_DATABASE); - BatchPoints batchPoints = BatchPoints - .database(INFLUXDB_DATABASE) - .build(); - for (int i = 0; i < 100; i++) { - Point point = Point.measurement(INFLUXDB_MEASUREMENT) - .time(new Date().getTime(), TimeUnit.NANOSECONDS) - .tag("label", String.format("label_%s", i)) - .addField("f1", 
String.format("f1_%s", i)) - .addField("f2", Double.valueOf(i + 1)) - .addField("f3", Long.valueOf(i + 2)) - .addField("f4", Float.valueOf(i + 3)) - .addField("f5", Integer.valueOf(i)) - .addField("f6", (short) (i + 4)) - .addField("f7", i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE) - .build(); - batchPoints.point(point); - } - influxDB.write(batchPoints); - } - - @AfterEach - public void closeInfluxDBContainer() { - if (influxDBServer != null) { - influxDBServer.stop(); - } - } -} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf deleted file mode 100644 index eb2a1e6c53b..00000000000 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf +++ /dev/null @@ -1,180 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -###### -###### This config file is a demonstration of streaming processing in seatunnel config -###### - -env { - # You can set flink configuration here - execution.parallelism = 1 - job.mode = "BATCH" - #execution.checkpoint.interval = 10000 - #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" -} - -source { - # This is a example source plugin **only for test and demonstrate the feature source plugin** - InfluxDB { - url = "http://influxdb-host:8086" - sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test" - database = "test" - upper_bound = 100 - lower_bound = 1 - partition_num = 4 - split_column = "f5" - fields { - label = STRING - f1 = STRING - f2 = DOUBLE - f3 = BIGINT - f4 = FLOAT - f5 = INT - f6 = SMALLINT - f7 = BOOLEAN - } - } -} - -transform { - - # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/transform/sql -} - -sink { - Assert { - rules = - { - field_rules = [{ - field_name = f1 - field_type = string - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN_LENGTH - rule_value = 4 - }, - { - rule_type = MAX_LENGTH - rule_value = 5 - } - ] - },{ - field_name = f2 - field_type = double - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 1 - }, - { - rule_type = MAX - rule_value = 100 - } - ] - },{ - field_name = f3 - field_type = long - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 2 - }, - { - rule_type = MAX - rule_value = 101 - } - ] - },{ - field_name = f4 - field_type = float - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 3 - }, - { - rule_type = MAX - rule_value = 102 - } - ] - },{ - field_name = f5 - field_type = int - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 1 - }, - { - rule_type = MAX - rule_value = 99 - } - ] - },{ - field_name = f6 
- field_type = short - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 4 - }, - { - rule_type = MAX - rule_value = 103 - } - ] - },{ - field_name = f7 - field_type = boolean - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 0 - }, - { - rule_type = MAX - rule_value = 1 - } - ] - } - ] - } - } - # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/category/sink-v2 -} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/log4j.properties deleted file mode 100644 index db5d9e51220..00000000000 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-influxdb-flink-e2e/src/test/resources/log4j.properties +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -# Set everything to be logged to the console -log4j.rootCategory=INFO, console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml index 95d24c889e9..b492ce56c10 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml @@ -36,7 +36,6 @@ connector-fake-flink-e2e connector-mongodb-flink-e2e connector-iceberg-flink-e2e - connector-influxdb-flink-e2e diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/pom.xml deleted file mode 100644 index 11fd1b93194..00000000000 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/pom.xml +++ /dev/null @@ -1,50 +0,0 @@ - - - - - org.apache.seatunnel - seatunnel-spark-connector-v2-e2e - ${revision} - - 4.0.0 - - connector-influxdb-spark-e2e - - - - org.apache.seatunnel - connector-spark-e2e-base - ${project.version} - tests - test-jar - test - - - - org.apache.seatunnel - connector-influxdb - ${project.version} - test - - - org.apache.seatunnel - connector-assert - ${project.version} - test - - - diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/influxdb/InfluxDBSourceToAssertIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/influxdb/InfluxDBSourceToAssertIT.java deleted file mode 100644 index 6283f3e82f5..00000000000 --- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/influxdb/InfluxDBSourceToAssertIT.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.e2e.spark.v2.influxdb; - -import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient; -import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; -import org.apache.seatunnel.e2e.spark.SparkContainer; - -import com.google.common.collect.Lists; -import lombok.extern.slf4j.Slf4j; -import org.influxdb.InfluxDB; -import org.influxdb.dto.BatchPoints; -import org.influxdb.dto.Point; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; - -import java.io.IOException; -import java.net.ConnectException; -import java.sql.SQLException; -import java.util.Date; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -@Slf4j -public class InfluxDBSourceToAssertIT extends SparkContainer { - - private static final String INFLUXDB_DOCKER_IMAGE = "influxdb:1.8"; - private static final String INFLUXDB_CONTAINER_HOST = "influxdb-host"; - private static final String INFLUXDB_HOST = "localhost"; - - private static final int INFLUXDB_PORT = 8764; - private static final int INFLUXDB_CONTAINER_PORT = 8086; - private static final String INFLUXDB_CONNECT_URL = String.format("http://%s:%s", INFLUXDB_HOST, INFLUXDB_PORT); - private static final String INFLUXDB_DATABASE = "test"; - private static final String INFLUXDB_MEASUREMENT = "test"; - - private GenericContainer influxDBServer; - - private InfluxDB influxDB; - - @BeforeEach - public void startInfluxDBContainer() throws ClassNotFoundException, SQLException, ConnectException { - influxDBServer = new GenericContainer<>(INFLUXDB_DOCKER_IMAGE) - .withNetwork(NETWORK) - .withNetworkAliases(INFLUXDB_CONTAINER_HOST) - 
.withLogConsumer(new Slf4jLogConsumer(log)); - influxDBServer.setPortBindings(Lists.newArrayList( - String.format("%s:%s", INFLUXDB_PORT, INFLUXDB_CONTAINER_PORT))); - Startables.deepStart(Stream.of(influxDBServer)).join(); - log.info("influxdb container started"); - initializeInfluxDBClient(); - batchInsertData(); - } - - @Test - public void testInfluxDBSource() throws IOException, InterruptedException, SQLException { - Container.ExecResult execResult = executeSeaTunnelSparkJob("/influxdb/influxdb_source_to_assert.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - } - - private void initializeInfluxDBClient() throws SQLException, ClassNotFoundException, ConnectException { - InfluxDBConfig influxDBConfig = new InfluxDBConfig(INFLUXDB_CONNECT_URL); - influxDB = InfluxDBClient.getInfluxDB(influxDBConfig); - } - - public void batchInsertData() { - influxDB.createDatabase(INFLUXDB_DATABASE); - BatchPoints batchPoints = BatchPoints - .database(INFLUXDB_DATABASE) - .build(); - for (int i = 0; i < 100; i++) { - Point point = Point.measurement(INFLUXDB_MEASUREMENT) - .time(new Date().getTime(), TimeUnit.NANOSECONDS) - .tag("label", String.format("label_%s", i)) - .addField("f1", String.format("f1_%s", i)) - .addField("f2", Double.valueOf(i + 1)) - .addField("f3", Long.valueOf(i + 2)) - .addField("f4", Float.valueOf(i + 3)) - .addField("f5", Integer.valueOf(i)) - .addField("f6", (short) (i + 4)) - .addField("f7", i % 2 == 0 ? 
Boolean.TRUE : Boolean.FALSE) - .build(); - batchPoints.point(point); - } - influxDB.write(batchPoints); - } - - @AfterEach - public void closeInfluxDBContainer() { - if (influxDBServer != null) { - influxDBServer.stop(); - } - } -} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf deleted file mode 100644 index f8901980d35..00000000000 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/influxdb/influxdb_source_to_assert.conf +++ /dev/null @@ -1,182 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -###### -###### This config file is a demonstration of streaming processing in seatunnel config -###### - -env { - # You can set spark configuration here - spark.app.name = "SeaTunnel" - spark.executor.instances = 2 - spark.executor.cores = 1 - spark.executor.memory = "1g" - spark.master = local - job.mode = "BATCH" -} - -source { - # This is a example source plugin **only for test and demonstrate the feature source plugin** - InfluxDB { - url = "http://influxdb-host:8086" - sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test" - database = "test" - upper_bound = 100 - lower_bound = 1 - partition_num = 4 - split_column = "f5" - fields { - label = STRING - f1 = STRING - f2 = DOUBLE - f3 = BIGINT - f4 = FLOAT - f5 = INT - f6 = SMALLINT - f7 = BOOLEAN - } - } -} - -transform { - - # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/transform/sql -} - -sink { - Assert { - rules = - { - field_rules = [{ - field_name = f1 - field_type = string - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN_LENGTH - rule_value = 4 - }, - { - rule_type = MAX_LENGTH - rule_value = 5 - } - ] - },{ - field_name = f2 - field_type = double - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 1 - }, - { - rule_type = MAX - rule_value = 100 - } - ] - },{ - field_name = f3 - field_type = long - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 2 - }, - { - rule_type = MAX - rule_value = 101 - } - ] - },{ - field_name = f4 - field_type = float - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 3 - }, - { - rule_type = MAX - rule_value = 102 - } - ] - },{ - field_name = f5 - field_type = int - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 1 - }, - { - rule_type = MAX - rule_value = 99 - } - ] - },{ - field_name 
= f6 - field_type = short - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 4 - }, - { - rule_type = MAX - rule_value = 103 - } - ] - },{ - field_name = f7 - field_type = boolean - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 0 - }, - { - rule_type = MAX - rule_value = 1 - } - ] - } - ] - } - } - # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/category/sink-v2 -} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/log4j.properties deleted file mode 100644 index db5d9e51220..00000000000 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-influxdb-spark-e2e/src/test/resources/log4j.properties +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -# Set everything to be logged to the console -log4j.rootCategory=INFO, console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml index 64fc9205633..08bfa545fc8 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml @@ -34,7 +34,6 @@ connector-iotdb-spark-e2e connector-jdbc-spark-e2e connector-mongodb-spark-e2e - connector-influxdb-spark-e2e From e8f06fc833d3125c723c3f5e6e84befe0161a89f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Mon, 24 Oct 2022 22:59:19 +0800 Subject: [PATCH 08/16] fix e2e and License header --- .../connector-influxdb-e2e/pom.xml | 20 +++++++++++++++++-- .../e2e/connector/influxdb/InfluxdbIT.java | 2 +- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml index c5c3d3ddae3..652782b0631 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml @@ -1,5 +1,21 @@ - + + + org.apache.seatunnel seatunnel-connector-v2-e2e diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java index e6cad0e6499..8c0ad48d995 100644 --- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -181,7 +181,7 @@ public void testInfluxdb(TestContainer container) throws IOException, Interrupte Assertions.assertEquals(sourceQueryResult.getResults().size(), sinkQueryResult.getResults().size()); //assert data values List> sourceValues = sourceQueryResult.getResults().get(0).getSeries().get(0).getValues(); - List> sinkValues = sourceQueryResult.getResults().get(0).getSeries().get(0).getValues(); + List> sinkValues = sinkQueryResult.getResults().get(0).getSeries().get(0).getValues(); for (Object sourceVal : sourceValues.get(0)) { for (Object sinkVal : sinkValues.get(0)) { if (!Objects.deepEquals(sourceVal, sinkVal)) { From 19b7a21bc757789f823a651b05d231fc7f9ec074 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Tue, 1 Nov 2022 10:11:43 +0800 Subject: [PATCH 09/16] add Changelog --- docs/en/connector-v2/sink/InfluxDB.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/InfluxDB.md b/docs/en/connector-v2/sink/InfluxDB.md index d33fcff6fa4..cff9787f94f 100644 --- a/docs/en/connector-v2/sink/InfluxDB.md +++ b/docs/en/connector-v2/sink/InfluxDB.md @@ -95,4 +95,10 @@ sink { } } -``` \ No newline at end of file +``` + +## Changelog + +### next version + +- Add InfluxDB Sink Connector \ No newline at end of file From c1edea48de22f4ce9fc839966829cf92add22280 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Fri, 4 Nov 2022 20:47:38 +0800 Subject: [PATCH 10/16] delete useless log4j file --- .../e2e/connector/influxdb/InfluxdbIT.java | 2 +- .../src/test/resources/log4j.properties | 22 ------------------- 2 files changed, 1 insertion(+), 23 deletions(-) delete mode 100644 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/log4j.properties diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java index 8c0ad48d995..0311a5fbf78 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -185,7 +185,7 @@ public void testInfluxdb(TestContainer container) throws IOException, Interrupte for (Object sourceVal : sourceValues.get(0)) { for (Object sinkVal : sinkValues.get(0)) { if (!Objects.deepEquals(sourceVal, sinkVal)) { - Assertions.assertEquals(sourceVal, sourceVal); + Assertions.assertEquals(sourceVal, sinkVal); } } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/log4j.properties deleted file mode 100644 index db5d9e51220..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/log4j.properties +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# Set everything to be logged to the console -log4j.rootCategory=INFO, console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n From 63e67f16d60cc220fcdb1e05a2970ad2e52ddd16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Sat, 5 Nov 2022 10:11:19 +0800 Subject: [PATCH 11/16] fix e2e --- .../e2e/connector/influxdb/InfluxdbIT.java | 15 +++++++++++---- .../src/test/resources/influxdb-to-influxdb.conf | 4 ++-- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java index 0311a5fbf78..20cc6dce012 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -182,12 +182,19 @@ public void testInfluxdb(TestContainer container) throws IOException, Interrupte //assert data values List> sourceValues = sourceQueryResult.getResults().get(0).getSeries().get(0).getValues(); List> sinkValues = 
sinkQueryResult.getResults().get(0).getSeries().get(0).getValues(); - for (Object sourceVal : sourceValues.get(0)) { - for (Object sinkVal : sinkValues.get(0)) { - if (!Objects.deepEquals(sourceVal, sinkVal)) { - Assertions.assertEquals(sourceVal, sinkVal); + int rowSize = sourceValues.size(); + int colSize = sourceValues.get(0).size(); + + for (int row = 0; row < rowSize; row++) { + for (int col = 0; col < colSize; col++) { + Object sourceColValue = sourceValues.get(row).get(col); + Object sinkColValue = sinkValues.get(row).get(col); + + if (!Objects.deepEquals(sourceColValue, sinkColValue)) { + Assertions.assertEquals(sourceColValue, sinkColValue); } } + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf index a469a846c36..f95af29a2c6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf @@ -25,8 +25,8 @@ source { url = "http://influxdb-host:8086" sql = "select label, c_string, c_double, c_bigint, c_float, c_int, c_smallint, c_boolean from source" database = "test" - upper_bound = 100 - lower_bound = 1 + upper_bound = 99 + lower_bound = 0 partition_num = 4 split_column = "c_int" fields { From 797daf114be776a6f48a6f3c160aaf4d5ddffd44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Sat, 5 Nov 2022 10:22:39 +0800 Subject: [PATCH 12/16] merge dev --- plugin-mapping.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 85a57ecbde6..1881ce7865c 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -137,6 +137,6 @@ seatunnel.source.Iceberg = connector-iceberg seatunnel.source.InfluxDB = 
connector-influxdb seatunnel.source.S3File = connector-file-s3 seatunnel.sink.S3File = connector-file-s3 -seatunnel.sink.InfluxDB = connector-influxdb seatunnel.source.Amazondynamodb = connector-amazondynamodb seatunnel.sink.Amazondynamodb = connector-amazondynamodb +seatunnel.sink.InfluxDB = connector-influxdb From 4bd1da2692ab558c80f5c9be5dcb724ef1e3c9c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Sun, 6 Nov 2022 19:05:03 +0800 Subject: [PATCH 13/16] remove old e2e --- .../influxdb/InfluxDBSourceToAssertIT.java | 122 ------------ .../resources/influxdb_source_to_assert.conf | 188 ------------------ 2 files changed, 310 deletions(-) delete mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java delete mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java deleted file mode 100644 index d39aa4f395e..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.e2e.connector.influxdb; - -import static org.awaitility.Awaitility.given; - -import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient; -import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; -import org.apache.seatunnel.e2e.common.TestResource; -import org.apache.seatunnel.e2e.common.TestSuiteBase; -import org.apache.seatunnel.e2e.common.container.TestContainer; - -import lombok.extern.slf4j.Slf4j; -import org.influxdb.InfluxDB; -import org.influxdb.dto.BatchPoints; -import org.influxdb.dto.Point; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.TestTemplate; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.DockerLoggerFactory; - -import java.io.IOException; -import java.net.ConnectException; -import java.util.Date; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -@Slf4j -public class InfluxDBSourceToAssertIT extends TestSuiteBase implements TestResource { - - private static final String INFLUXDB_DOCKER_IMAGE = "influxdb:1.8"; - private static final String INFLUXDB_CONTAINER_HOST = "influxdb-host"; - private static final int INFLUXDB_CONTAINER_PORT = 8086; - private static final String INFLUXDB_DATABASE = "test"; - private static final 
String INFLUXDB_MEASUREMENT = "test"; - - private GenericContainer influxDBServer; - private InfluxDB influxDB; - - @BeforeAll - @Override - public void startUp() { - influxDBServer = new GenericContainer<>(INFLUXDB_DOCKER_IMAGE) - .withNetwork(NETWORK) - .withNetworkAliases(INFLUXDB_CONTAINER_HOST) - .withExposedPorts(INFLUXDB_CONTAINER_PORT) - .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(INFLUXDB_DOCKER_IMAGE))); - Startables.deepStart(Stream.of(influxDBServer)).join(); - log.info("influxdb container started"); - given().ignoreExceptions() - .await() - .atLeast(100, TimeUnit.MILLISECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(30, TimeUnit.SECONDS) - .untilAsserted(() -> initializeInfluxDBClient()); - batchInsertData(); - } - - @TestTemplate - public void testInfluxDBSource(TestContainer container) throws IOException, InterruptedException { - Container.ExecResult execResult = container.executeJob("/influxdb_source_to_assert.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - } - - private void initializeInfluxDBClient() throws ConnectException { - InfluxDBConfig influxDBConfig = new InfluxDBConfig(String.format("http://%s:%s", influxDBServer.getHost(), influxDBServer.getFirstMappedPort())); - influxDB = InfluxDBClient.getInfluxDB(influxDBConfig); - } - - public void batchInsertData() { - influxDB.createDatabase(INFLUXDB_DATABASE); - BatchPoints batchPoints = BatchPoints - .database(INFLUXDB_DATABASE) - .build(); - for (int i = 0; i < 100; i++) { - Point point = Point.measurement(INFLUXDB_MEASUREMENT) - .time(new Date().getTime(), TimeUnit.NANOSECONDS) - .tag("label", String.format("label_%s", i)) - .addField("f1", String.format("f1_%s", i)) - .addField("f2", Double.valueOf(i + 1)) - .addField("f3", Long.valueOf(i + 2)) - .addField("f4", Float.valueOf(i + 3)) - .addField("f5", Integer.valueOf(i)) - .addField("f6", (short) (i + 4)) - .addField("f7", i % 2 == 0 ? 
Boolean.TRUE : Boolean.FALSE) - .build(); - batchPoints.point(point); - } - influxDB.write(batchPoints); - } - - @AfterAll - @Override - public void tearDown() { - if (influxDB != null) { - influxDB.close(); - } - if (influxDBServer != null) { - influxDBServer.stop(); - } - } -} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf deleted file mode 100644 index ea0e6e17740..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf +++ /dev/null @@ -1,188 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -###### -###### This config file is a demonstration of streaming processing in seatunnel config -###### - -env { - execution.parallelism = 1 - job.mode = "BATCH" - - # You can set spark configuration here - spark.app.name = "SeaTunnel" - spark.executor.instances = 2 - spark.executor.cores = 1 - spark.executor.memory = "1g" - spark.master = local -} - -source { - # This is a example source plugin **only for test and demonstrate the feature source plugin** - InfluxDB { - url = "http://influxdb-host:8086" - sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test" - database = "test" - upper_bound = 99 - lower_bound = 0 - partition_num = 4 - split_column = "f5" - fields { - label = STRING - f1 = STRING - f2 = DOUBLE - f3 = BIGINT - f4 = FLOAT - f5 = INT - f6 = SMALLINT - f7 = BOOLEAN - } - } -} - -sink { - Assert { - rules = - { - row_rules = [ - { - rule_type = MAX_ROW - rule_value = 100 - }, - { - rule_type = MIN_ROW - rule_value = 100 - } - ], - field_rules = [{ - field_name = f1 - field_type = string - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN_LENGTH - rule_value = 4 - }, - { - rule_type = MAX_LENGTH - rule_value = 5 - } - ] - },{ - field_name = f2 - field_type = double - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 1 - }, - { - rule_type = MAX - rule_value = 100 - } - ] - },{ - field_name = f3 - field_type = long - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 2 - }, - { - rule_type = MAX - rule_value = 101 - } - ] - },{ - field_name = f4 - field_type = float - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 3 - }, - { - rule_type = MAX - rule_value = 102 - } - ] - },{ - field_name = f5 - field_type = int - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 0 - }, - { - rule_type = MAX - rule_value = 99 - } - ] - },{ - field_name = f6 - field_type = short - field_value = [ - { - 
rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 4 - }, - { - rule_type = MAX - rule_value = 103 - } - ] - },{ - field_name = f7 - field_type = boolean - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 0 - }, - { - rule_type = MAX - rule_value = 1 - } - ] - } - ] - } - } - # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/category/sink-v2 -} \ No newline at end of file From 8d5c5060ed3cae11126a3a672a5b08bc4e8bbb4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Mon, 7 Nov 2022 22:13:44 +0800 Subject: [PATCH 14/16] mv scheduler to constructor of InfluxDBSinkWriter --- .../influxdb/sink/InfluxDBSinkWriter.java | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java index a7e41a6d937..da89db304ad 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java @@ -66,6 +66,22 @@ public InfluxDBSinkWriter(Config pluginConfig, seaTunnelRowType, sinkConfig.getPrecision().getTimeUnit(), sinkConfig.getKeyTags(), sinkConfig.getKeyTime(), sinkConfig.getMeasurement()); this.batchList = new ArrayList<>(); + if (batchIntervalMs != null) { + scheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("influxDB-sink-output-%s").build()); + scheduledFuture = scheduler.scheduleAtFixedRate( + () -> { + try { + flush(); + } catch 
(IOException e) { + flushException = e; + } + }, + batchIntervalMs, + batchIntervalMs, + TimeUnit.MILLISECONDS); + } + connect(); } @@ -104,21 +120,6 @@ private void tryInit() throws IOException { } initialize = true; connect(); - if (batchIntervalMs != null) { - scheduler = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("influxDB-sink-output-%s").build()); - scheduledFuture = scheduler.scheduleAtFixedRate( - () -> { - try { - flush(); - } catch (IOException e) { - flushException = e; - } - }, - batchIntervalMs, - batchIntervalMs, - TimeUnit.MILLISECONDS); - } } public synchronized void write(Point record) throws IOException { From 04897fae14e71c78eb96d5cce038c77e2ebcbd6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Tue, 8 Nov 2022 12:53:58 +0800 Subject: [PATCH 15/16] mv scheduler to constructor of InfluxDBSinkWriter --- .../seatunnel/influxdb/sink/InfluxDBSinkWriter.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java index da89db304ad..94836da52d0 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java @@ -51,10 +51,8 @@ public class InfluxDBSinkWriter extends AbstractSinkWriter { private InfluxDB influxDB; private SinkConfig sinkConfig; private final List batchList; - private ScheduledExecutorService scheduler; private ScheduledFuture scheduledFuture; - private volatile boolean initialize; private volatile Exception flushException; private final Integer batchIntervalMs; @@ -114,16 
+112,7 @@ public void close() throws IOException { } } - private void tryInit() throws IOException { - if (initialize) { - return; - } - initialize = true; - connect(); - } - public synchronized void write(Point record) throws IOException { - tryInit(); checkFlushException(); batchList.add(record); From 42c9e0878f83ca17f5e57d359c923409e1e60b55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AF=95=E5=8D=9A?= Date: Tue, 8 Nov 2022 15:50:32 +0800 Subject: [PATCH 16/16] remove InfluxDBSinkWriter useless synchronized --- .../seatunnel/influxdb/sink/InfluxDBSinkWriter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java index 94836da52d0..809a3eaaa88 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java @@ -112,7 +112,7 @@ public void close() throws IOException { } } - public synchronized void write(Point record) throws IOException { + public void write(Point record) throws IOException { checkFlushException(); batchList.add(record); @@ -122,7 +122,7 @@ public synchronized void write(Point record) throws IOException { } } - public synchronized void flush() throws IOException { + public void flush() throws IOException { checkFlushException(); if (batchList.isEmpty()) { return;