diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java index 83fccdeb3c4..99bf4177639 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java @@ -40,6 +40,7 @@ import java.io.OutputStream; import java.net.URI; import java.net.URLDecoder; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Vector; @@ -154,6 +155,29 @@ private boolean exists(ChannelSftp channel, Path file) throws IOException { } } + public String quote(String path) { + byte[] _path = path.getBytes(StandardCharsets.UTF_8); + int count = 0; + for (int i = 0; i < _path.length; i++) { + byte b = _path[i]; + if (b == '\\' || b == '?' || b == '*') { + count++; + } + } + if (count == 0) { + return path; + } + byte[] _path2 = new byte[_path.length + count]; + for (int i = 0, j = 0; i < _path.length; i++) { + byte b = _path[i]; + if (b == '\\' || b == '?' || b == '*') { + _path2[j++] = '\\'; + } + _path2[j++] = b; + } + return new String(_path2, 0, _path2.length, StandardCharsets.UTF_8); + } + /** * Convenience method, so that we don't open a new connection when using this method from within * another method. 
Otherwise every API invocation incurs the overhead of opening/closing a TCP @@ -466,7 +490,7 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException { // the path could be a symbolic link, so get the real path absolute = new Path("/", channel.realpath(absolute.toUri().getPath())); - is = channel.get(absolute.toUri().getPath()); + is = channel.get(quote(absolute.toUri().getPath())); } catch (SftpException e) { throw new IOException(e); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SftpFileSystemTest.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SftpFileSystemTest.java new file mode 100644 index 00000000000..0e539350b02 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SftpFileSystemTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.sftp.system; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class SftpFileSystemTest { + + @Test + void convertAllTypeFileName() { + SFTPFileSystem sftpFileSystem = new SFTPFileSystem(); + Assertions.assertEquals( + "/home/seatunnel/tmp/seatunnel/read/wildcard/e2e.txt", + sftpFileSystem.quote("/home/seatunnel/tmp/seatunnel/read/wildcard/e2e.txt")); + // test file name with wildcard '*' + Assertions.assertEquals( + "/home/seatunnel/tmp/seatunnel/read/wildcard/e\\*e.txt", + sftpFileSystem.quote("/home/seatunnel/tmp/seatunnel/read/wildcard/e*e.txt")); + + // test file name with wildcard '?' + Assertions.assertEquals( + "/home/seatunnel/tmp/seatunnel/read/wildcard/e\\?e.txt", + sftpFileSystem.quote("/home/seatunnel/tmp/seatunnel/read/wildcard/e?e.txt")); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java index 2ac185aabbd..235f39ae38d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java @@ -113,6 +113,17 @@ public void startUp() throws Exception { "/home/seatunnel/tmp/seatunnel/read/xml/name=tyrantlucifer/hobby=coding/e2e.xml", sftpContainer); + // Windows does not support files with wildcard characters. 
We can rename `e2e.txt` to + // `e*e.txt` when copying to a container + ContainerUtil.copyFileIntoContainers( + "/text/e2e.txt", + "/home/seatunnel/tmp/seatunnel/read/wildcard/e*e.txt", + sftpContainer); + + ContainerUtil.copyFileIntoContainers( + "/text/e2e.txt", + "/home/seatunnel/tmp/seatunnel/read/wildcard/e2e.txt", + sftpContainer); sftpContainer.execInContainer("sh", "-c", "chown -R seatunnel /home/seatunnel/tmp/"); } @@ -138,6 +149,9 @@ public void testSftpFileReadAndWrite(TestContainer container) helper.execute("/text/sftp_file_text_projection_to_assert.conf"); // test read sftp zip text file helper.execute("/text/sftp_file_zip_text_to_assert.conf"); + // test read file with wildcard character, should match tmp/seatunnel/read/wildcard/e*e.txt + // and tmp/seatunnel/read/wildcard/e2e.txt + helper.execute("/text/sftp_file_text_wildcard_character_to_assert.conf"); // test write sftp json file helper.execute("/json/fake_to_sftp_file_json.conf"); // test read sftp json file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_file_text_wildcard_character_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_file_text_wildcard_character_to_assert.conf new file mode 100644 index 00000000000..cd8e27b743e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_file_text_wildcard_character_to_assert.conf @@ -0,0 +1,117 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + SftpFile { + host = "sftp" + port = 22 + user = seatunnel + password = pass + path = "tmp/seatunnel/read/wildcard/" + file_format_type = "text" + plugin_output = "sftp" + schema = { + fields { + c_map = "map<string, string>" + c_array = "array<int>" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map<string, string>" + c_array = "array<int>" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + } +} + +sink { + Assert { + plugin_input = "sftp" + rules { + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 10 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} \ No newline at end of file