From ebf41786b1caaa03699c4b5a099289719e47ecdc Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Mon, 9 Jan 2023 15:58:49 +0800 Subject: [PATCH 1/5] [Feature][Connector-V2][File] Support compress --- .../file/config/BaseFileSinkConfig.java | 17 ++----- .../seatunnel/file/config/BaseSinkConfig.java | 6 +-- .../seatunnel/file/config/CompressConfig.java | 22 --------- .../seatunnel/file/config/CompressFormat.java | 40 ++++++++++++----- .../sink/writer/AbstractWriteStrategy.java | 6 +-- .../file/sink/writer/JsonWriteStrategy.java | 17 ++++++- .../file/sink/writer/OrcWriteStrategy.java | 4 +- .../sink/writer/ParquetWriteStrategy.java | 5 +-- .../file/sink/writer/TextWriteStrategy.java | 45 +++++++++---------- 9 files changed, 78 insertions(+), 84 deletions(-) delete mode 100644 seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressConfig.java diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java index 42b31adc9ee..7b748086201 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java @@ -19,11 +19,9 @@ import static com.google.common.base.Preconditions.checkNotNull; -import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.DateUtils; import org.apache.seatunnel.common.utils.TimeUtils; -import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; 
import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -35,9 +33,9 @@ import java.util.Locale; @Data -public class BaseFileSinkConfig implements DelimiterConfig, CompressConfig, Serializable { +public class BaseFileSinkConfig implements DelimiterConfig, Serializable { private static final long serialVersionUID = 1L; - protected String compressCodec; + protected CompressFormat compressFormat = BaseSinkConfig.COMPRESS_CODEC.defaultValue(); protected String fieldDelimiter = BaseSinkConfig.FIELD_DELIMITER.defaultValue(); protected String rowDelimiter = BaseSinkConfig.ROW_DELIMITER.defaultValue(); protected int batchSize = BaseSinkConfig.BATCH_SIZE.defaultValue(); @@ -50,15 +48,8 @@ public class BaseFileSinkConfig implements DelimiterConfig, CompressConfig, Seri public BaseFileSinkConfig(@NonNull Config config) { if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) { - CompressFormat compressFormat = CompressFormat.valueOf(config.getString(BaseSinkConfig.COMPRESS_CODEC.key()).toUpperCase(Locale.ROOT)); - switch (compressFormat) { - case LZO: - this.compressCodec = compressFormat.getCompressCodec(); - break; - default: - throw new FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION, - "Compress not supported this compress code by SeaTunnel file connector now"); - } + String compressCodec = config.getString(BaseSinkConfig.COMPRESS_CODEC.key()); + this.compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase()); } if (config.hasPath(BaseSinkConfig.BATCH_SIZE.key())) { this.batchSize = config.getInt(BaseSinkConfig.BATCH_SIZE.key()); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java index 5abffc13f43..ae264ffb495 100644 --- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java @@ -37,9 +37,9 @@ public class BaseSinkConfig { public static final String DEFAULT_FILE_NAME_EXPRESSION = "${transactionId}"; public static final int DEFAULT_BATCH_SIZE = 1000000; - public static final Option COMPRESS_CODEC = Options.key("compress_codec") - .stringType() - .noDefaultValue() + public static final Option COMPRESS_CODEC = Options.key("compress_codec") + .enumType(CompressFormat.class) + .defaultValue(CompressFormat.NONE) .withDescription("Compression codec"); public static final Option DATE_FORMAT = Options.key("date_format") diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressConfig.java deleted file mode 100644 index 48d47c8d1df..00000000000 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressConfig.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.file.config; - -public interface CompressConfig { - String getCompressCodec(); -} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java index 6449f184571..6483ae9c84a 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java @@ -17,29 +17,49 @@ package org.apache.seatunnel.connectors.seatunnel.file.config; +import org.apache.orc.CompressionKind; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + import java.io.Serializable; public enum CompressFormat implements Serializable { + // text json orc parquet support + LZO(".lzo", CompressionKind.LZO, CompressionCodecName.LZO), + + // orc and parquet support + NONE("", CompressionKind.NONE, CompressionCodecName.UNCOMPRESSED), + SNAPPY(".snappy", CompressionKind.SNAPPY, CompressionCodecName.SNAPPY), + LZ4(".lz4", CompressionKind.LZ4, CompressionCodecName.LZ4), + + // only orc support + ZLIB(".zlib", CompressionKind.ZLIB, CompressionCodecName.UNCOMPRESSED), - LZO("lzo"), - NONE("none"); + // only parquet support + GZIP(".gz", 
CompressionKind.NONE, CompressionCodecName.GZIP), + BROTLI(".br", CompressionKind.NONE, CompressionCodecName.BROTLI), + ZSTD(".zstd", CompressionKind.NONE, CompressionCodecName.ZSTD); private final String compressCodec; + private final CompressionKind orcCompression; + private final CompressionCodecName parquetCompression; - CompressFormat(String compressCodec) { + CompressFormat(String compressCodec, + CompressionKind orcCompression, + CompressionCodecName parentCompression) { this.compressCodec = compressCodec; + this.orcCompression = orcCompression; + this.parquetCompression = parentCompression; } public String getCompressCodec() { return compressCodec; } - public static CompressFormat getCompressFormat(String value) { - switch (value) { - case "lzo": - return CompressFormat.LZO; - default: - return CompressFormat.NONE; - } + public CompressionKind getOrcCompression() { + return orcCompression; + } + + public CompressionCodecName getParquetCompression() { + return parquetCompression; } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java index 41bd8ab0831..329af511aee 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java @@ -63,6 +63,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy { protected final Logger log = LoggerFactory.getLogger(this.getClass()); protected final FileSinkConfig fileSinkConfig; + protected final CompressFormat compressFormat; protected final List 
sinkColumnsIndexInRow; protected String jobId; protected int subTaskIndex; @@ -90,6 +91,7 @@ public AbstractWriteStrategy(FileSinkConfig fileSinkConfig) { this.fileSinkConfig = fileSinkConfig; this.sinkColumnsIndexInRow = fileSinkConfig.getSinkColumnsIndexInRow(); this.batchSize = fileSinkConfig.getBatchSize(); + this.compressFormat = fileSinkConfig.getCompressFormat(); } /** @@ -222,9 +224,7 @@ public String generateFileName(String transactionId) { String fileNameExpression = fileSinkConfig.getFileNameExpression(); FileFormat fileFormat = fileSinkConfig.getFileFormat(); String suffix = fileFormat.getSuffix(); - if (CompressFormat.LZO.getCompressCodec().equals(fileSinkConfig.getCompressCodec())) { - suffix = "." + CompressFormat.LZO.getCompressCodec() + "." + suffix; - } + suffix = compressFormat.getCompressCodec() + suffix; if (StringUtils.isBlank(fileNameExpression)) { return transactionId + suffix; } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java index 9ae10fd35c2..7d9b7ad0dfb 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java @@ -25,10 +25,12 @@ import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig; import org.apache.seatunnel.format.json.JsonSerializationSchema; +import io.airlift.compress.lzo.LzopCodec; import lombok.NonNull; import org.apache.hadoop.fs.FSDataOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.HashMap; import java.util.Map; @@ -93,7 
+95,20 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath); if (fsDataOutputStream == null) { try { - fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + switch (compressFormat) { + case LZO: + LzopCodec lzo = new LzopCodec(); + OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); + fsDataOutputStream = new FSDataOutputStream(out, null); + break; + case NONE: + fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + break; + default: + log.warn("Json file does not support this compress type: {}", compressFormat.getCompressCodec()); + fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + break; + } beingWrittenOutputStream.put(filePath, fsDataOutputStream); isFirstWrite.put(filePath, true); } catch (IOException e) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java index af5b774e9f8..507dafdcc26 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java @@ -30,7 +30,6 @@ import lombok.NonNull; import org.apache.hadoop.fs.Path; -import org.apache.orc.CompressionKind; import org.apache.orc.OrcFile; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; @@ -113,8 +112,7 @@ private Writer getOrCreateWriter(@NonNull String filePath) { try { OrcFile.WriterOptions options = OrcFile.writerOptions(getConfiguration(hadoopConf)) .setSchema(schema) - // 
temporarily used snappy - .compress(CompressionKind.SNAPPY) + .compress(compressFormat.getOrcCompression()) // use orc version 0.12 .version(OrcFile.Version.V_0_12) .overwrite(true); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java index 6c431605a06..ad05c123ed2 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java @@ -42,7 +42,6 @@ import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.schema.ConversionPatterns; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -141,9 +140,7 @@ private ParquetWriter getOrCreateWriter(@NonNull String filePath) .withDataModel(dataModel) // use parquet v1 to improve compatibility .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0) - // Temporarily use snappy compress - // I think we can use the compress option in config to control this - .withCompressionCodec(CompressionCodecName.SNAPPY) + .withCompressionCodec(compressFormat.getParquetCompression()) .withSchema(schema) .build(); this.beingWrittenWriter.put(filePath, newWriter); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index b0ba6674631..6698e2ee289 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -24,7 +24,6 @@ import org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.DateUtils; import org.apache.seatunnel.common.utils.TimeUtils; -import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig; import org.apache.seatunnel.format.text.TextSerializationSchema; @@ -36,7 +35,6 @@ import java.io.IOException; import java.io.OutputStream; import java.util.HashMap; -import java.util.Locale; import java.util.Map; public class TextWriteStrategy extends AbstractWriteStrategy { @@ -48,18 +46,16 @@ public class TextWriteStrategy extends AbstractWriteStrategy { private final DateTimeUtils.Formatter dateTimeFormat; private final TimeUtils.Formatter timeFormat; private SerializationSchema serializationSchema; - private String compressCodec; - public TextWriteStrategy(FileSinkConfig textFileSinkConfig) { - super(textFileSinkConfig); + public TextWriteStrategy(FileSinkConfig fileSinkConfig) { + super(fileSinkConfig); this.beingWrittenOutputStream = new HashMap<>(); this.isFirstWrite = new HashMap<>(); - this.fieldDelimiter = textFileSinkConfig.getFieldDelimiter(); - this.rowDelimiter = textFileSinkConfig.getRowDelimiter(); - this.dateFormat = textFileSinkConfig.getDateFormat(); - this.dateTimeFormat = textFileSinkConfig.getDatetimeFormat(); - 
this.timeFormat = textFileSinkConfig.getTimeFormat(); - this.compressCodec = textFileSinkConfig.getCompressCodec(); + this.fieldDelimiter = fileSinkConfig.getFieldDelimiter(); + this.rowDelimiter = fileSinkConfig.getRowDelimiter(); + this.dateFormat = fileSinkConfig.getDateFormat(); + this.dateTimeFormat = fileSinkConfig.getDatetimeFormat(); + this.timeFormat = fileSinkConfig.getTimeFormat(); } @Override @@ -117,21 +113,20 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath); if (fsDataOutputStream == null) { try { - if (compressCodec != null) { - CompressFormat compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase(Locale.ROOT)); - switch (compressFormat) { - case LZO: - LzopCodec lzo = new LzopCodec(); - OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); - fsDataOutputStream = new FSDataOutputStream(out, null); - break; - default: - fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); - } - } else { - fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + switch (compressFormat) { + case LZO: + LzopCodec lzo = new LzopCodec(); + OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); + fsDataOutputStream = new FSDataOutputStream(out, null); + break; + case NONE: + fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + break; + default: + log.warn("Text file does not support this compress type: {}", compressFormat.getCompressCodec()); + fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + break; } - beingWrittenOutputStream.put(filePath, fsDataOutputStream); isFirstWrite.put(filePath, true); } catch (IOException e) { From 1eaa685f7273f15447753b9e34f43de21ac11022 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Sat, 14 Jan 2023 23:10:33 +0800 Subject: [PATCH 2/5] [Feature][Connector-V2][File] Update e2e tests --- 
.../src/test/resources/orc/fake_to_local_file_orc.conf | 1 + .../src/test/resources/parquet/fake_to_local_file_parquet.conf | 1 + .../src/test/resources/text/fake_to_local_file_text.conf | 1 + 3 files changed, 3 insertions(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf index cbbc227b309..a134e9fc064 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf @@ -75,5 +75,6 @@ sink { file_format = "orc" filename_time_format = "yyyy.MM.dd" is_enable_transaction = true + compress_codec = "zlib" } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf index 454e27f41ef..c3eae33b0fd 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf @@ -75,5 +75,6 @@ sink { file_format = "parquet" filename_time_format = "yyyy.MM.dd" is_enable_transaction = true + compress_codec = "gzip" } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf index 54227c8ee64..795d82234ca 100644 --- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf @@ -75,5 +75,6 @@ sink { file_format = "text" filename_time_format = "yyyy.MM.dd" is_enable_transaction = true + compress_codec = "lzo" } } \ No newline at end of file From 78063fb8f35a403a28f00d050da201e3db2f8bee Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Sun, 15 Jan 2023 00:05:44 +0800 Subject: [PATCH 3/5] [Feature][Connector-V2][File] Update docs --- docs/en/connector-v2/sink/FtpFile.md | 11 +++++ docs/en/connector-v2/sink/HdfsFile.md | 14 ++++-- docs/en/connector-v2/sink/LocalFile.md | 11 +++++ docs/en/connector-v2/sink/OssFile.md | 11 +++++ docs/en/connector-v2/sink/OssJindoFile.md | 19 +++++++- docs/en/connector-v2/sink/S3File.md | 55 ++++++++++++++--------- docs/en/connector-v2/sink/SftpFile.md | 11 +++++ 7 files changed, 105 insertions(+), 27 deletions(-) diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/FtpFile.md index 5d7c0099672..f8ac3578a9d 100644 --- a/docs/en/connector-v2/sink/FtpFile.md +++ b/docs/en/connector-v2/sink/FtpFile.md @@ -49,6 +49,7 @@ By default, we use 2PC commit to ensure `exactly-once` | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | | is_enable_transaction | boolean | no | true | | | batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | | common-options | object | no | - | | ### host [string] @@ -157,6 +158,16 @@ Only support `true` now. The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. 
If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger. +### compress_codec [string] + +The compression codec used for the output files. The codecs supported by each file format are listed below: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc: `lzo` `snappy` `lz4` `zlib` `none` +- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md index 3790fe01b11..aa439fb7609 100644 --- a/docs/en/connector-v2/sink/HdfsFile.md +++ b/docs/en/connector-v2/sink/HdfsFile.md @@ -50,6 +50,7 @@ By default, we use 2PC commit to ensure `exactly-once` | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | | is_enable_transaction | boolean | no | true | | | batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | | kerberos_principal | string | no | - | | kerberos_keytab_path | string | no | - | | | compress_codec | string | no | none | | @@ -153,6 +154,16 @@ Only support `true` now. The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger. 
+### compress_codec [string] + +The compression codec used for the output files. The codecs supported by each file format are listed below: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc: `lzo` `snappy` `lz4` `zlib` `none` +- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` + ### kerberos_principal [string] The principal of kerberos @@ -161,9 +172,6 @@ The principal of kerberos The keytab path of kerberos -### compressCodec [string] -Support lzo compression for text in file format. The file name ends with ".lzo.txt" . - ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index ab5753f275f..421ba2328c3 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -45,6 +45,7 @@ By default, we use 2PC commit to ensure `exactly-once` | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | | is_enable_transaction | boolean | no | true | | | batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | | common-options | object | no | - | | ### path [string] @@ -137,6 +138,16 @@ Only support `true` now. The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger. 
+### compress_codec [string] + +The compression codec used for the output files. The codecs supported by each file format are listed below: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc: `lzo` `snappy` `lz4` `zlib` `none` +- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md index a45983ea775..6c2c3e049cb 100644 --- a/docs/en/connector-v2/sink/OssFile.md +++ b/docs/en/connector-v2/sink/OssFile.md @@ -52,6 +52,7 @@ By default, we use 2PC commit to ensure `exactly-once` | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | | is_enable_transaction | boolean | no | true | | | batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | | common-options | object | no | - | | ### path [string] @@ -160,6 +161,16 @@ Only support `true` now. The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger. +### compress_codec [string] + +The compression codec used for the output files. The codecs supported by each file format are listed below: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc: `lzo` `snappy` `lz4` `zlib` `none` +- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. 
diff --git a/docs/en/connector-v2/sink/OssJindoFile.md b/docs/en/connector-v2/sink/OssJindoFile.md index 92fb7131db2..4afe55e30ec 100644 --- a/docs/en/connector-v2/sink/OssJindoFile.md +++ b/docs/en/connector-v2/sink/OssJindoFile.md @@ -52,6 +52,7 @@ By default, we use 2PC commit to ensure `exactly-once` | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | | is_enable_transaction | boolean | no | true | | | batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | | common-options | object | no | - | | ### path [string] @@ -156,6 +157,20 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran Only support `true` now. +### batch_size [int] + +The maximum number of rows in a file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer will keep writing rows to a file until the number of rows in the file exceeds `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file whenever a new checkpoint is triggered. + +### compress_codec [string] + +The compression codec used for the output files. The codecs supported by each file format are listed below: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc: `lzo` `snappy` `lz4` `zlib` `none` +- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. 
@@ -166,7 +181,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum ```hocon - OssFile { + OssJindoFile { path="/seatunnel/sink" bucket = "oss://tyrantlucifer-image-bed" access_key = "xxxxxxxxxxx" @@ -192,7 +207,7 @@ For parquet file format with `sink_columns` ```hocon - OssFile { + OssJindoFile { path = "/seatunnel/sink" bucket = "oss://tyrantlucifer-image-bed" access_key = "xxxxxxxxxxx" diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md index a9040e9bc5d..724f36d07a9 100644 --- a/docs/en/connector-v2/sink/S3File.md +++ b/docs/en/connector-v2/sink/S3File.md @@ -31,28 +31,29 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | -|-----------------------------------|---------|----------|-------------------------------------------------------|--------------------------------------------------------------------------------------------------------| -| path | string | yes | - | | -| bucket | string | yes | - | | -| fs.s3a.endpoint | string | yes | - | | -| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | | -| access_key | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | -| access_secret | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format is text | -| row_delimiter | string | no | "\n" | Only used when file_format is text | -| have_partition | boolean 
| no | false | Whether you need processing partitions. | -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| common-options | object | no | - | | +| name | type | required | default value | remarks | +|----------------------------------|---------|----------|-------------------------------------------------------|--------------------------------------------------------------------------------------------------------| +| path | string | yes | - | | +| bucket | string | yes | - | | +| fs.s3a.endpoint | string | yes | - | | +| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | | +| access_key | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | +| access_secret | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format is text | +| row_delimiter | string | no | "\n" | Only used when file_format is text | +| have_partition | boolean | no | false | Whether you need processing partitions. 
| +| partition_by | array | no | - | Only used when have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used when have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used when have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | ### path [string] @@ -174,6 +175,16 @@ Only support `true` now. The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger. +### compress_codec [string] + +The compression codec for output files. The codecs supported for each file format are as follows: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc: `lzo` `snappy` `lz4` `zlib` `none` +- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. 
diff --git a/docs/en/connector-v2/sink/SftpFile.md b/docs/en/connector-v2/sink/SftpFile.md index d71ea8b7d47..db0d526362e 100644 --- a/docs/en/connector-v2/sink/SftpFile.md +++ b/docs/en/connector-v2/sink/SftpFile.md @@ -49,6 +49,7 @@ By default, we use 2PC commit to ensure `exactly-once` | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | | is_enable_transaction | boolean | no | true | | | batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | | common-options | object | no | - | | ### host [string] @@ -157,6 +158,16 @@ Only support `true` now. The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger. +### compress_codec [string] + +The compression codec for output files. The codecs supported for each file format are as follows: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc: `lzo` `snappy` `lz4` `zlib` `none` +- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. 
From 29f9bfca462195a10084918d991f185901b21335 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Mon, 16 Jan 2023 20:58:33 +0800 Subject: [PATCH 4/5] [Improve][Connector-V2][File] Update docs --- docs/en/connector-v2/sink/FtpFile.md | 4 +++- docs/en/connector-v2/sink/HdfsFile.md | 3 ++- docs/en/connector-v2/sink/LocalFile.md | 2 ++ docs/en/connector-v2/sink/OssFile.md | 4 +++- docs/en/connector-v2/sink/OssJindoFile.md | 6 +++++- docs/en/connector-v2/sink/S3File.md | 8 ++++++-- docs/en/connector-v2/sink/SftpFile.md | 8 ++++++-- 7 files changed, 27 insertions(+), 8 deletions(-) diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/FtpFile.md index f8ac3578a9d..5380c69672f 100644 --- a/docs/en/connector-v2/sink/FtpFile.md +++ b/docs/en/connector-v2/sink/FtpFile.md @@ -235,4 +235,6 @@ FtpFile { - Sink columns mapping failed - When restore writer from states getting transaction directly failed -- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) \ No newline at end of file +- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) + +- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899)) \ No newline at end of file diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md index aa439fb7609..dfb115ec4d5 100644 --- a/docs/en/connector-v2/sink/HdfsFile.md +++ b/docs/en/connector-v2/sink/HdfsFile.md @@ -253,4 +253,5 @@ HdfsFile { ### Next version - [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) - [Improve] Support lzo compression for text in file format ([3782](https://github.com/apache/incubator-seatunnel/pull/3782)) -- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840)) \ No newline at end of file +- [Improve] Support 
kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840)) +- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899)) \ No newline at end of file diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 421ba2328c3..9d9f4d2ac0f 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -217,3 +217,5 @@ LocalFile { - When restore writer from states getting transaction directly failed - [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) + +- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899)) diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md index 6c2c3e049cb..5cef80d1656 100644 --- a/docs/en/connector-v2/sink/OssFile.md +++ b/docs/en/connector-v2/sink/OssFile.md @@ -255,4 +255,6 @@ For orc file format simple config - Sink columns mapping failed - When restore writer from states getting transaction directly failed -- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) \ No newline at end of file +- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) + +- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899)) \ No newline at end of file diff --git a/docs/en/connector-v2/sink/OssJindoFile.md b/docs/en/connector-v2/sink/OssJindoFile.md index 4afe55e30ec..4c0ccb9a024 100644 --- a/docs/en/connector-v2/sink/OssJindoFile.md +++ b/docs/en/connector-v2/sink/OssJindoFile.md @@ -236,6 +236,10 @@ For orc file format simple config ## Changelog +### 2.3.0 2022-12-30 + +- Add OSS Jindo File Sink Connector + ### Next version -- Add OSS Jindo File Sink Connector \ No newline at end of file +- [Improve] Support 
file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899)) \ No newline at end of file diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md index 724f36d07a9..76ce51342ae 100644 --- a/docs/en/connector-v2/sink/S3File.md +++ b/docs/en/connector-v2/sink/S3File.md @@ -265,7 +265,7 @@ For orc file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCreden - Add S3File Sink Connector -### Next version +### 2.3.0 2022-12-30 - [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258)) - When field from upstream is null it will throw NullPointerException - Sink columns mapping failed @@ -275,4 +275,8 @@ For orc file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCreden - Allow the use of the s3a protocol - Decouple hadoop-aws dependencies - [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) -- [Feature]Set S3 AK to optional ([3688](https://github.com/apache/incubator-seatunnel/pull/)) \ No newline at end of file +- [Feature] Set S3 AK to optional ([3688](https://github.com/apache/incubator-seatunnel/pull/3688)) + +### Next version + +- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899)) \ No newline at end of file diff --git a/docs/en/connector-v2/sink/SftpFile.md b/docs/en/connector-v2/sink/SftpFile.md index db0d526362e..4879cde7b88 100644 --- a/docs/en/connector-v2/sink/SftpFile.md +++ b/docs/en/connector-v2/sink/SftpFile.md @@ -202,7 +202,7 @@ SftpFile { ## Changelog -### Next version +### 2.3.0 2022-12-30 - Add SftpFile Sink Connector - [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258)) @@ -210,4 +210,8 @@ SftpFile { - Sink columns mapping failed - When restore writer from states getting transaction directly failed -- [Improve] 
Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) \ No newline at end of file +- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) + +### Next version + +- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899)) \ No newline at end of file From 5bf9858f4321b2027ae454b8fa1c2055eef35765 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Mon, 16 Jan 2023 21:35:09 +0800 Subject: [PATCH 5/5] [Improve][Connector-V2][File] Update option rule --- .../seatunnel/file/config/BaseSinkConfig.java | 19 +++++++++++++++++++ .../file/ftp/sink/FtpFileSinkFactory.java | 6 +++++- .../file/hdfs/sink/HdfsFileSinkFactory.java | 6 +++++- .../file/local/sink/LocalFileSinkFactory.java | 6 +++++- .../file/oss/sink/OssFileSinkFactory.java | 6 +++++- .../file/oss/sink/OssFileSinkFactory.java | 6 +++++- .../file/s3/sink/S3FileSinkFactory.java | 7 ++++++- .../file/sftp/sink/SftpFileSinkFactory.java | 6 +++++- 8 files changed, 55 insertions(+), 7 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java index ae264ffb495..572a49fd37c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.common.utils.DateUtils; import org.apache.seatunnel.common.utils.TimeUtils; +import java.util.Arrays; import java.util.List; public class BaseSinkConfig { @@ -42,6 +43,24 @@ public class 
BaseSinkConfig { .defaultValue(CompressFormat.NONE) .withDescription("Compression codec"); + public static final Option TXT_COMPRESS = Options.key("compress_codec") + .singleChoice(CompressFormat.class, Arrays.asList(CompressFormat.NONE, CompressFormat.LZO)) + .defaultValue(CompressFormat.NONE) + .withDescription("Txt file supported compression"); + + public static final Option PARQUET_COMPRESS = Options.key("compress_codec") + .singleChoice(CompressFormat.class, Arrays.asList(CompressFormat.NONE, CompressFormat.LZO, + CompressFormat.SNAPPY, CompressFormat.LZ4, CompressFormat.GZIP, + CompressFormat.BROTLI, CompressFormat.ZSTD)) + .defaultValue(CompressFormat.NONE) + .withDescription("Parquet file supported compression"); + + public static final Option ORC_COMPRESS = Options.key("compress_codec") + .singleChoice(CompressFormat.class, Arrays.asList(CompressFormat.NONE, CompressFormat.LZO, + CompressFormat.SNAPPY, CompressFormat.LZ4, CompressFormat.ZLIB)) + .defaultValue(CompressFormat.NONE) + .withDescription("Orc file supported compression"); + public static final Option DATE_FORMAT = Options.key("date_format") .enumType(DateUtils.Formatter.class) .defaultValue(DateUtils.Formatter.YYYY_MM_DD) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java index d5a246b612a..35e611ced3f 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java @@ -44,7 +44,11 @@ public OptionRule optionRule() { .required(FtpConfig.FTP_PASSWORD) 
.optional(BaseSinkConfig.FILE_FORMAT) .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, - BaseSinkConfig.FIELD_DELIMITER) + BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS) .optional(BaseSinkConfig.CUSTOM_FILENAME) .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java index d7a27b5ccc2..68fc6e74927 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java @@ -41,7 +41,11 @@ public OptionRule optionRule() { .required(BaseSinkConfig.FILE_PATH) .optional(BaseSinkConfig.FILE_FORMAT) .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, - BaseSinkConfig.FIELD_DELIMITER) + BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, 
BaseSinkConfig.ORC_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS) .optional(BaseSinkConfig.CUSTOM_FILENAME) .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java index dfc14884a77..bf7390c168e 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java @@ -39,7 +39,11 @@ public OptionRule optionRule() { .required(BaseSinkConfig.FILE_PATH) .optional(BaseSinkConfig.FILE_FORMAT) .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, - BaseSinkConfig.FIELD_DELIMITER) + BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS) .optional(BaseSinkConfig.CUSTOM_FILENAME) .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java 
b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java index 55b27ffa1fc..8495d95b9c6 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java @@ -44,7 +44,11 @@ public OptionRule optionRule() { .required(OssConfig.ENDPOINT) .optional(BaseSinkConfig.FILE_FORMAT) .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, - BaseSinkConfig.FIELD_DELIMITER) + BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS) .optional(BaseSinkConfig.CUSTOM_FILENAME) .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java index 55b27ffa1fc..8495d95b9c6 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java +++ 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java @@ -44,7 +44,11 @@ public OptionRule optionRule() { .required(OssConfig.ENDPOINT) .optional(BaseSinkConfig.FILE_FORMAT) .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, - BaseSinkConfig.FIELD_DELIMITER) + BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS) .optional(BaseSinkConfig.CUSTOM_FILENAME) .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java index 01a3c55afd0..3a0a95b417d 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java @@ -44,7 +44,12 @@ public OptionRule optionRule() { .conditional(S3Config.S3A_AWS_CREDENTIALS_PROVIDER, S3Config.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider, S3Config.S3_ACCESS_KEY, S3Config.S3_SECRET_KEY) .optional(S3Config.S3_PROPERTIES) .optional(BaseSinkConfig.FILE_FORMAT) - .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, 
BaseSinkConfig.ROW_DELIMITER, BaseSinkConfig.FIELD_DELIMITER) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, + BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS) .optional(BaseSinkConfig.CUSTOM_FILENAME) .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT) .optional(BaseSinkConfig.HAVE_PARTITION) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java index f2be6d2d00d..bc61079fe9e 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java @@ -44,7 +44,11 @@ public OptionRule optionRule() { .required(SftpConfig.SFTP_PASSWORD) .optional(BaseSinkConfig.FILE_FORMAT) .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, - BaseSinkConfig.FIELD_DELIMITER) + BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, 
FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS) .optional(BaseSinkConfig.CUSTOM_FILENAME) .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT)