From f7652940b429f78d141b577e8e75e033c289c5da Mon Sep 17 00:00:00 2001 From: u5surf Date: Fri, 10 Jan 2025 09:53:33 +0900 Subject: [PATCH] Rewrite with Appache Commons Net --- README.md | 2 +- build.gradle | 39 +-- .../compileClasspath.lockfile | 1 + .../runtimeClasspath.lockfile | 1 + .../embulk/input/ftp/FtpFileInputPlugin.java | 289 +++++++----------- 5 files changed, 140 insertions(+), 192 deletions(-) diff --git a/README.md b/README.md index d93679a..bcc7fc9 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,7 @@ in: ## Build ``` -$ ./gradlew embulk-input-ftp:gem +$ ./gradlew gem ``` ## Release diff --git a/build.gradle b/build.gradle index 1d83ec4..c7d96ab 100644 --- a/build.gradle +++ b/build.gradle @@ -35,7 +35,7 @@ dependencies { compileOnly "org.embulk:embulk-api:0.10.19" compileOnly "org.embulk:embulk-spi:0.10.19" - compile("org.embulk:embulk-util-config:0.3.1") { + implementation("org.embulk:embulk-util-config:0.3.1") { // They conflict with embulk-core. They are once excluded here, // and added explicitly with versions exactly the same with embulk-core:0.10.19. exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations" @@ -47,24 +47,25 @@ dependencies { // They are once excluded from transitive dependencies of other dependencies, // and added explicitly with versions exactly the same with embulk-core:0.10.19. - compile "com.fasterxml.jackson.core:jackson-annotations:2.6.7" - compile "com.fasterxml.jackson.core:jackson-core:2.6.7" - compile "com.fasterxml.jackson.core:jackson-databind:2.6.7" - compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7" - compile "javax.validation:validation-api:1.1.0.Final" - - compile "org.embulk:embulk-util-ssl:0.3.0" - compile files("${project.rootDir}/libs/ftp4j-1.7.2.jar") - compile "org.bouncycastle:bcpkix-jdk15on:1.52" - - compile "org.embulk:embulk-util-file:0.1.3" - compile "org.embulk:embulk-util-retryhelper:0.8.2" - - testCompile "junit:junit:4.13.2" - testCompile "org.embulk:embulk-core:0.10.19" - testCompile "org.embulk:embulk-core:0.10.19:tests" - testCompile "org.embulk:embulk-standards:0.10.19" - testCompile "org.embulk:embulk-deps:0.10.19" + implementation "com.fasterxml.jackson.core:jackson-annotations:2.6.7" + implementation "com.fasterxml.jackson.core:jackson-core:2.6.7" + implementation "com.fasterxml.jackson.core:jackson-databind:2.6.7" + implementation "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7" + implementation "javax.validation:validation-api:1.1.0.Final" + + implementation "org.embulk:embulk-util-ssl:0.3.0" + implementation "commons-net:commons-net:3.11.1" + implementation files("${project.rootDir}/libs/ftp4j-1.7.2.jar") + implementation "org.bouncycastle:bcpkix-jdk15on:1.52" + + implementation "org.embulk:embulk-util-file:0.1.3" + implementation "org.embulk:embulk-util-retryhelper:0.8.2" + + testImplementation "junit:junit:4.13.2" + testImplementation "org.embulk:embulk-core:0.10.19" + testImplementation "org.embulk:embulk-core:0.10.19:tests" + testImplementation "org.embulk:embulk-standards:0.10.19" + testImplementation "org.embulk:embulk-deps:0.10.19" } embulkPlugin { diff --git a/gradle/dependency-locks/compileClasspath.lockfile b/gradle/dependency-locks/compileClasspath.lockfile index 2915238..97c9cbc 100644 --- a/gradle/dependency-locks/compileClasspath.lockfile +++ b/gradle/dependency-locks/compileClasspath.lockfile @@ -5,6 +5,7 @@ com.fasterxml.jackson.core:jackson-annotations:2.6.7 com.fasterxml.jackson.core:jackson-core:2.6.7 com.fasterxml.jackson.core:jackson-databind:2.6.7 com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7 +commons-net:commons-net:3.11.1 javax.validation:validation-api:1.1.0.Final org.bouncycastle:bcpkix-jdk15on:1.52 org.bouncycastle:bcprov-jdk15on:1.52 diff --git a/gradle/dependency-locks/runtimeClasspath.lockfile b/gradle/dependency-locks/runtimeClasspath.lockfile index c28561c..4bf1150 100644 --- a/gradle/dependency-locks/runtimeClasspath.lockfile +++ b/gradle/dependency-locks/runtimeClasspath.lockfile @@ -5,6 +5,7 @@ com.fasterxml.jackson.core:jackson-annotations:2.6.7 com.fasterxml.jackson.core:jackson-core:2.6.7 com.fasterxml.jackson.core:jackson-databind:2.6.7 com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7 +commons-net:commons-net:3.11.1 javax.validation:validation-api:1.1.0.Final org.bouncycastle:bcpkix-jdk15on:1.52 org.bouncycastle:bcprov-jdk15on:1.52 diff --git a/src/main/java/org/embulk/input/ftp/FtpFileInputPlugin.java b/src/main/java/org/embulk/input/ftp/FtpFileInputPlugin.java index f85c1a5..169fac8 100644 --- a/src/main/java/org/embulk/input/ftp/FtpFileInputPlugin.java +++ b/src/main/java/org/embulk/input/ftp/FtpFileInputPlugin.java @@ -1,15 +1,10 @@ package org.embulk.input.ftp; -import it.sauronsoftware.ftp4j.FTPAbortedException; -import it.sauronsoftware.ftp4j.FTPClient; -import it.sauronsoftware.ftp4j.FTPCommunicationListener; -import it.sauronsoftware.ftp4j.FTPConnector; -import it.sauronsoftware.ftp4j.FTPDataTransferException; -import it.sauronsoftware.ftp4j.FTPDataTransferListener; -import it.sauronsoftware.ftp4j.FTPException; -import it.sauronsoftware.ftp4j.FTPFile; -import it.sauronsoftware.ftp4j.FTPIllegalReplyException; -import it.sauronsoftware.ftp4j.FTPListParseException; +import org.apache.commons.net.ftp.FTP; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPFile; +import org.apache.commons.net.ftp.FTPSClient; +import org.apache.commons.net.ftp.FTPConnectionClosedException; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigSource; @@ -41,6 +36,7 @@ import java.io.InterruptedIOException; import java.io.UncheckedIOException; import java.nio.channels.Channels; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -191,36 +187,28 @@ private static FTPClient newFTPClient(final Logger log, final PluginTask task) try { int defaultPort = FTP_DEFULAT_PORT; if (task.getSsl()) { - client.setSSLSocketFactory(SSLPlugins.newSSLSocketFactory(task.getSSLConfig(), task.getHost())); + //client.setSSLSocketFactory(SSLPlugins.newSSLSocketFactory(task.getSSLConfig(), task.getHost())); if (task.getSslExplicit()) { - client.setSecurity(FTPClient.SECURITY_FTPES); + client = new FTPSClient(false); defaultPort = FTPES_DEFAULT_PORT; log.info("Using FTPES(FTPS/explicit) mode"); } else { - client.setSecurity(FTPClient.SECURITY_FTPS); + client = new FTPSClient(true); + // client.setKeyManager(); + // client.setTrustManager(); defaultPort = FTPS_DEFAULT_PORT; log.info("Using FTPS(FTPS/implicit) mode"); } } final int port = task.getPort().isPresent() ? task.getPort().get() : defaultPort; - client.addCommunicationListener(new LoggingCommunicationListner(log)); - - // TODO configurable timeout parameters - client.setAutoNoopTimeout(3000); - - final FTPConnector con = client.getConnector(); - con.setConnectionTimeout(30); - con.setReadTimeout(60); - con.setCloseTimeout(60); - // for commons-net client - //client.setControlKeepAliveTimeout - //client.setConnectTimeout - //client.setSoTimeout - //client.setDataTimeout - //client.setAutodetectUTF8 + client.setControlKeepAliveTimeout(Duration.ofSeconds(60)); + client.setConnectTimeout(30); + client.setSoTimeout(60); + client.setDataTimeout(Duration.ofSeconds(60)); + client.setAutodetectUTF8(true); client.connect(task.getHost(), port); log.info("Connecting to {}:{}", task.getHost(), port); @@ -231,32 +219,30 @@ private static FTPClient newFTPClient(final Logger log, final PluginTask task) } log.info("Using passive mode"); - client.setPassive(task.getPassiveMode()); + if (task.getPassiveMode()) { + client.enterLocalPassiveMode(); + } + else { + client.enterLocalActiveMode(); + } if (task.getAsciiMode()) { log.info("Using ASCII mode"); - client.setType(FTPClient.TYPE_TEXTUAL); + client.setFileType(FTP.ASCII_FILE_TYPE); } else { log.info("Using binary mode"); - client.setType(FTPClient.TYPE_BINARY); + client.setFileType(FTP.BINARY_FILE_TYPE); } - if (client.isCompressionSupported()) { - log.info("Using MODE Z compression"); - client.setCompressionEnabled(true); - } + client.setFileTransferMode(FTP.COMPRESSED_TRANSFER_MODE); final FTPClient connected = client; client = null; return connected; } - catch (final FTPException ex) { - log.info("FTP command failed: " + ex.getCode() + " " + ex.getMessage()); - throw new RuntimeException(ex); - } - catch (final FTPIllegalReplyException ex) { - log.info("FTP protocol error"); + catch (final FTPConnectionClosedException ex) { + log.info("FTP connection error: " + ex.getMessage()); throw new RuntimeException(ex); } catch (final IOException ex) { @@ -274,13 +260,7 @@ static void disconnectClient(final FTPClient client) { if (client.isConnected()) { try { - client.disconnect(false); - } - catch (final FTPException ex) { - // do nothing - } - catch (final FTPIllegalReplyException ex) { - // do nothing + client.disconnect(); } catch (final IOException ex) { // do nothing @@ -323,40 +303,20 @@ public static List listFilesByPrefix(final Logger log, final FTPClient c final ArrayList builder = new ArrayList<>(); try { - String currentDirectory = client.currentDirectory(); + String currentDirectory = client.printWorkingDirectory(); log.info("Listing ftp files at directory '{}' filtering filename by prefix '{}'", directory.isEmpty() ? currentDirectory : directory, fileNamePrefix); if (!directory.isEmpty()) { - client.changeDirectory(directory); + client.changeWorkingDirectory(directory); currentDirectory = directory; } - for (final FTPFile file : client.list()) { + for (final FTPFile file : client.listFiles()) { if (file.getName().startsWith(fileNamePrefix)) { listFilesRecursive(client, currentDirectory, file, lastPath, builder, pathMatchPattern); } } } - catch (final FTPListParseException ex) { - log.info("FTP listing files failed"); - throw new RuntimeException(ex); - } - catch (final FTPAbortedException ex) { - log.info("FTP listing files failed"); - throw new RuntimeException(ex); - } - catch (final FTPDataTransferException ex) { - log.info("FTP data transfer failed"); - throw new RuntimeException(ex); - } - catch (final FTPException ex) { - log.info("FTP command failed: " + ex.getCode() + " " + ex.getMessage()); - throw new RuntimeException(ex); - } - catch (final FTPIllegalReplyException ex) { - log.info("FTP protocol error"); - throw new RuntimeException(ex); - } catch (final IOException ex) { log.info("FTP network error: " + ex); throw new UncheckedIOException(ex); @@ -368,7 +328,7 @@ public static List listFilesByPrefix(final Logger log, final FTPClient c private static void listFilesRecursive(final FTPClient client, String baseDirectoryPath, final FTPFile file, final Optional lastPath, final ArrayList builder, final Pattern pathMatchPattern) - throws IOException, FTPException, FTPIllegalReplyException, FTPDataTransferException, FTPAbortedException, FTPListParseException + throws IOException { if (!baseDirectoryPath.endsWith("/")) { baseDirectoryPath = baseDirectoryPath + "/"; @@ -380,19 +340,19 @@ private static void listFilesRecursive(final FTPClient client, } switch (file.getType()) { - case FTPFile.TYPE_FILE: + case FTPFile.FILE_TYPE: if (pathMatchPattern.matcher(path).find()) { builder.add(path); } break; - case FTPFile.TYPE_DIRECTORY: - client.changeDirectory(path); - for (final FTPFile subFile : client.list()) { + case FTPFile.DIRECTORY_TYPE: + client.changeWorkingDirectory(path); + for (final FTPFile subFile : client.listFiles()) { listFilesRecursive(client, path, subFile, lastPath, builder, pathMatchPattern); } - client.changeDirectory(baseDirectoryPath); + client.changeWorkingDirectory(baseDirectoryPath); break; - case FTPFile.TYPE_LINK: + case FTPFile.SYMBOLIC_LINK_TYPE: // TODO } } @@ -405,83 +365,83 @@ public TransactionalFileInput open(final TaskSource taskSource, final int taskIn return new FtpFileInput(log, task, taskIndex); } - private static class LoggingCommunicationListner - implements FTPCommunicationListener - { - private final Logger log; - - public LoggingCommunicationListner(final Logger log) - { - this.log = log; - } - - @Override - public void received(final String statement) - { - log.info("< " + statement); - } - - @Override - public void sent(final String statement) - { - if (statement.startsWith("PASS")) { - // don't show password - return; - } - log.info("> " + statement); - } - } - - private static class LoggingTransferListener - implements FTPDataTransferListener - { - private final Logger log; - private final long transferNoticeBytes; - - private long totalTransfer; - private long nextTransferNotice; - - public LoggingTransferListener(final Logger log, final long transferNoticeBytes) - { - this.log = log; - this.transferNoticeBytes = transferNoticeBytes; - this.nextTransferNotice = transferNoticeBytes; - } - - @Override - public void started() - { - log.info("Transfer started"); - } - - @Override - public void transferred(final int length) - { - totalTransfer += length; - if (totalTransfer > nextTransferNotice) { - log.info("Transferred " + totalTransfer + " bytes"); - nextTransferNotice = ((totalTransfer / transferNoticeBytes) + 1) * transferNoticeBytes; - } - } - - @Override - public void completed() - { - log.info("Transfer completed " + totalTransfer + " bytes"); - } - - @Override - public void aborted() - { - log.info("Transfer aborted"); - } - - @Override - public void failed() - { - log.info("Transfer failed"); - } - } +// private static class LoggingCommunicationListner +// implements FTPCommunicationListener +// { +// private final Logger log; +// +// public LoggingCommunicationListner(final Logger log) +// { +// this.log = log; +// } +// +// @Override +// public void received(final String statement) +// { +// log.info("< " + statement); +// } +// +// @Override +// public void sent(final String statement) +// { +// if (statement.startsWith("PASS")) { +// // don't show password +// return; +// } +// log.info("> " + statement); +// } +// } +// +// private static class LoggingTransferListener +// implements FTPDataTransferListener +// { +// private final Logger log; +// private final long transferNoticeBytes; +// +// private long totalTransfer; +// private long nextTransferNotice; +// +// public LoggingTransferListener(final Logger log, final long transferNoticeBytes) +// { +// this.log = log; +// this.transferNoticeBytes = transferNoticeBytes; +// this.nextTransferNotice = transferNoticeBytes; +// } +// +// @Override +// public void started() +// { +// log.info("Transfer started"); +// } +// +// @Override +// public void transferred(final int length) +// { +// totalTransfer += length; +// if (totalTransfer > nextTransferNotice) { +// log.info("Transferred " + totalTransfer + " bytes"); +// nextTransferNotice = ((totalTransfer / transferNoticeBytes) + 1) * transferNoticeBytes; +// } +// } +// +// @Override +// public void completed() +// { +// log.info("Transfer completed " + totalTransfer + " bytes"); +// } +// +// @Override +// public void aborted() +// { +// log.info("Transfer aborted"); +// } +// +// @Override +// public void failed() +// { +// log.info("Transfer failed"); +// } +// } private static final long TRANSFER_NOTICE_BYTES = 100 * 1024 * 1024; @@ -499,23 +459,8 @@ public Runnable apply(final BlockingTransfer transfer) public void run() { try { - client.download(path, Channels.newOutputStream(transfer.getWriterChannel()), offset, new LoggingTransferListener(log, TRANSFER_NOTICE_BYTES)); - } - catch (final FTPException ex) { - log.info("FTP command failed: " + ex.getCode() + " " + ex.getMessage()); - throw new RuntimeException(ex); - } - catch (final FTPDataTransferException ex) { - log.info("FTP data transfer failed"); - throw new RuntimeException(ex); - } - catch (final FTPAbortedException ex) { - log.info("FTP listing files failed"); - throw new RuntimeException(ex); - } - catch (final FTPIllegalReplyException ex) { - log.info("FTP protocol error"); - throw new RuntimeException(ex); + client.setRestartOffset(offset); + client.retrieveFile(path, Channels.newOutputStream(transfer.getWriterChannel())); } catch (final IOException ex) { throw new UncheckedIOException(ex);