Add exception handling. Fix Multi-part uploads
fuatbasik committed Dec 13, 2024
1 parent e38b482 commit e56882e
Showing 5 changed files with 72 additions and 19 deletions.
@@ -1776,7 +1776,7 @@ private Constants() {
/**
* Config to enable use of the CRT client with the Analytics Accelerator Library for Amazon S3. Enabled by default.
*/
public static final String USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR = "fs.s3a.analytics.accelerator.crt.client";
public static final String ANALYTICS_ACCELERATOR_CRT_ENABLED = "fs.s3a.analytics.accelerator.crt.client";

/**
* Default value for {@link #ANALYTICS_ACCELERATOR_ENABLED_KEY }
@@ -1785,9 +1785,9 @@ private Constants() {
public static final boolean ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false;

/**
* Default value for {@link #USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR }
* Default value for {@link #ANALYTICS_ACCELERATOR_CRT_ENABLED }
* Value {@value}.
*/
public static final boolean USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR_DEFAULT = true;
public static final boolean ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT = true;

}
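
For orientation, a minimal sketch of how these two constants are expected to be toggled from client code. The string value behind ANALYTICS_ACCELERATOR_ENABLED_KEY is not shown in this hunk, so the exact property name is an assumption; the CRT key is "fs.s3a.analytics.accelerator.crt.client" per the change above.

    Configuration conf = new Configuration();
    // Turn on the Analytics Accelerator stream.
    conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
    // Opt out of the CRT client; per this commit, that also
    // forces multipart uploads off.
    conf.setBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, false);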
@@ -361,6 +361,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
// If true, S3SeekableInputStream from Analytics Accelerator for Amazon S3 will be used.
private boolean analyticsAcceleratorEnabled;

private boolean analyticsAcceleratorCRTEnabled;

// Size in bytes of a single prefetch block.
private int prefetchBlockSize;

@@ -704,13 +706,16 @@ public void initialize(URI name, Configuration originalConf)
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);

this.analyticsAcceleratorEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
this.analyticsAcceleratorCRTEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);

if(!analyticsAcceleratorEnabled) {
this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);
} else {
this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);

if(this.analyticsAcceleratorEnabled && !analyticsAcceleratorCRTEnabled) {
// Temp change: Analytics Accelerator with S3AsyncClient does not support multipart uploads.
this.isMultipartUploadEnabled = false;
}

// multipart copy and upload are the same; this just makes it explicit
this.isMultipartCopyEnabled = isMultipartUploadEnabled;

@@ -841,7 +846,7 @@ public void initialize(URI name, Configuration originalConf)

if (this.analyticsAcceleratorEnabled) {
LOG.info("Using S3SeekableInputStream");
if(conf.getBoolean(USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR, USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR_DEFAULT)) {
if(this.analyticsAcceleratorCRTEnabled) {
LOG.info("Using S3CrtClient");
this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
} else {
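
Pieced together, the initialize() logic after this change reads roughly as follows. This is a sketch assembled from the hunks above, not the verbatim method; the non-CRT client construction is collapsed in this diff.

    this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
        DEFAULT_MULTIPART_UPLOAD_ENABLED);
    if (this.analyticsAcceleratorEnabled && !analyticsAcceleratorCRTEnabled) {
      // Temporary: the accelerator over the plain S3AsyncClient does not
      // support multipart uploads, so they are forced off.
      this.isMultipartUploadEnabled = false;
    }
    // ... later in initialize() ...
    if (this.analyticsAcceleratorEnabled) {
      LOG.info("Using S3SeekableInputStream");
      if (this.analyticsAcceleratorCRTEnabled) {
        LOG.info("Using S3CrtClient");
        this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
      } else {
        // Standard S3AsyncClient construction (collapsed in this diff).
      }
    }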
@@ -60,7 +60,14 @@ public boolean hasCapability(String capability) {
@Override
public int read() throws IOException {
throwIfClosed();
return inputStream.read();
int bytesRead;
try {
bytesRead = inputStream.read();
} catch (IOException ioe){
onReadFailure(ioe);
throw ioe;
}
return bytesRead;
}

@Override
@@ -89,18 +96,32 @@ public synchronized long getPos() {
*
* @param buf buffer to read data into
* @param off start position in buffer at which data is written
* @param n the number of bytes to read; the n-th byte should be the last byte of the stream.
* @param len the number of bytes to read; the len-th byte read should be the last byte of the stream.
* @return the total number of bytes read into the buffer
* @throws IOException if an error occurs while reading from the stream.
*/
public void readTail(byte[] buf, int off, int n) throws IOException {
public int readTail(byte[] buf, int off, int len) throws IOException {

throwIfClosed();
inputStream.readTail(buf, off, n);
int bytesRead;
try {
bytesRead = inputStream.readTail(buf, off, len);
} catch (IOException ioe) {
onReadFailure(ioe);
throw ioe;
}
return bytesRead;
}

@Override
public int read(byte[] buf, int off, int len) throws IOException {
throwIfClosed();
return inputStream.read(buf, off, len);
int bytesRead;
try {
bytesRead = inputStream.read(buf, off, len);
} catch (IOException ioe) {
onReadFailure(ioe);
throw ioe;
}
return bytesRead;
}


@@ -118,12 +139,37 @@ public int available() throws IOException {
@Override
public void close() throws IOException {
if (inputStream != null) {
inputStream.close();
inputStream = null;
super.close();
try {
inputStream.close();
inputStream = null;
super.close();
} catch (IOException ioe) {
LOG.debug("Failure closing stream {}: ", key);
throw ioe;
}
}
}

/**
* Close the stream on read failure.
* No attempt is made to recover from the failure.
* @param ioe exception caught.
* @throws IOException raised if closing the stream fails.
*/
@Retries.OnceTranslated
private void onReadFailure(IOException ioe) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Got exception while trying to read from stream {}, " +
"not trying to recover:",
key, ioe);
} else {
LOG.info("Got exception while trying to read from stream {}, " +
"not trying to recover:",
key, ioe);
}
this.close();
}


protected void throwIfClosed() throws IOException {
if (isClosed()) {
throw new IOException(key + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
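
All three read paths above now share the same shape: delegate to the underlying stream and, on IOException, close this stream before rethrowing. If the duplication grows, it could be factored into a helper along these lines — a sketch against the names visible in this diff, not code from the commit:

    @FunctionalInterface
    private interface ReadOperation {
      int apply() throws IOException;
    }

    private int readWithFailureHandling(ReadOperation operation) throws IOException {
      throwIfClosed();
      try {
        return operation.apply();
      } catch (IOException ioe) {
        // Logs, closes the stream, and gives up on recovery.
        onReadFailure(ioe);
        throw ioe;
      }
    }

    // Usage: return readWithFailureHandling(() -> inputStream.read(buf, off, len));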
@@ -30,7 +30,7 @@

import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.Constants.USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR;
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CRT_ENABLED;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;

import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
@@ -48,7 +48,7 @@ public void testConnectorFrameWorkIntegration(boolean useCrtClient) throws IOExc
Configuration conf = getConfiguration();
removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
conf.setBoolean(USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR, useCrtClient);
conf.setBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, useCrtClient);

String testFile = "s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz";
S3AFileSystem s3AFileSystem = (S3AFileSystem) FileSystem.newInstance(new Path(testFile).toUri(), conf);
@@ -83,7 +83,7 @@ public void testConnectorFrameworkConfigurable(boolean useCrtClient) {
//Set Blobstore Capacity
conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);

conf.setBoolean(USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR, useCrtClient);
conf.setBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, useCrtClient);

ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);

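
Both tests take a useCrtClient flag. The runner wiring is outside this diff; in JUnit 4 parameterized form it might look like this hypothetical sketch:

    @RunWith(Parameterized.class)  // class-level annotation, assumed
    // ...
    @Parameterized.Parameters(name = "useCrtClient={0}")
    public static Collection<Object[]> params() {
      return Arrays.asList(new Object[][]{{true}, {false}});
    }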
@@ -47,6 +47,7 @@
import org.apache.hadoop.security.UserGroupInformation;

import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.COMMITTER_NAME_STAGING;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -182,6 +183,7 @@ public void setup() throws Exception {
// destroy all filesystems from previous runs.
FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
super.setup();
skipIfAnalyticsAcceleratorEnabled(createConfiguration());
jobId = randomJobId();
attempt0 = "attempt_" + jobId + "_m_000000_0";
taskAttempt0 = TaskAttemptID.forName(attempt0);
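
The new guard in setup() presumably reads the configuration and skips via a test assumption. A hypothetical sketch of what S3ATestUtils.skipIfAnalyticsAcceleratorEnabled could look like (the real helper may differ):

    public static void skipIfAnalyticsAcceleratorEnabled(Configuration conf) {
      // Skip tests that exercise code paths the accelerator stream
      // does not yet cover, such as the staging committer protocol.
      assume("Skipping test as Analytics Accelerator is enabled",
          !conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY,
              ANALYTICS_ACCELERATOR_ENABLED_DEFAULT));
    }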
