From e56882ec06750656c584fe92b1715dcdea175eea Mon Sep 17 00:00:00 2001
From: Fuat Basik
Date: Fri, 13 Dec 2024 16:16:35 +0000
Subject: [PATCH] Add exception handling. Fix multipart uploads

---
 .../org/apache/hadoop/fs/s3a/Constants.java   |  6 +-
 .../apache/hadoop/fs/s3a/S3AFileSystem.java   | 15 +++--
 .../hadoop/fs/s3a/S3ASeekableStream.java      | 62 ++++++++++++++++---
 .../fs/s3a/ITestS3AS3SeekableStream.java      |  6 +-
 .../s3a/commit/ITestS3ACommitterFactory.java  |  2 +
 5 files changed, 72 insertions(+), 19 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 29557506419bd..850551f8dbbb4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1776,7 +1776,7 @@ private Constants() {
   /**
    * Config to specify usage of crt client with Analytics Accelerator Library for Amazon S3 and it is by default true
    */
-  public static final String USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR = "fs.s3a.analytics.accelerator.crt.client";
+  public static final String ANALYTICS_ACCELERATOR_CRT_ENABLED = "fs.s3a.analytics.accelerator.crt.client";
 
   /**
    * Default value for {@link #ANALYTICS_ACCELERATOR_ENABLED_KEY }
@@ -1785,9 +1785,9 @@ private Constants() {
   public static final boolean ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false;
 
   /**
-   * Default value for {@link #USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR }
+   * Default value for {@link #ANALYTICS_ACCELERATOR_CRT_ENABLED }
    * Value {@value}.
    */
-  public static final boolean USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR_DEFAULT = true;
+  public static final boolean ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT = true;
 
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 35e22fb179014..f9ca6d5d8ddc6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -361,6 +361,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   // If true, S3SeekableInputStream from Analytics Accelerator for Amazon S3 will be used.
   private boolean analyticsAcceleratorEnabled;
 
+  private boolean analyticsAcceleratorCRTEnabled;
+
   // Size in bytes of a single prefetch block.
   private int prefetchBlockSize;
 
@@ -704,13 +706,16 @@ public void initialize(URI name, Configuration originalConf)
         intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
     this.analyticsAcceleratorEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
+    this.analyticsAcceleratorCRTEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
 
-    if(!analyticsAcceleratorEnabled) {
-      this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
-          DEFAULT_MULTIPART_UPLOAD_ENABLED);
-    } else {
+    this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
+        DEFAULT_MULTIPART_UPLOAD_ENABLED);
+
+    if (this.analyticsAcceleratorEnabled && !analyticsAcceleratorCRTEnabled) {
+      // Temp change: Analytics Accelerator with S3AsyncClient does not support multipart uploads.
       this.isMultipartUploadEnabled = false;
     }
+
     // multipart copy and upload are the same; this just makes it explicit
     this.isMultipartCopyEnabled = isMultipartUploadEnabled;
 
@@ -841,7 +846,7 @@ public void initialize(URI name, Configuration originalConf)
 
     if (this.analyticsAcceleratorEnabled) {
       LOG.info("Using S3SeekableInputStream");
-      if(conf.getBoolean(USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR, USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR_DEFAULT)) {
+      if (this.analyticsAcceleratorCRTEnabled) {
         LOG.info("Using S3CrtClient");
         this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
       } else {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java
index 9362e9f322c59..7c6494d1bcf3e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java
@@ -60,7 +60,14 @@ public boolean hasCapability(String capability) {
   @Override
   public int read() throws IOException {
     throwIfClosed();
-    return inputStream.read();
+    int bytesRead;
+    try {
+      bytesRead = inputStream.read();
+    } catch (IOException ioe) {
+      onReadFailure(ioe);
+      throw ioe;
+    }
+    return bytesRead;
   }
 
   @Override
@@ -89,18 +96,32 @@ public synchronized long getPos() {
    *
    * @param buf buffer to read data into
    * @param off start position in buffer at which data is written
-   * @param n the number of bytes to read; the n-th byte should be the last byte of the stream.
+   * @param len the number of bytes to read; the len-th byte should be the last byte of the stream.
    * @return the total number of bytes read into the buffer
    */
-  public void readTail(byte[] buf, int off, int n) throws IOException {
+  public int readTail(byte[] buf, int off, int len) throws IOException {
     throwIfClosed();
-    inputStream.readTail(buf, off, n);
+    int bytesRead;
+    try {
+      bytesRead = inputStream.readTail(buf, off, len);
+    } catch (IOException ioe) {
+      onReadFailure(ioe);
+      throw ioe;
+    }
+    return bytesRead;
   }
 
   @Override
   public int read(byte[] buf, int off, int len) throws IOException {
     throwIfClosed();
-    return inputStream.read(buf, off, len);
+    int bytesRead;
+    try {
+      bytesRead = inputStream.read(buf, off, len);
+    } catch (IOException ioe) {
+      onReadFailure(ioe);
+      throw ioe;
+    }
+    return bytesRead;
   }
 
@@ -118,12 +139,37 @@ public int available() throws IOException {
   @Override
   public void close() throws IOException {
     if (inputStream != null) {
-      inputStream.close();
-      inputStream = null;
-      super.close();
+      try {
+        inputStream.close();
+        inputStream = null;
+        super.close();
+      } catch (IOException ioe) {
+        LOG.debug("Failure closing stream {}: ", key, ioe);
+        throw ioe;
+      }
     }
   }
 
+  /**
+   * Close the stream on read failure.
+   * No attempt is made to recover from the failure.
+   * @param ioe exception caught.
+   */
+  @Retries.OnceTranslated
+  private void onReadFailure(IOException ioe) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got exception while trying to read from stream {}, " +
+          "not trying to recover:",
+          key, ioe);
+    } else {
+      LOG.info("Got exception while trying to read from stream {}, " +
+          "not trying to recover:",
+          key);
+    }
+    this.close();
+  }
+
   protected void throwIfClosed() throws IOException {
     if (isClosed()) {
       throw new IOException(key + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java
index 343015c63e14e..080efc3c296a6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java
@@ -30,7 +30,7 @@
 
 import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
 import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
-import static org.apache.hadoop.fs.s3a.Constants.USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR;
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CRT_ENABLED;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
@@ -48,7 +48,7 @@ public void testConnectorFrameWorkIntegration(boolean useCrtClient) throws IOExc
     Configuration conf = getConfiguration();
     removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
     conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
-    conf.setBoolean(USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR, useCrtClient);
+    conf.setBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, useCrtClient);
 
     String testFile = "s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz";
     S3AFileSystem s3AFileSystem = (S3AFileSystem) FileSystem.newInstance(new Path(testFile).toUri(), conf);
@@ -83,7 +83,7 @@ public void testConnectorFrameworkConfigurable(boolean useCrtClient) {
 
     //Set Blobstore Capacity
     conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "."
+ PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1); - conf.setBoolean(USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR, useCrtClient); + conf.setBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, useCrtClient); ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java index 2561a69f60b59..a224798eb5d99 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java @@ -47,6 +47,7 @@ import org.apache.hadoop.security.UserGroupInformation; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.COMMITTER_NAME_STAGING; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -182,6 +183,7 @@ public void setup() throws Exception { // destroy all filesystems from previous runs. FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser()); super.setup(); + skipIfAnalyticsAcceleratorEnabled(createConfiguration()); jobId = randomJobId(); attempt0 = "attempt_" + jobId + "_m_000000_0"; taskAttempt0 = TaskAttemptID.forName(attempt0);