From e26c3c14933dba142a35fd969fb32109df4f526b Mon Sep 17 00:00:00 2001 From: Tyrantlucifer Date: Mon, 16 Jan 2023 21:57:21 +0800 Subject: [PATCH] [Feature][Connector-V2][File] Support compress (#3899) * [Feature][Connector-V2][File] Support compress * [Feature][Connector-V2][File] Update e2e tests * [Feature][Connector-V2][File] Update docs * [Improve][Connector-V2][File] Update docs * [Improve][Connector-V2][File] Update option rule --- docs/en/connector-v2/sink/FtpFile.md | 15 ++++- docs/en/connector-v2/sink/HdfsFile.md | 17 +++-- docs/en/connector-v2/sink/LocalFile.md | 13 ++++ docs/en/connector-v2/sink/OssFile.md | 15 ++++- docs/en/connector-v2/sink/OssJindoFile.md | 25 +++++++- docs/en/connector-v2/sink/S3File.md | 63 ++++++++++++------- docs/en/connector-v2/sink/SftpFile.md | 19 +++++- .../file/config/BaseFileSinkConfig.java | 17 ++--- .../seatunnel/file/config/BaseSinkConfig.java | 25 +++++++- .../seatunnel/file/config/CompressConfig.java | 22 ------- .../seatunnel/file/config/CompressFormat.java | 40 +++++++++--- .../sink/writer/AbstractWriteStrategy.java | 6 +- .../file/sink/writer/JsonWriteStrategy.java | 17 ++++- .../file/sink/writer/OrcWriteStrategy.java | 4 +- .../sink/writer/ParquetWriteStrategy.java | 5 +- .../file/sink/writer/TextWriteStrategy.java | 45 ++++++------- .../file/ftp/sink/FtpFileSinkFactory.java | 6 +- .../file/hdfs/sink/HdfsFileSinkFactory.java | 6 +- .../file/local/sink/LocalFileSinkFactory.java | 6 +- .../file/oss/sink/OssFileSinkFactory.java | 6 +- .../file/oss/sink/OssFileSinkFactory.java | 6 +- .../file/s3/sink/S3FileSinkFactory.java | 7 ++- .../file/sftp/sink/SftpFileSinkFactory.java | 6 +- .../resources/orc/fake_to_local_file_orc.conf | 1 + .../parquet/fake_to_local_file_parquet.conf | 1 + .../text/fake_to_local_file_text.conf | 1 + 26 files changed, 268 insertions(+), 126 deletions(-) delete mode 100644 seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressConfig.java diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/FtpFile.md index 840bd502d45..6180c366ca7 100644 --- a/docs/en/connector-v2/sink/FtpFile.md +++ b/docs/en/connector-v2/sink/FtpFile.md @@ -50,6 +50,7 @@ By default, we use 2PC commit to ensure `exactly-once` | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | | is_enable_transaction | boolean | no | true | | | batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | | common-options | object | no | - | | ### host [string] @@ -158,6 +159,16 @@ Only support `true` now. The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger. +### compress_codec [string] + +The compress codec of files and the details that supported as the following shown: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc: `lzo` `snappy` `lz4` `zlib` `none` +- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. 
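To make the new option concrete, a minimal FtpFile sink sketch with text output and `lzo` compression might look like the following. The connection values are placeholders rather than anything taken from this patch, and the credential keys are left out; take them from the option table above.

```hocon
FtpFile {
    # placeholder connection settings; credentials omitted, see the option table above
    host = "xxx.xxx.xxx.xxx"
    port = 21
    path = "/data/seatunnel/sink"
    file_format = "text"
    field_delimiter = "\t"
    row_delimiter = "\n"
    # text only accepts lzo or none; with lzo the generated file name ends in .lzo.txt
    compress_codec = "lzo"
}
```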
@@ -225,4 +236,6 @@ FtpFile { - Sink columns mapping failed - When restore writer from states getting transaction directly failed -- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) \ No newline at end of file +- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) + +- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899)) \ No newline at end of file diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md index 104eb69383c..72fe02ee237 100644 --- a/docs/en/connector-v2/sink/HdfsFile.md +++ b/docs/en/connector-v2/sink/HdfsFile.md @@ -51,6 +51,7 @@ By default, we use 2PC commit to ensure `exactly-once` | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | | is_enable_transaction | boolean | no | true | | | batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | | kerberos_principal | string | no | - | | kerberos_keytab_path | string | no | - | | | compress_codec | string | no | none | | @@ -154,6 +155,16 @@ Only support `true` now. The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger. +### compress_codec [string] + +The compress codec of files and the details that supported as the following shown: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc: `lzo` `snappy` `lz4` `zlib` `none` +- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` +- ### kerberos_principal [string] The principal of kerberos @@ -162,9 +173,6 @@ The principal of kerberos The keytab path of kerberos -### compressCodec [string] -Support lzo compression for text in file format. The file name ends with ".lzo.txt" . 
- ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details @@ -246,4 +254,5 @@ HdfsFile { ### Next version - [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) - [Improve] Support lzo compression for text in file format ([3782](https://github.com/apache/incubator-seatunnel/pull/3782)) -- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840)) \ No newline at end of file +- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840)) +- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899)) \ No newline at end of file diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 6bc4f6f08bd..3d9aa109768 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -46,6 +46,7 @@ By default, we use 2PC commit to ensure `exactly-once` | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | | is_enable_transaction | boolean | no | true | | | batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | | common-options | object | no | - | | ### path [string] @@ -138,6 +139,16 @@ Only support `true` now. The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger. +### compress_codec [string] + +The compress codec of files and the details that supported as the following shown: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc: `lzo` `snappy` `lz4` `zlib` `none` +- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. @@ -207,3 +218,5 @@ LocalFile { - When restore writer from states getting transaction directly failed - [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) + +- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899)) diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md index d0e7ac69271..b37905e5772 100644 --- a/docs/en/connector-v2/sink/OssFile.md +++ b/docs/en/connector-v2/sink/OssFile.md @@ -53,6 +53,7 @@ By default, we use 2PC commit to ensure `exactly-once` | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | | is_enable_transaction | boolean | no | true | | | batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | | common-options | object | no | - | | ### path [string] @@ -161,6 +162,16 @@ Only support `true` now. The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. 
If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger. +### compress_codec [string] + +The compress codec of files and the details that supported as the following shown: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc: `lzo` `snappy` `lz4` `zlib` `none` +- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. @@ -245,4 +256,6 @@ For orc file format simple config - Sink columns mapping failed - When restore writer from states getting transaction directly failed -- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) \ No newline at end of file +- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) + +- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899)) \ No newline at end of file diff --git a/docs/en/connector-v2/sink/OssJindoFile.md b/docs/en/connector-v2/sink/OssJindoFile.md index 8378d127dd3..ba7c14c45bc 100644 --- a/docs/en/connector-v2/sink/OssJindoFile.md +++ b/docs/en/connector-v2/sink/OssJindoFile.md @@ -53,6 +53,7 @@ By default, we use 2PC commit to ensure `exactly-once` | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | | is_enable_transaction | boolean | no | true | | | batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | | common-options | object | no | - | | ### path [string] @@ -157,6 +158,20 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran Only support `true` now. +### batch_size [int] + +The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger. + +### compress_codec [string] + +The compress codec of files and the details that supported as the following shown: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc: `lzo` `snappy` `lz4` `zlib` `none` +- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. 
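As a rough illustration of the orc-only codecs listed above, an OssJindoFile sink could be configured as in the sketch below. The bucket, credentials and endpoint are placeholder values in the style of the examples later in this file, not real ones.

```hocon
OssJindoFile {
    path = "/seatunnel/sink"
    # placeholder bucket, credentials and endpoint
    bucket = "oss://tyrantlucifer-image-bed"
    access_key = "xxxxxxxxxxx"
    access_secret = "xxxxxxxxxxx"
    endpoint = "oss-accelerate.aliyuncs.com"
    file_format = "orc"
    # zlib is only listed for orc; text, json and parquet use their own codec lists
    compress_codec = "zlib"
}
```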
@@ -167,7 +182,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum ```hocon - OssFile { + OssJindoFile { path="/seatunnel/sink" bucket = "oss://tyrantlucifer-image-bed" access_key = "xxxxxxxxxxx" @@ -193,7 +208,7 @@ For parquet file format with `sink_columns` ```hocon - OssFile { + OssJindoFile { path = "/seatunnel/sink" bucket = "oss://tyrantlucifer-image-bed" access_key = "xxxxxxxxxxx" @@ -222,6 +237,10 @@ For orc file format simple config ## Changelog +### 2.3.0 2022-12-30 + +- Add OSS Jindo File Sink Connector + ### Next version -- Add OSS Jindo File Sink Connector \ No newline at end of file +- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899)) \ No newline at end of file diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md index 39c8665393d..5fac487b582 100644 --- a/docs/en/connector-v2/sink/S3File.md +++ b/docs/en/connector-v2/sink/S3File.md @@ -32,28 +32,29 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | remarks | -|-----------------------------------|---------|----------|-------------------------------------------------------|--------------------------------------------------------------------------------------------------------| -| path | string | yes | - | | -| bucket | string | yes | - | | -| fs.s3a.endpoint | string | yes | - | | -| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | | -| access_key | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | -| access_secret | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | -| custom_filename | boolean | no | false | Whether you need custom the filename | -| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | -| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | -| file_format | string | no | "csv" | | -| field_delimiter | string | no | '\001' | Only used when file_format is text | -| row_delimiter | string | no | "\n" | Only used when file_format is text | -| have_partition | boolean | no | false | Whether you need processing partitions. 
| -| partition_by | array | no | - | Only used then have_partition is true | -| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | -| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | -| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | -| is_enable_transaction | boolean | no | true | | -| batch_size | int | no | 1000000 | | -| common-options | object | no | - | | +| name | type | required | default value | remarks | +|----------------------------------|---------|----------|-------------------------------------------------------|--------------------------------------------------------------------------------------------------------| +| path | string | yes | - | | +| bucket | string | yes | - | | +| fs.s3a.endpoint | string | yes | - | | +| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | | +| access_key | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | +| access_secret | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider | +| custom_filename | boolean | no | false | Whether you need custom the filename | +| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | +| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | +| file_format | string | no | "csv" | | +| field_delimiter | string | no | '\001' | Only used when file_format is text | +| row_delimiter | string | no | "\n" | Only used when file_format is text | +| have_partition | boolean | no | false | Whether you need processing partitions. | +| partition_by | array | no | - | Only used then have_partition is true | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true | +| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true | +| sink_columns | array | no | | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | | +| batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | +| common-options | object | no | - | | ### path [string] @@ -175,6 +176,16 @@ Only support `true` now. The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger. +### compress_codec [string] + +The compress codec of files and the details that supported as the following shown: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc: `lzo` `snappy` `lz4` `zlib` `none` +- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. 
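A hedged S3File sink sketch for one of the parquet codecs listed above; the bucket, endpoint and keys are placeholders.

```hocon
S3File {
    bucket = "s3a://seatunnel-test"
    path = "/seatunnel/sink"
    # placeholder endpoint and credentials
    fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
    fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
    access_key = "xxxxxxxxxxxxxxxxx"
    access_secret = "xxxxxxxxxxxxxxxxx"
    file_format = "parquet"
    # parquet accepts lzo, snappy, lz4, gzip, brotli, zstd or none
    compress_codec = "gzip"
}
```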
@@ -255,7 +266,7 @@ For orc file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCreden - Add S3File Sink Connector -### Next version +### 2.3.0 2022-12-30 - [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258)) - When field from upstream is null it will throw NullPointerException - Sink columns mapping failed @@ -265,4 +276,8 @@ For orc file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCreden - Allow the use of the s3a protocol - Decouple hadoop-aws dependencies - [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) -- [Feature]Set S3 AK to optional ([3688](https://github.com/apache/incubator-seatunnel/pull/)) \ No newline at end of file +- [Feature]Set S3 AK to optional ([3688](https://github.com/apache/incubator-seatunnel/pull/)) + +### Next version + +- ​ [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899)) \ No newline at end of file diff --git a/docs/en/connector-v2/sink/SftpFile.md b/docs/en/connector-v2/sink/SftpFile.md index 32dc881c91e..dafeb6f2ebb 100644 --- a/docs/en/connector-v2/sink/SftpFile.md +++ b/docs/en/connector-v2/sink/SftpFile.md @@ -50,6 +50,7 @@ By default, we use 2PC commit to ensure `exactly-once` | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | | is_enable_transaction | boolean | no | true | | | batch_size | int | no | 1000000 | | +| compress_codec | string | no | none | | | common-options | object | no | - | | ### host [string] @@ -158,6 +159,16 @@ Only support `true` now. The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger. +### compress_codec [string] + +The compress codec of files and the details that supported as the following shown: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc: `lzo` `snappy` `lz4` `zlib` `none` +- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none` + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. 
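For json output, which like text only supports `lzo` or `none`, a sketch of an SftpFile sink might look as follows. The connection options are intentionally left out; take the exact keys from the option table above.

```hocon
SftpFile {
    # host, port and credentials omitted, see the option table above for the exact keys
    path = "/data/seatunnel/sink"
    file_format = "json"
    # json only accepts lzo or none; lzo-compressed files get a .lzo.json suffix
    compress_codec = "lzo"
}
```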
@@ -192,7 +203,7 @@ SftpFile { ## Changelog -### Next version +### 2.3.0 2022-12-30 - Add SftpFile Sink Connector - [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258)) @@ -200,4 +211,8 @@ SftpFile { - Sink columns mapping failed - When restore writer from states getting transaction directly failed -- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) \ No newline at end of file +- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) + +### Next version + +- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899)) \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java index 57e8079cb05..c38f43ae97e 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java @@ -19,11 +19,9 @@ import static com.google.common.base.Preconditions.checkNotNull; -import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.DateUtils; import org.apache.seatunnel.common.utils.TimeUtils; -import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -35,9 +33,9 @@ import java.util.Locale; @Data -public class BaseFileSinkConfig implements DelimiterConfig, CompressConfig, Serializable { +public class BaseFileSinkConfig implements DelimiterConfig, Serializable { private static final long serialVersionUID = 1L; - protected String compressCodec; + protected CompressFormat compressFormat = BaseSinkConfig.COMPRESS_CODEC.defaultValue(); protected String fieldDelimiter = BaseSinkConfig.FIELD_DELIMITER.defaultValue(); protected String rowDelimiter = BaseSinkConfig.ROW_DELIMITER.defaultValue(); protected int batchSize = BaseSinkConfig.BATCH_SIZE.defaultValue(); @@ -50,15 +48,8 @@ public class BaseFileSinkConfig implements DelimiterConfig, CompressConfig, Seri public BaseFileSinkConfig(@NonNull Config config) { if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) { - CompressFormat compressFormat = CompressFormat.valueOf(config.getString(BaseSinkConfig.COMPRESS_CODEC.key()).toUpperCase(Locale.ROOT)); - switch (compressFormat) { - case LZO: - this.compressCodec = compressFormat.getCompressCodec(); - break; - default: - throw new FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION, - "Compress not supported this compress code by SeaTunnel file connector now"); - } + String compressCodec = config.getString(BaseSinkConfig.COMPRESS_CODEC.key()); + this.compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase()); } if (config.hasPath(BaseSinkConfig.BATCH_SIZE.key())) { this.batchSize = config.getInt(BaseSinkConfig.BATCH_SIZE.key()); diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java index 5abffc13f43..572a49fd37c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.common.utils.DateUtils; import org.apache.seatunnel.common.utils.TimeUtils; +import java.util.Arrays; import java.util.List; public class BaseSinkConfig { @@ -37,11 +38,29 @@ public class BaseSinkConfig { public static final String DEFAULT_FILE_NAME_EXPRESSION = "${transactionId}"; public static final int DEFAULT_BATCH_SIZE = 1000000; - public static final Option COMPRESS_CODEC = Options.key("compress_codec") - .stringType() - .noDefaultValue() + public static final Option COMPRESS_CODEC = Options.key("compress_codec") + .enumType(CompressFormat.class) + .defaultValue(CompressFormat.NONE) .withDescription("Compression codec"); + public static final Option TXT_COMPRESS = Options.key("compress_codec") + .singleChoice(CompressFormat.class, Arrays.asList(CompressFormat.NONE, CompressFormat.LZO)) + .defaultValue(CompressFormat.NONE) + .withDescription("Txt file supported compression"); + + public static final Option PARQUET_COMPRESS = Options.key("compress_codec") + .singleChoice(CompressFormat.class, Arrays.asList(CompressFormat.NONE, CompressFormat.LZO, + CompressFormat.SNAPPY, CompressFormat.LZ4, CompressFormat.GZIP, + CompressFormat.BROTLI, CompressFormat.ZSTD)) + .defaultValue(CompressFormat.NONE) + .withDescription("Parquet file supported compression"); + + public static final Option ORC_COMPRESS = Options.key("compress_codec") + .singleChoice(CompressFormat.class, Arrays.asList(CompressFormat.NONE, CompressFormat.LZO, + CompressFormat.SNAPPY, CompressFormat.LZ4, CompressFormat.ZLIB)) + .defaultValue(CompressFormat.NONE) + .withDescription("Orc file supported compression"); + public static final Option DATE_FORMAT = Options.key("date_format") .enumType(DateUtils.Formatter.class) .defaultValue(DateUtils.Formatter.YYYY_MM_DD) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressConfig.java deleted file mode 100644 index 48d47c8d1df..00000000000 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressConfig.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.file.config; - -public interface CompressConfig { - String getCompressCodec(); -} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java index 6449f184571..6483ae9c84a 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java @@ -17,29 +17,49 @@ package org.apache.seatunnel.connectors.seatunnel.file.config; +import org.apache.orc.CompressionKind; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + import java.io.Serializable; public enum CompressFormat implements Serializable { + // text json orc parquet support + LZO(".lzo", CompressionKind.LZO, CompressionCodecName.LZO), + + // orc and parquet support + NONE("", CompressionKind.NONE, CompressionCodecName.UNCOMPRESSED), + SNAPPY(".snappy", CompressionKind.SNAPPY, CompressionCodecName.SNAPPY), + LZ4(".lz4", CompressionKind.LZ4, CompressionCodecName.LZ4), + + // only orc support + ZLIB(".zlib", CompressionKind.ZLIB, CompressionCodecName.UNCOMPRESSED), - LZO("lzo"), - NONE("none"); + // only parquet support + GZIP(".gz", CompressionKind.NONE, CompressionCodecName.GZIP), + BROTLI(".br", CompressionKind.NONE, CompressionCodecName.BROTLI), + ZSTD(".zstd", CompressionKind.NONE, CompressionCodecName.ZSTD); private final String compressCodec; + private final CompressionKind orcCompression; + private final CompressionCodecName parquetCompression; - CompressFormat(String compressCodec) { + CompressFormat(String compressCodec, + CompressionKind orcCompression, + CompressionCodecName parentCompression) { this.compressCodec = compressCodec; + this.orcCompression = orcCompression; + this.parquetCompression = parentCompression; } public String getCompressCodec() { return compressCodec; } - public static CompressFormat getCompressFormat(String value) { - switch (value) { - case "lzo": - return CompressFormat.LZO; - default: - return CompressFormat.NONE; - } + public CompressionKind getOrcCompression() { + return orcCompression; + } + + public CompressionCodecName getParquetCompression() { + return parquetCompression; } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java index 41bd8ab0831..329af511aee 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java +++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java @@ -63,6 +63,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy { protected final Logger log = LoggerFactory.getLogger(this.getClass()); protected final FileSinkConfig fileSinkConfig; + protected final CompressFormat compressFormat; protected final List sinkColumnsIndexInRow; protected String jobId; protected int subTaskIndex; @@ -90,6 +91,7 @@ public AbstractWriteStrategy(FileSinkConfig fileSinkConfig) { this.fileSinkConfig = fileSinkConfig; this.sinkColumnsIndexInRow = fileSinkConfig.getSinkColumnsIndexInRow(); this.batchSize = fileSinkConfig.getBatchSize(); + this.compressFormat = fileSinkConfig.getCompressFormat(); } /** @@ -222,9 +224,7 @@ public String generateFileName(String transactionId) { String fileNameExpression = fileSinkConfig.getFileNameExpression(); FileFormat fileFormat = fileSinkConfig.getFileFormat(); String suffix = fileFormat.getSuffix(); - if (CompressFormat.LZO.getCompressCodec().equals(fileSinkConfig.getCompressCodec())) { - suffix = "." + CompressFormat.LZO.getCompressCodec() + "." + suffix; - } + suffix = compressFormat.getCompressCodec() + suffix; if (StringUtils.isBlank(fileNameExpression)) { return transactionId + suffix; } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java index 9ae10fd35c2..7d9b7ad0dfb 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java @@ -25,10 +25,12 @@ import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig; import org.apache.seatunnel.format.json.JsonSerializationSchema; +import io.airlift.compress.lzo.LzopCodec; import lombok.NonNull; import org.apache.hadoop.fs.FSDataOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.util.HashMap; import java.util.Map; @@ -93,7 +95,20 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath); if (fsDataOutputStream == null) { try { - fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + switch (compressFormat) { + case LZO: + LzopCodec lzo = new LzopCodec(); + OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); + fsDataOutputStream = new FSDataOutputStream(out, null); + break; + case NONE: + fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + break; + default: + log.warn("Json file does not support this compress type: {}", compressFormat.getCompressCodec()); + fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + break; + } beingWrittenOutputStream.put(filePath, fsDataOutputStream); isFirstWrite.put(filePath, true); } catch (IOException e) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java index af5b774e9f8..507dafdcc26 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java @@ -30,7 +30,6 @@ import lombok.NonNull; import org.apache.hadoop.fs.Path; -import org.apache.orc.CompressionKind; import org.apache.orc.OrcFile; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; @@ -113,8 +112,7 @@ private Writer getOrCreateWriter(@NonNull String filePath) { try { OrcFile.WriterOptions options = OrcFile.writerOptions(getConfiguration(hadoopConf)) .setSchema(schema) - // temporarily used snappy - .compress(CompressionKind.SNAPPY) + .compress(compressFormat.getOrcCompression()) // use orc version 0.12 .version(OrcFile.Version.V_0_12) .overwrite(true); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java index 6c431605a06..ad05c123ed2 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java @@ -42,7 +42,6 @@ import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.HadoopOutputFile; import org.apache.parquet.schema.ConversionPatterns; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -141,9 +140,7 @@ private ParquetWriter getOrCreateWriter(@NonNull String filePath) .withDataModel(dataModel) // use parquet v1 to improve compatibility .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0) - // Temporarily use snappy compress - // I think we can use the compress option in config to control this - .withCompressionCodec(CompressionCodecName.SNAPPY) + .withCompressionCodec(compressFormat.getParquetCompression()) .withSchema(schema) .build(); this.beingWrittenWriter.put(filePath, newWriter); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java index b0ba6674631..6698e2ee289 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java @@ -24,7 +24,6 @@ import org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.DateUtils; import 
org.apache.seatunnel.common.utils.TimeUtils; -import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig; import org.apache.seatunnel.format.text.TextSerializationSchema; @@ -36,7 +35,6 @@ import java.io.IOException; import java.io.OutputStream; import java.util.HashMap; -import java.util.Locale; import java.util.Map; public class TextWriteStrategy extends AbstractWriteStrategy { @@ -48,18 +46,16 @@ public class TextWriteStrategy extends AbstractWriteStrategy { private final DateTimeUtils.Formatter dateTimeFormat; private final TimeUtils.Formatter timeFormat; private SerializationSchema serializationSchema; - private String compressCodec; - public TextWriteStrategy(FileSinkConfig textFileSinkConfig) { - super(textFileSinkConfig); + public TextWriteStrategy(FileSinkConfig fileSinkConfig) { + super(fileSinkConfig); this.beingWrittenOutputStream = new HashMap<>(); this.isFirstWrite = new HashMap<>(); - this.fieldDelimiter = textFileSinkConfig.getFieldDelimiter(); - this.rowDelimiter = textFileSinkConfig.getRowDelimiter(); - this.dateFormat = textFileSinkConfig.getDateFormat(); - this.dateTimeFormat = textFileSinkConfig.getDatetimeFormat(); - this.timeFormat = textFileSinkConfig.getTimeFormat(); - this.compressCodec = textFileSinkConfig.getCompressCodec(); + this.fieldDelimiter = fileSinkConfig.getFieldDelimiter(); + this.rowDelimiter = fileSinkConfig.getRowDelimiter(); + this.dateFormat = fileSinkConfig.getDateFormat(); + this.dateTimeFormat = fileSinkConfig.getDatetimeFormat(); + this.timeFormat = fileSinkConfig.getTimeFormat(); } @Override @@ -117,21 +113,20 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) { FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath); if (fsDataOutputStream == null) { try { - if (compressCodec != null) { - CompressFormat compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase(Locale.ROOT)); - switch (compressFormat) { - case LZO: - LzopCodec lzo = new LzopCodec(); - OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); - fsDataOutputStream = new FSDataOutputStream(out, null); - break; - default: - fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); - } - } else { - fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + switch (compressFormat) { + case LZO: + LzopCodec lzo = new LzopCodec(); + OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath)); + fsDataOutputStream = new FSDataOutputStream(out, null); + break; + case NONE: + fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + break; + default: + log.warn("Text file does not support this compress type: {}", compressFormat.getCompressCodec()); + fsDataOutputStream = fileSystemUtils.getOutputStream(filePath); + break; } - beingWrittenOutputStream.put(filePath, fsDataOutputStream); isFirstWrite.put(filePath, true); } catch (IOException e) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java index d5a246b612a..35e611ced3f 100644 --- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java @@ -44,7 +44,11 @@ public OptionRule optionRule() { .required(FtpConfig.FTP_PASSWORD) .optional(BaseSinkConfig.FILE_FORMAT) .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, - BaseSinkConfig.FIELD_DELIMITER) + BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS) .optional(BaseSinkConfig.CUSTOM_FILENAME) .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java index d7a27b5ccc2..68fc6e74927 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java @@ -41,7 +41,11 @@ public OptionRule optionRule() { .required(BaseSinkConfig.FILE_PATH) .optional(BaseSinkConfig.FILE_FORMAT) .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, - BaseSinkConfig.FIELD_DELIMITER) + BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS) .optional(BaseSinkConfig.CUSTOM_FILENAME) .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java index dfc14884a77..bf7390c168e 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java @@ -39,7 +39,11 @@ public OptionRule optionRule() { .required(BaseSinkConfig.FILE_PATH) .optional(BaseSinkConfig.FILE_FORMAT) .conditional(BaseSinkConfig.FILE_FORMAT, 
FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, - BaseSinkConfig.FIELD_DELIMITER) + BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS) .optional(BaseSinkConfig.CUSTOM_FILENAME) .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java index 55b27ffa1fc..8495d95b9c6 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java @@ -44,7 +44,11 @@ public OptionRule optionRule() { .required(OssConfig.ENDPOINT) .optional(BaseSinkConfig.FILE_FORMAT) .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, - BaseSinkConfig.FIELD_DELIMITER) + BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS) .optional(BaseSinkConfig.CUSTOM_FILENAME) .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java index 55b27ffa1fc..8495d95b9c6 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java @@ -44,7 +44,11 @@ public OptionRule optionRule() { .required(OssConfig.ENDPOINT) .optional(BaseSinkConfig.FILE_FORMAT) .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, - BaseSinkConfig.FIELD_DELIMITER) + BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS) 
.optional(BaseSinkConfig.CUSTOM_FILENAME) .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java index 01a3c55afd0..3a0a95b417d 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java @@ -44,7 +44,12 @@ public OptionRule optionRule() { .conditional(S3Config.S3A_AWS_CREDENTIALS_PROVIDER, S3Config.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider, S3Config.S3_ACCESS_KEY, S3Config.S3_SECRET_KEY) .optional(S3Config.S3_PROPERTIES) .optional(BaseSinkConfig.FILE_FORMAT) - .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, BaseSinkConfig.FIELD_DELIMITER) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, + BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS) .optional(BaseSinkConfig.CUSTOM_FILENAME) .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT) .optional(BaseSinkConfig.HAVE_PARTITION) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java index f2be6d2d00d..bc61079fe9e 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java @@ -44,7 +44,11 @@ public OptionRule optionRule() { .required(SftpConfig.SFTP_PASSWORD) .optional(BaseSinkConfig.FILE_FORMAT) .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, - BaseSinkConfig.FIELD_DELIMITER) + BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS) + .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS) .optional(BaseSinkConfig.CUSTOM_FILENAME) .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT) diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf index cbbc227b309..a134e9fc064 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf @@ -75,5 +75,6 @@ sink { file_format = "orc" filename_time_format = "yyyy.MM.dd" is_enable_transaction = true + compress_codec = "zlib" } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf index 454e27f41ef..c3eae33b0fd 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf @@ -75,5 +75,6 @@ sink { file_format = "parquet" filename_time_format = "yyyy.MM.dd" is_enable_transaction = true + compress_codec = "gzip" } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf index 54227c8ee64..795d82234ca 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf @@ -75,5 +75,6 @@ sink { file_format = "text" filename_time_format = "yyyy.MM.dd" is_enable_transaction = true + compress_codec = "lzo" } } \ No newline at end of file
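The three e2e configs above now exercise zlib for orc, gzip for parquet and lzo for text. A json variant is not part of this patch, but if one were added it would presumably mirror the existing confs, roughly like this sketch (the output path is a placeholder and the surrounding env/source blocks are omitted):

```hocon
sink {
  LocalFile {
    # placeholder output path
    path = "/tmp/seatunnel/json"
    file_format = "json"
    filename_time_format = "yyyy.MM.dd"
    is_enable_transaction = true
    # json only supports lzo or none
    compress_codec = "lzo"
  }
}
```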