Skip to content

Commit

Permalink
HADOOP-18543. AliyunOSSFileSystem#open(Path path, int bufferSize) use…
Browse files Browse the repository at this point in the history
… buffer size as its downloadPartSize
  • Loading branch information
masteryhx committed Nov 28, 2022
1 parent e09e81a commit 7580a7c
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,21 @@ private void validatePath(Path path) throws IOException {
} while (fPart != null);
}

@Override
public FSDataInputStream open(Path path) throws IOException {
final FileStatus fileStatus = getFileStatus(path);
if (fileStatus.isDirectory()) {
throw new FileNotFoundException("Can't open " + path +
" because it is a directory");
}

return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
new SemaphoredDelegatingExecutor(
boundedThreadPool, maxReadAheadPartNumber, true),
maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),
statistics));
}

@Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
final FileStatus fileStatus = getFileStatus(path);
Expand All @@ -593,7 +608,7 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
" because it is a directory");
}

return new FSDataInputStream(new AliyunOSSInputStream(getConf(),
return new FSDataInputStream(new AliyunOSSInputStream(bufferSize,
new SemaphoredDelegatingExecutor(
boundedThreadPool, maxReadAheadPartNumber, true),
maxReadAheadPartNumber, store, pathToKey(path), fileStatus.getLen(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,21 @@ public class AliyunOSSInputStream extends FSInputStream {
private ExecutorService readAheadExecutorService;
private Queue<ReadBuffer> readBufferQueue = new ArrayDeque<>();

public AliyunOSSInputStream(Configuration conf,
ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
AliyunOSSFileSystemStore store, String key, Long contentLength,
Statistics statistics) throws IOException {
public AliyunOSSInputStream(
long downloadPartSize,
ExecutorService readAheadExecutorService,
int maxReadAheadPartNumber,
AliyunOSSFileSystemStore store,
String key,
Long contentLength,
Statistics statistics) throws IOException {
this.readAheadExecutorService =
MoreExecutors.listeningDecorator(readAheadExecutorService);
MoreExecutors.listeningDecorator(readAheadExecutorService);
this.store = store;
this.key = key;
this.statistics = statistics;
this.contentLength = contentLength;
downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
MULTIPART_DOWNLOAD_SIZE_DEFAULT);
this.downloadPartSize = downloadPartSize;
this.maxReadAheadPartNumber = maxReadAheadPartNumber;

this.expectNextPos = 0;
Expand All @@ -77,6 +80,18 @@ public AliyunOSSInputStream(Configuration conf,
closed = false;
}

public AliyunOSSInputStream(Configuration conf,
ExecutorService readAheadExecutorService, int maxReadAheadPartNumber,
AliyunOSSFileSystemStore store, String key, Long contentLength,
Statistics statistics) throws IOException {
this(conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY, MULTIPART_DOWNLOAD_SIZE_DEFAULT),
readAheadExecutorService, maxReadAheadPartNumber, store, key, contentLength, statistics);
}

long getDownloadPartSize() {
return downloadPartSize;
}

/**
* Reopen the wrapped stream at give position, by seeking for
* data of a part length from object content stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Random;

import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_DOWNLOAD_SIZE_DEFAULT;
import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_DOWNLOAD_SIZE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -108,6 +112,31 @@ public void testSeekFile() throws Exception {
IOUtils.closeStream(instream);
}

@Test
public void testConfiguration() throws IOException {
Path configurationFile = setPath("/test/configurationFile.txt");
long size = 5 * 1024 * 1024;

ContractTestUtils.generateTestFile(this.fs, configurationFile, size, 256, 255);
LOG.info("5MB file created: configurationFile.txt");

FSDataInputStream instream = this.fs.open(configurationFile);
assertTrue(instream.getWrappedStream() instanceof AliyunOSSInputStream);
AliyunOSSInputStream wrappedStream = (AliyunOSSInputStream) instream.getWrappedStream();
assertEquals(
fs.getConf().getLong(MULTIPART_DOWNLOAD_SIZE_KEY, MULTIPART_DOWNLOAD_SIZE_DEFAULT),
wrappedStream.getDownloadPartSize());
IOUtils.closeStream(instream);

instream = this.fs.open(configurationFile, 1024);
assertTrue(instream.getWrappedStream() instanceof AliyunOSSInputStream);
wrappedStream = (AliyunOSSInputStream) instream.getWrappedStream();
assertEquals(
1024,
wrappedStream.getDownloadPartSize());
IOUtils.closeStream(instream);
}

@Test
public void testSequentialAndRandomRead() throws Exception {
Path smallSeekFile = setPath("/test/smallSeekFile.txt");
Expand Down

0 comments on commit 7580a7c

Please sign in to comment.