Skip to content

Commit

Permalink
Address review feedback v2
Browse files Browse the repository at this point in the history
  • Loading branch information
fuatbasik committed Dec 17, 2024
1 parent 28714ac commit db01524
Show file tree
Hide file tree
Showing 32 changed files with 126 additions and 97 deletions.
2 changes: 1 addition & 1 deletion hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@
<dependency>
<groupId>software.amazon.s3.analyticsaccelerator</groupId>
<artifactId>analyticsaccelerator-s3</artifactId>
<version>0.0.1</version>
<version>0.0.2</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,

/**
* CRT-Based S3Client created of analytics accelerator library is enabled
* and managed by the ClientManager. Analytics accelerator library can be
* and managed by the S3AStoreImpl. Analytics accelerator library can be
* enabled with {@link Constants#ANALYTICS_ACCELERATOR_ENABLED_KEY}
*/
private S3AsyncClient s3AsyncClient;
Expand Down Expand Up @@ -545,7 +545,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private boolean s3AccessGrantsEnabled;

/**
* Factory to create S3SeekableInputStream if {@link this#analyticsAcceleratorEnabled} is true
* Factory to create S3SeekableInputStream if {@link this#analyticsAcceleratorEnabled} is true.
*/
private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;

Expand Down Expand Up @@ -705,8 +705,10 @@ public void initialize(URI name, Configuration originalConf)
this.prefetchBlockCount =
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);

this.analyticsAcceleratorEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
this.analyticsAcceleratorCRTEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
this.analyticsAcceleratorEnabled =
conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
this.analyticsAcceleratorCRTEnabled =
conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);

this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);
Expand Down Expand Up @@ -847,10 +849,10 @@ public void initialize(URI name, Configuration originalConf)
if (this.analyticsAcceleratorEnabled) {
LOG.info("Using S3SeekableInputStream");
if(this.analyticsAcceleratorCRTEnabled) {
LOG.info("Using S3CrtClient");
LOG.info("Using S3 CRT client for analytics accelerator S3");
this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
} else {
LOG.info("Using S3Client");
LOG.info("Using S3 async client for analytics accelerator S3");
this.s3AsyncClient = store.getOrCreateAsyncClient();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class S3ASeekableStream extends FSInputStream implements StreamCapabiliti
private S3SeekableInputStream inputStream;
private long lastReadCurrentPos = 0;
private final String key;
private volatile boolean closed;

public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class);

Expand Down Expand Up @@ -84,7 +85,7 @@ public void seek(long pos) throws IOException {

@Override
public synchronized long getPos() {
if (!isClosed()) {
if (!closed) {
lastReadCurrentPos = inputStream.getPos();
}
return lastReadCurrentPos;
Expand Down Expand Up @@ -139,8 +140,9 @@ public int available() throws IOException {
}

@Override
public void close() throws IOException {
if (inputStream != null) {
public synchronized void close() throws IOException {
if(!closed) {
closed = true;
try {
inputStream.close();
inputStream = null;
Expand Down Expand Up @@ -174,12 +176,8 @@ private void onReadFailure(IOException ioe) throws IOException {


protected void throwIfClosed() throws IOException {
if (isClosed()) {
if (closed) {
throw new IOException(key + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
}
}

protected boolean isClosed() {
return inputStream == null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ protected Configuration createConfiguration() {

@Override
public void testOverwriteExistingFile() throws Throwable {
skipIfAnalyticsAcceleratorEnabled(this.createConfiguration());
// Will remove this when Analytics Accelerator supports overwrites
skipIfAnalyticsAcceleratorEnabled(this.createConfiguration(),
"Analytics Accelerator does not support overwrites yet");
super.testOverwriteExistingFile();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ protected Configuration createConfiguration() {
return newConf;
}

@Override
public void setup() throws Exception {
super.setup();
skipIfAnalyticsAcceleratorEnabled(createConfiguration());
}

@Override
protected boolean shouldUseDirectWrite() {
return true;
Expand Down Expand Up @@ -85,6 +79,14 @@ public void testNonDirectWrite() throws Exception {
getRenameOperationCount() - renames);
}

@Override
public void testDistCpUpdateCheckFileSkip() throws Exception {
//Will remove this when Analytics Accelerator supports overwrites
skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
"Analytics Accelerator Library does not support update to existing files");
super.testDistCpUpdateCheckFileSkip();
}

private long getRenameOperationCount() {
return getFileSystem().getStorageStatistics()
.getLong(StorageStatistics.CommonStatisticNames.OP_RENAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public class ITestS3AContractRename extends AbstractContractRenameTest {
@Override
public void setup() throws Exception {
super.setup();
skipIfAnalyticsAcceleratorEnabled(getContract().getConf());
skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
"Analytics Accelerator does not support rename");

}
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,13 @@ protected AbstractFSContract createContract(Configuration conf) {
}

/**
* Analytics Accelerator Library for Amazon S3 does not support Vectored Reads
* Analytics Accelerator Library for Amazon S3 does not support Vectored Reads.
* @throws Exception
*/
@Override
public void setup() throws Exception {
skipIfAnalyticsAcceleratorEnabled(createConfiguration());
skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
"Analytics Accelerator does not support vectored reads");
super.setup();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static org.apache.hadoop.fs.s3a.Constants.RETRY_INTERVAL;
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;

/**
* Tests behavior of a FileNotFound error that happens after open(), i.e. on
Expand Down Expand Up @@ -65,6 +66,8 @@ protected Configuration createConfiguration() {
*/
@Test
public void testNotFoundFirstRead() throws Exception {
skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
"Temporarily disabling to fix Exception handling on Analytics Accelerator");
S3AFileSystem fs = getFileSystem();
ChangeDetectionPolicy changeDetectionPolicy =
fs.getChangeDetectionPolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ protected Configuration createConfiguration() {
@Override
public void setup() throws Exception {
super.setup();
skipIfAnalyticsAcceleratorEnabled(getConfiguration());
skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
"Analytics Accelerator does not support SSEC");
assumeEnabled();
// although not a root dir test, this confuses paths enough it shouldn't be run in
// parallel with other jobs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,19 @@ public void testCopyToLocalWithUseRawLocalFileSystemOption()

@Override
public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception {
//Skipping this test for AnalyticsAccelerator as it is acting as an overwrite test
skipIfAnalyticsAcceleratorEnabled(this.contract.getConf());
// Skipping this test for AnalyticsAccelerator as it is acting as an overwrite test
skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
"Analytics Accelerator does not support overwrites");
}

@Override
public void testWriteReadAndDeleteTwoBlocks() throws Exception {
//Skipping this test for AnalyticsAccelerator as it is acting as an overwrite test
skipIfAnalyticsAcceleratorEnabled(this.contract.getConf());
// Skipping this test for AnalyticsAccelerator as it is acting as an overwrite test
skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
"Analytics Accelerator does not support overwrites");
}

@Override
@Override
public void testOverwrite() throws IOException {
boolean createPerformance = isCreatePerformanceEnabled(fSys);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ public void testOverwrite() throws IOException {

@Override
public void testOverWriteAndRead() throws Exception {
skipIfAnalyticsAcceleratorEnabled(fs.getConf());
//Will remove this when Analytics Accelerator supports overwrites
skipIfAnalyticsAcceleratorEnabled(fs.getConf(),
"Analytics Accelerator does not support overwrites");
super.testOverWriteAndRead();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ protected Configuration createConfiguration() {
public void setup() throws Exception {
super.setup();
executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
skipIfAnalyticsAcceleratorEnabled(getContract().getConf());
// TODO: Add IOStatistics Support to S3SeekableStream
skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
"S3SeekableStream does not support IOStatisticsContext");

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ public void setup() throws Exception {
@Test
public void testFinalizer() throws Throwable {
Path path = methodPath();
skipIfAnalyticsAcceleratorEnabled(getConfiguration());
// TODO: Add Leak Detection to S3SeekableStream
skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
"S3SeekableStream does not support leak detection");

final S3AFileSystem fs = getFileSystem();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ public void testMetricsRegister()

@Test
public void testStreamStatistics() throws IOException {
skipIfAnalyticsAcceleratorEnabled(getConfiguration());
// TODO: Add StreamStatistics support to S3SeekableStream
skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
"S3SeekableStream does not support stream statistics");

S3AFileSystem fs = getFileSystem();
Path file = path("testStreamStatistics");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,7 @@
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;

import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
Expand Down Expand Up @@ -78,7 +74,6 @@ public void setUp() throws Exception {
super.setup();
// Sets BUFFER_DIR by calling S3ATestUtils#prepareTestConfiguration
conf = createConfiguration();
skipIfAnalyticsAcceleratorEnabled(conf);
testFile = getExternalData(conf);
prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
fs = FileSystem.get(testFile.toUri(), conf);
Expand All @@ -99,6 +94,8 @@ public Configuration createConfiguration() {
final String bufferDirBase = configuration.get(BUFFER_DIR);
bufferDir = bufferDirBase + "/" + UUID.randomUUID();
configuration.set(BUFFER_DIR, bufferDir);
// When both Prefetching and Analytics Accelerator enabled Analytics Accelerator is used
conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
return configuration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.test.LambdaTestUtils;

import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
Expand Down Expand Up @@ -76,11 +74,6 @@ public ITestS3APrefetchingInputStream() {
private static final int INTERVAL_MILLIS = 500;
private static final int BLOCK_SIZE = S_1K * 10;

@Override
public void setup() throws Exception {
super.setup();
skipIfAnalyticsAcceleratorEnabled(this.createConfiguration());
}

@Override
public Configuration createConfiguration() {
Expand All @@ -89,6 +82,8 @@ public Configuration createConfiguration() {
S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
conf.setBoolean(PREFETCH_ENABLED_KEY, true);
conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
// When both Prefetching and Analytics Accelerator enabled Analytics Accelerator is used
conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
return conf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.test.LambdaTestUtils;

import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValues;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
Expand All @@ -69,12 +66,6 @@ public static Collection<Object[]> params() {
});
}

@Override
public void setup() throws Exception {
super.setup();
skipIfAnalyticsAcceleratorEnabled(createConfiguration());
}

public ITestS3APrefetchingLruEviction(final String maxBlocks) {
super(true);
this.maxBlocks = maxBlocks;
Expand All @@ -100,6 +91,8 @@ public Configuration createConfiguration() {
conf.setBoolean(PREFETCH_ENABLED_KEY, true);
conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, Integer.parseInt(maxBlocks));
conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
// When both Prefetching and Analytics Accelerator enabled Analytics Accelerator is used
conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
return conf;
}

Expand Down
Loading

0 comments on commit db01524

Please sign in to comment.