Skip to content

Commit

Permalink
apacheGH-294: Fix memory leaks in SftpFileSystemProvider
Browse files Browse the repository at this point in the history
SftpFileSystem.getClient() returns reference-counted wrapper
instances that need to be closed to avoid a memory leak via
ThreadLocals. Make sure that the streams returned by
SftpFileSystemProvider.newInputStream() and newOutputStream()
do close the client. Also fix SftpFileSystemProvider.copy() to
close the SftpClient it uses.

SftpFileSystemProvider.newDirectoryStream() and newFileChannel()
already do close the SftpClient used.

Fix the SftpFileSystemTest to properly close SftpClients and
DirectoryStreams.

Bug: apache#294
  • Loading branch information
tomaswolf committed Dec 20, 2022
1 parent 78ed6bc commit 5cd1d46
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.sshd.sftp.client.fs;

import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -518,7 +520,17 @@ public InputStream newInputStream(Path path, OpenOption... options) throws IOExc
modes = EnumSet.of(OpenMode.Read);
}
SftpPath p = toSftpPath(path);
return p.getFileSystem().getClient().read(p.toString(), modes);
SftpClient client = p.getFileSystem().getClient();
return new FilterInputStream(client.read(p.toString(), modes)) {
@Override
public void close() throws IOException {
try {
super.close();
} finally {
client.close();
}
}
};
}

@Override
Expand All @@ -533,7 +545,18 @@ public OutputStream newOutputStream(Path path, OpenOption... options) throws IOE
modes.add(OpenMode.Write);
}
SftpPath p = toSftpPath(path);
return p.getFileSystem().getClient().write(p.toString(), modes);
SftpClient client = p.getFileSystem().getClient();
return new FilterOutputStream(client.write(p.toString(), modes)) {

@Override
public void close() throws IOException {
try {
super.close();
} finally {
client.close();
}
}
};
}

@Override
Expand Down Expand Up @@ -642,13 +665,15 @@ public void copy(Path source, Path target, CopyOption... options) throws IOExcep
if (attrs.isDirectory()) {
createDirectory(target);
} else {
CopyFileExtension copyFile = src.getFileSystem().getClient().getExtension(CopyFileExtension.class);
if (copyFile.isSupported()) {
copyFile.copyFile(source.toString(), target.toString(), false);
} else {
try (InputStream in = newInputStream(source);
OutputStream os = newOutputStream(target)) {
IoUtils.copy(in, os);
try (SftpClient client = src.getFileSystem().getClient()) {
CopyFileExtension copyFile = client.getExtension(CopyFileExtension.class);
if (copyFile.isSupported()) {
copyFile.copyFile(source.toString(), target.toString(), false);
} else {
try (InputStream in = newInputStream(source);
OutputStream os = newOutputStream(target)) {
IoUtils.copy(in, os);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.sshd.sftp.client.fs;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
Expand All @@ -28,6 +29,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.FileStore;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
Expand Down Expand Up @@ -87,6 +89,7 @@
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.apache.commons.io.output.ByteArrayOutputStream;

/**
* @author <a href="mailto:[email protected]">Apache MINA SSHD Project</a>
Expand Down Expand Up @@ -236,12 +239,12 @@ public void received(ServerSession session, int type, int id) throws IOException
assertEquals("READ_DIR should not have been called yet", 0, readDirCount.get());

SftpPath sftpPath = (SftpPath) remoteDir;
SftpClient client = sftpPath.getFileSystem().getClient();

// Actual test starts here
int i = 0;
long start = System.currentTimeMillis();
try (CloseableHandle dir = client.openDir(remDirPath)) {
try (SftpClient client = sftpPath.getFileSystem().getClient();
CloseableHandle dir = client.openDir(remDirPath)) {
for (SftpClient.DirEntry entry : client.listDir(dir)) {
i++;
assertNotNull(entry);
Expand Down Expand Up @@ -275,12 +278,12 @@ public void describeTo(Description description) {
long indirectTime = 0;
for (int attempt = 0; attempt < maxRepeats; attempt++) {
// Now try the same directly at the upstream server
client = ((SftpFileSystem) secondHop).getClient();
statCount.set(0);
readDirCount.set(0);
i = 0;
start = System.currentTimeMillis();
try (CloseableHandle dir = client.openDir(remDirPath)) {
try (SftpClient client = ((SftpFileSystem) secondHop).getClient();
CloseableHandle dir = client.openDir(remDirPath)) {
for (SftpClient.DirEntry entry : client.listDir(dir)) {
i++;
assertNotNull(entry);
Expand All @@ -294,12 +297,12 @@ public void describeTo(Description description) {
"{}: directory listing with {} files from upstream server took {}ms, got {} entries: READDIR called {} times, (L)STAT called {} times",
getCurrentTestName(), numberOfFiles, elapsed, i, readDirCount, statCount);

client = sftpPath.getFileSystem().getClient();
statCount.set(0);
readDirCount.set(0);
i = 0;
start = System.currentTimeMillis();
try (CloseableHandle dir = client.openDir(remDirPath)) {
try (SftpClient client = sftpPath.getFileSystem().getClient();
CloseableHandle dir = client.openDir(remDirPath)) {
for (SftpClient.DirEntry entry : client.listDir(dir)) {
i++;
assertNotNull(entry);
Expand Down Expand Up @@ -347,28 +350,32 @@ MapBuilder.<String, Object> builder()
Path remoteFile = fs.getPath(remFilePath);
assertHierarchyTargetFolderExists(remoteFile.getParent());
int n = 0;
for (Path p : Files.newDirectoryStream(remoteFile.getParent())) {
n++;
assertTrue("Expected an SftpPath", p instanceof SftpPath);
SftpClient.Attributes cached = ((SftpPath) p).getAttributes();
assertNotNull("Path should have cached attributes", cached);
assertEquals("Unexpected size reported", 5, cached.getSize());
// Now modify the file and fetch attributes again
Files.write(p, "Bye".getBytes(StandardCharsets.UTF_8));
BasicFileAttributes attributes = Files.readAttributes(p, BasicFileAttributes.class);
assertNotEquals("Sizes should be different", attributes.size(), cached.getSize());
assertEquals("Unexpected size after modification", 3, attributes.size());
assertNull("Path should not have cached attributes anymore", ((SftpPath) p).getAttributes());
try (DirectoryStream<Path> directory = Files.newDirectoryStream(remoteFile.getParent())) {
for (Path p : directory) {
n++;
assertTrue("Expected an SftpPath", p instanceof SftpPath);
SftpClient.Attributes cached = ((SftpPath) p).getAttributes();
assertNotNull("Path should have cached attributes", cached);
assertEquals("Unexpected size reported", 5, cached.getSize());
// Now modify the file and fetch attributes again
Files.write(p, "Bye".getBytes(StandardCharsets.UTF_8));
BasicFileAttributes attributes = Files.readAttributes(p, BasicFileAttributes.class);
assertNotEquals("Sizes should be different", attributes.size(), cached.getSize());
assertEquals("Unexpected size after modification", 3, attributes.size());
assertNull("Path should not have cached attributes anymore", ((SftpPath) p).getAttributes());
}
}
assertEquals("Unexpected number of files", 2, n);
// And again
List<Path> obtained = new ArrayList<>(2);
for (Path p : Files.newDirectoryStream(remoteFile.getParent())) {
assertTrue("Expected an SftpPath", p instanceof SftpPath);
SftpClient.Attributes cached = ((SftpPath) p).getAttributes();
assertNotNull("Path should have cached attributes", cached);
assertEquals("Unexpected size reported", 3, cached.getSize());
obtained.add(p);
try (DirectoryStream<Path> directory = Files.newDirectoryStream(remoteFile.getParent())) {
for (Path p : directory) {
assertTrue("Expected an SftpPath", p instanceof SftpPath);
SftpClient.Attributes cached = ((SftpPath) p).getAttributes();
assertNotNull("Path should have cached attributes", cached);
assertEquals("Unexpected size reported", 3, cached.getSize());
obtained.add(p);
}
}
assertEquals("Unexpected number of files", 2, obtained.size());
// Now modify the files and fetch attributes again
Expand Down Expand Up @@ -503,6 +510,79 @@ public void testFileChannel() throws IOException {
assertArrayEquals("Mismatched persisted data", expected, actual);
}

@Test
public void testFileCopy() throws IOException {
Path targetPath = detectTargetFolder();
Path lclSftp = CommonTestSupportUtils.resolve(targetPath, SftpConstants.SFTP_SUBSYSTEM_NAME,
getClass().getSimpleName());
Files.createDirectories(lclSftp);

Path lclFile = lclSftp.resolve(getCurrentTestName() + ".txt");
Files.deleteIfExists(lclFile);
Path lclFile2 = lclSftp.resolve(getCurrentTestName() + ".txt2");
Files.deleteIfExists(lclFile2);
byte[] expected = (getClass().getName() + "#" + getCurrentTestName() + "(" + new Date() + ")")
.getBytes(StandardCharsets.UTF_8);
try (FileSystem fs = FileSystems.newFileSystem(createDefaultFileSystemURI(), Collections.emptyMap())) {
Path parentPath = targetPath.getParent();
String remFilePath = CommonTestSupportUtils.resolveRelativeRemotePath(parentPath, lclFile);
Path file = fs.getPath(remFilePath);

FileSystemProvider provider = fs.provider();
try (FileChannel fc = provider.newFileChannel(file,
EnumSet.of(StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE))) {
int writeLen = fc.write(ByteBuffer.wrap(expected));
assertEquals("Mismatched written length", expected.length, writeLen);

FileChannel fcPos = fc.position(0L);
assertSame("Mismatched positioned file channel", fc, fcPos);

byte[] actual = new byte[expected.length];
int readLen = fc.read(ByteBuffer.wrap(actual));
assertEquals("Mismatched read len", writeLen, readLen);
assertArrayEquals("Mismatched read data", expected, actual);
}
Path sibling = file.getParent().resolve(file.getFileName().toString() + '2');
Files.copy(file, sibling);
}

byte[] actual = Files.readAllBytes(lclFile);
assertArrayEquals("Mismatched persisted data", expected, actual);
actual = Files.readAllBytes(lclFile2);
assertArrayEquals("Mismatched copied data", expected, actual);
}

@Test
public void testFileWriteRead() throws IOException {
Path targetPath = detectTargetFolder();
Path lclSftp = CommonTestSupportUtils.resolve(targetPath, SftpConstants.SFTP_SUBSYSTEM_NAME,
getClass().getSimpleName());
Files.createDirectories(lclSftp);

Path lclFile = lclSftp.resolve(getCurrentTestName() + ".txt");
Files.deleteIfExists(lclFile);
byte[] expected = (getClass().getName() + "#" + getCurrentTestName() + "(" + new Date() + ")")
.getBytes(StandardCharsets.UTF_8);
try (FileSystem fs = FileSystems.newFileSystem(createDefaultFileSystemURI(), Collections.emptyMap())) {
Path parentPath = targetPath.getParent();
String remFilePath = CommonTestSupportUtils.resolveRelativeRemotePath(parentPath, lclFile);

try (SftpClient client = ((SftpFileSystem) fs).getClient()) {
try (OutputStream out = client.write(remFilePath)) {
IoUtils.copy(new ByteArrayInputStream(expected), out);
}
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
InputStream in = client.read(remFilePath)) {
IoUtils.copy(in, out);
assertArrayEquals("Mismatched persisted data", expected, out.toByteArray());
}
}
}

byte[] actual = Files.readAllBytes(lclFile);
assertArrayEquals("Mismatched persisted data", expected, actual);
}

@Test
public void testFileStore() throws IOException {
try (FileSystem fs = FileSystems.newFileSystem(createDefaultFileSystemURI(), Collections.emptyMap())) {
Expand Down

0 comments on commit 5cd1d46

Please sign in to comment.