From ae692b74da6cd92cddd6e6aba9f639f44293ae5c Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Wed, 4 Oct 2023 15:46:06 +0800 Subject: [PATCH 1/2] [Feature][Connector-V2][File] Support read empty directory --- .../seatunnel/file/hdfs/source/BaseHdfsFileSource.java | 5 +++++ .../file/source/reader/AbstractReadStrategy.java | 10 ---------- .../seatunnel/file/cos/source/CosFileSource.java | 5 +++++ .../seatunnel/file/ftp/source/FtpFileSource.java | 5 +++++ .../seatunnel/file/oss/source/OssFileSource.java | 5 +++++ .../seatunnel/file/local/source/LocalFileSource.java | 5 +++++ .../seatunnel/file/oss/source/OssFileSource.java | 5 +++++ .../seatunnel/file/s3/source/S3FileSource.java | 5 +++++ .../seatunnel/file/sftp/source/SftpFileSource.java | 5 +++++ 9 files changed, 40 insertions(+), 10 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java index 57d2ceca6eb..ac45bc95bc6 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java @@ -110,6 +110,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { "SeaTunnel does not supported this file format"); } } else { + if (filePaths.isEmpty()) { + // When the directory is empty, distribute default behavior schema + rowType = CatalogTableUtil.buildSimpleTextSchema(); + return; + } try { rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0)); } catch (FileConnectorException e) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java index e4e1694f30d..7de0a242ccf 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java @@ -24,8 +24,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; -import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode; -import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils; import org.apache.hadoop.conf.Configuration; @@ -154,14 +152,6 @@ public List getFileNamesByPath(HadoopConf hadoopConf, String path) throw } } - if (fileNames.isEmpty()) { - throw new FileConnectorException( - FileConnectorErrorCode.FILE_LIST_EMPTY, - "The target file list is empty," - + "SeaTunnel will not be able to sync empty table, " - + "please check the configuration parameters such as: [file_filter_pattern]"); - } - return fileNames; } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java index aefc339121e..ae261a8408a 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java @@ -106,6 +106,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { "SeaTunnel does not supported this file format"); } } else { + if (filePaths.isEmpty()) { + // When the directory is empty, distribute default behavior schema + rowType = CatalogTableUtil.buildSimpleTextSchema(); + return; + } try { rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0)); } catch (FileConnectorException e) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java index 0c35e50f90c..0f6acc241de 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java @@ -111,6 +111,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { "SeaTunnel does not supported this file format"); } } else { + if (filePaths.isEmpty()) { + // When the directory is empty, distribute default behavior schema + rowType = CatalogTableUtil.buildSimpleTextSchema(); + return; + } try { rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0)); } catch (FileConnectorException e) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java index e00b7abc286..a662d673c13 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java @@ -107,6 +107,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { "SeaTunnel does not supported this file format"); } } else { + if (filePaths.isEmpty()) { + // When the directory is empty, distribute default behavior schema + rowType = CatalogTableUtil.buildSimpleTextSchema(); + return; + } try { rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0)); } catch (FileConnectorException e) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java index 5bbf6e3e1ea..9c93816fc11 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java @@ -108,6 +108,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { "SeaTunnel does not supported this file format"); } } else { + if (filePaths.isEmpty()) { + // When the directory is empty, distribute default behavior schema + rowType = CatalogTableUtil.buildSimpleTextSchema(); + return; + } try { rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0)); } catch (FileConnectorException e) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java index d4076775a0d..60f894917e4 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java @@ -106,6 +106,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { "SeaTunnel does not supported this file format"); } } else { + if (filePaths.isEmpty()) { + // When the directory is empty, distribute default behavior schema + rowType = CatalogTableUtil.buildSimpleTextSchema(); + return; + } try { rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0)); } catch (FileConnectorException e) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java index 5e559e06a00..6bd3af113f5 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java @@ -103,6 +103,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { "SeaTunnel does not supported this file format"); } } else { + if (filePaths.isEmpty()) { + // When the directory is empty, distribute default behavior schema + rowType = CatalogTableUtil.buildSimpleTextSchema(); + return; + } try { rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0)); } catch (FileConnectorException e) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java index fd9d487e211..8d2d2376f99 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java @@ -111,6 +111,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { "SeaTunnel does not supported this file format"); } } else { + if (filePaths.isEmpty()) { + // When the directory is empty, distribute default behavior schema + rowType = CatalogTableUtil.buildSimpleTextSchema(); + return; + } try { rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0)); } catch (FileConnectorException e) { From 3e3eceb63c282dc305e3158a1a3484481180078c Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Wed, 4 Oct 2023 16:16:42 +0800 Subject: [PATCH 2/2] [Feature][Connector-V2][File] Add e2e test cases --- .../e2e/connector/file/local/LocalFileIT.java | 5 +++ .../resources/json/local_file_to_console.conf | 37 +++++++++++++++++++ .../parquet/local_file_to_console.conf | 37 +++++++++++++++++++ 3 files changed, 79 insertions(+) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_to_console.conf diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java index aed35767263..c454c6ce2cd 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java @@ -69,6 +69,7 @@ public class LocalFileIT extends TestSuiteBase { "/excel/e2e.xlsx", "/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx", container); + container.execInContainer("mkdir", "-p", "/tmp/fake_empty"); }; @TestTemplate @@ -105,5 +106,9 @@ public void testLocalFileReadAndWrite(TestContainer container) helper.execute("/parquet/local_file_parquet_projection_to_assert.conf"); // test read filtered local file helper.execute("/excel/local_filter_excel_to_assert.conf"); + + // test read empty directory + helper.execute("/json/local_file_to_console.conf"); + helper.execute("/parquet/local_file_to_console.conf"); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf new file mode 100644 index 00000000000..4595f838887 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_to_console.conf @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + LocalFile { + path = "/tmp/fake_empty" + file_format_type = "json" + } +} + +sink { + Console {} +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_to_console.conf new file mode 100644 index 00000000000..ee3bff3fb93 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_to_console.conf @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + LocalFile { + path = "/tmp/fake_empty" + file_format_type = "parquet" + } +} + +sink { + Console {} +} \ No newline at end of file