Skip to content

Commit

Permalink
apacheGH-294: Fix memory leaks in SftpFileSystemProvider
Browse files Browse the repository at this point in the history
SftpFileSystem.getClient() returns wrapper instances that need to
be closed to avoid a memory leak via ThreadLocals. Make sure that
the streams returned by SftpFileSystemProvider.newInputStream() and
newOutputStream() do close the client used by ensuring that the
streams returned by SftpFileSystem.read() and write() do so. Also
fix SftpFileSystemProvider.copy() to close the SftpClient it uses.

SftpFileSystemProvider.newDirectoryStream() and newFileChannel()
already do close the SftpClient used.

Fix the SftpFileSystemTest to properly close SftpClients and
DirectoryStreams.

Bug: apache#294
  • Loading branch information
tomaswolf committed Dec 19, 2022
1 parent 78ed6bc commit bfcf2cf
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.sshd.sftp.client.fs;

import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -488,7 +490,16 @@ public InputStream read(String path, int bufferSize, Collection<OpenMode> mode)
if (!isOpen()) {
throw new IOException("read(" + path + ")[" + mode + "] size=" + bufferSize + ": client is closed");
}
return delegate.read(path, bufferSize, mode);
return new FilterInputStream(delegate.read(path, bufferSize, mode)) {
@Override
public void close() throws IOException {
try {
super.close();
} finally {
Wrapper.this.close();
}
}
};
}

@Override
Expand All @@ -511,7 +522,17 @@ public OutputStream write(String path, int bufferSize, Collection<OpenMode> mode
if (!isOpen()) {
throw new IOException("write(" + path + ")[" + mode + "] size=" + bufferSize + ": client is closed");
}
return delegate.write(path, bufferSize, mode);
return new FilterOutputStream(delegate.write(path, bufferSize, mode)) {

@Override
public void close() throws IOException {
try {
super.close();
} finally {
Wrapper.this.close();
}
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,13 +642,15 @@ public void copy(Path source, Path target, CopyOption... options) throws IOExcep
if (attrs.isDirectory()) {
createDirectory(target);
} else {
CopyFileExtension copyFile = src.getFileSystem().getClient().getExtension(CopyFileExtension.class);
if (copyFile.isSupported()) {
copyFile.copyFile(source.toString(), target.toString(), false);
} else {
try (InputStream in = newInputStream(source);
OutputStream os = newOutputStream(target)) {
IoUtils.copy(in, os);
try (SftpClient client = src.getFileSystem().getClient()) {
CopyFileExtension copyFile = client.getExtension(CopyFileExtension.class);
if (copyFile.isSupported()) {
copyFile.copyFile(source.toString(), target.toString(), false);
} else {
try (InputStream in = newInputStream(source);
OutputStream os = newOutputStream(target)) {
IoUtils.copy(in, os);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.FileStore;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
Expand Down Expand Up @@ -236,12 +237,11 @@ public void received(ServerSession session, int type, int id) throws IOException
assertEquals("READ_DIR should not have been called yet", 0, readDirCount.get());

SftpPath sftpPath = (SftpPath) remoteDir;
SftpClient client = sftpPath.getFileSystem().getClient();

// Actual test starts here
int i = 0;
long start = System.currentTimeMillis();
try (CloseableHandle dir = client.openDir(remDirPath)) {
try (SftpClient client = sftpPath.getFileSystem().getClient(); CloseableHandle dir = client.openDir(remDirPath)) {
for (SftpClient.DirEntry entry : client.listDir(dir)) {
i++;
assertNotNull(entry);
Expand Down Expand Up @@ -275,12 +275,12 @@ public void describeTo(Description description) {
long indirectTime = 0;
for (int attempt = 0; attempt < maxRepeats; attempt++) {
// Now try the same directly at the upstream server
client = ((SftpFileSystem) secondHop).getClient();
statCount.set(0);
readDirCount.set(0);
i = 0;
start = System.currentTimeMillis();
try (CloseableHandle dir = client.openDir(remDirPath)) {
try (SftpClient client = ((SftpFileSystem) secondHop).getClient();
CloseableHandle dir = client.openDir(remDirPath)) {
for (SftpClient.DirEntry entry : client.listDir(dir)) {
i++;
assertNotNull(entry);
Expand All @@ -294,12 +294,12 @@ public void describeTo(Description description) {
"{}: directory listing with {} files from upstream server took {}ms, got {} entries: READDIR called {} times, (L)STAT called {} times",
getCurrentTestName(), numberOfFiles, elapsed, i, readDirCount, statCount);

client = sftpPath.getFileSystem().getClient();
statCount.set(0);
readDirCount.set(0);
i = 0;
start = System.currentTimeMillis();
try (CloseableHandle dir = client.openDir(remDirPath)) {
try (SftpClient client = sftpPath.getFileSystem().getClient();
CloseableHandle dir = client.openDir(remDirPath)) {
for (SftpClient.DirEntry entry : client.listDir(dir)) {
i++;
assertNotNull(entry);
Expand Down Expand Up @@ -347,28 +347,32 @@ MapBuilder.<String, Object> builder()
Path remoteFile = fs.getPath(remFilePath);
assertHierarchyTargetFolderExists(remoteFile.getParent());
int n = 0;
for (Path p : Files.newDirectoryStream(remoteFile.getParent())) {
n++;
assertTrue("Expected an SftpPath", p instanceof SftpPath);
SftpClient.Attributes cached = ((SftpPath) p).getAttributes();
assertNotNull("Path should have cached attributes", cached);
assertEquals("Unexpected size reported", 5, cached.getSize());
// Now modify the file and fetch attributes again
Files.write(p, "Bye".getBytes(StandardCharsets.UTF_8));
BasicFileAttributes attributes = Files.readAttributes(p, BasicFileAttributes.class);
assertNotEquals("Sizes should be different", attributes.size(), cached.getSize());
assertEquals("Unexpected size after modification", 3, attributes.size());
assertNull("Path should not have cached attributes anymore", ((SftpPath) p).getAttributes());
try (DirectoryStream<Path> directory = Files.newDirectoryStream(remoteFile.getParent())) {
for (Path p : directory) {
n++;
assertTrue("Expected an SftpPath", p instanceof SftpPath);
SftpClient.Attributes cached = ((SftpPath) p).getAttributes();
assertNotNull("Path should have cached attributes", cached);
assertEquals("Unexpected size reported", 5, cached.getSize());
// Now modify the file and fetch attributes again
Files.write(p, "Bye".getBytes(StandardCharsets.UTF_8));
BasicFileAttributes attributes = Files.readAttributes(p, BasicFileAttributes.class);
assertNotEquals("Sizes should be different", attributes.size(), cached.getSize());
assertEquals("Unexpected size after modification", 3, attributes.size());
assertNull("Path should not have cached attributes anymore", ((SftpPath) p).getAttributes());
}
}
assertEquals("Unexpected number of files", 2, n);
// And again
List<Path> obtained = new ArrayList<>(2);
for (Path p : Files.newDirectoryStream(remoteFile.getParent())) {
assertTrue("Expected an SftpPath", p instanceof SftpPath);
SftpClient.Attributes cached = ((SftpPath) p).getAttributes();
assertNotNull("Path should have cached attributes", cached);
assertEquals("Unexpected size reported", 3, cached.getSize());
obtained.add(p);
try (DirectoryStream<Path> directory = Files.newDirectoryStream(remoteFile.getParent())) {
for (Path p : directory) {
assertTrue("Expected an SftpPath", p instanceof SftpPath);
SftpClient.Attributes cached = ((SftpPath) p).getAttributes();
assertNotNull("Path should have cached attributes", cached);
assertEquals("Unexpected size reported", 3, cached.getSize());
obtained.add(p);
}
}
assertEquals("Unexpected number of files", 2, obtained.size());
// Now modify the files and fetch attributes again
Expand Down Expand Up @@ -503,6 +507,48 @@ public void testFileChannel() throws IOException {
assertArrayEquals("Mismatched persisted data", expected, actual);
}

@Test
public void testFileCopy() throws IOException {
Path targetPath = detectTargetFolder();
Path lclSftp = CommonTestSupportUtils.resolve(targetPath, SftpConstants.SFTP_SUBSYSTEM_NAME,
getClass().getSimpleName());
Files.createDirectories(lclSftp);

Path lclFile = lclSftp.resolve(getCurrentTestName() + ".txt");
Files.deleteIfExists(lclFile);
Path lclFile2 = lclSftp.resolve(getCurrentTestName() + ".txt2");
Files.deleteIfExists(lclFile2);
byte[] expected = (getClass().getName() + "#" + getCurrentTestName() + "(" + new Date() + ")")
.getBytes(StandardCharsets.UTF_8);
try (FileSystem fs = FileSystems.newFileSystem(createDefaultFileSystemURI(), Collections.emptyMap())) {
Path parentPath = targetPath.getParent();
String remFilePath = CommonTestSupportUtils.resolveRelativeRemotePath(parentPath, lclFile);
Path file = fs.getPath(remFilePath);

FileSystemProvider provider = fs.provider();
try (FileChannel fc = provider.newFileChannel(file,
EnumSet.of(StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE))) {
int writeLen = fc.write(ByteBuffer.wrap(expected));
assertEquals("Mismatched written length", expected.length, writeLen);

FileChannel fcPos = fc.position(0L);
assertSame("Mismatched positioned file channel", fc, fcPos);

byte[] actual = new byte[expected.length];
int readLen = fc.read(ByteBuffer.wrap(actual));
assertEquals("Mismatched read len", writeLen, readLen);
assertArrayEquals("Mismatched read data", expected, actual);
}
Path sibling = file.getParent().resolve(file.getFileName().toString() + '2');
Files.copy(file, sibling);
}

byte[] actual = Files.readAllBytes(lclFile);
assertArrayEquals("Mismatched persisted data", expected, actual);
actual = Files.readAllBytes(lclFile2);
assertArrayEquals("Mismatched copied data", expected, actual);
}

@Test
public void testFileStore() throws IOException {
try (FileSystem fs = FileSystems.newFileSystem(createDefaultFileSystemURI(), Collections.emptyMap())) {
Expand Down

0 comments on commit bfcf2cf

Please sign in to comment.