HADOOP-19348. Add support for Analytics Accelerator Library for Amazon S3 (DRAFT) #7192
Conversation
This commit is the initial integration of the Analytics Accelerator Library for Amazon S3 into S3A. It performs the integration by introducing a new S3ASeekableStream and modifying S3AFileSystem. Use of the Analytics Accelerator Library is controlled by a configuration option and is off by default.
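For anyone who wants to try it, a minimal sketch of opting in; the key matches ANALYTICS_ACCELERATOR_ENABLED_KEY in the patch, while the bucket, object path, and class name are illustrative only:

  import java.net.URI;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class AnalyticsStreamDemo {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // The library is off by default; opt in explicitly.
      conf.setBoolean("fs.s3a.analytics.accelerator.enabled", true);
      try (FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
           FSDataInputStream in = fs.open(new Path("s3a://example-bucket/data/part-0000.parquet"))) {
        in.seek(1024); // reads and seeks now go through the new seekable stream
        int firstByte = in.read();
        System.out.println("byte at offset 1024: " + firstByte);
      }
    }
  }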
💔 -1 overall
This message was automatically generated.
added some comments
 * Config to enable Analytics Accelerator Library for Amazon S3
 * https://github.com/awslabs/analytics-accelerator-s3
 */
public static final String ANALYTICS_ACCELERATOR_ENABLED_KEY = "fs.s3a.analytics.accelerator.enabled";
use the prefix ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX here.
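Something like this, as a sketch (assuming the prefix constant is the bare fs.s3a.analytics.accelerator string):

  /**
   * Prefix for all Analytics Accelerator Library configuration keys.
   */
  public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
      "fs.s3a.analytics.accelerator";

  /**
   * Config to enable the Analytics Accelerator Library for Amazon S3.
   */
  public static final String ANALYTICS_ACCELERATOR_ENABLED_KEY =
      ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + ".enabled";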
@Override
public int read() throws IOException {
  throwIfClosed();
  return inputStream.read();
We need to deal with all the Exception handling in read() and seek() calls as done in S3AInputStream.
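As a sketch of that pattern (onReadFailure() is a stand-in here for whatever reopen/recovery hook this stream ends up with, not existing code):

  @Override
  public int read() throws IOException {
    throwIfClosed();
    try {
      return inputStream.read();
    } catch (IOException e) {
      // As in S3AInputStream: treat the failure as a broken connection,
      // close and reopen the wrapped stream, then retry the read once.
      onReadFailure(e);
      return inputStream.read();
    }
  }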
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;

public class ITestS3AS3SeekableStream extends AbstractS3ATestBase {
How about a contract test that uses the new stream for all types of read?
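Sketch of what that could look like for the open/read contract (class name is illustrative; AbstractContractOpenTest and S3AContract are the existing contract-test infrastructure):

  public class ITestS3AAnalyticsStreamContractOpen extends AbstractContractOpenTest {

    @Override
    protected Configuration createConfiguration() {
      Configuration conf = super.createConfiguration();
      // Run the standard open/read contract against the new stream.
      conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
      return conf;
    }

    @Override
    protected AbstractFSContract createContract(Configuration conf) {
      return new S3AContract(conf);
    }
  }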
ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);

S3SeekableInputStreamConfiguration configuration = S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
assertEquals(configuration.getLogicalIOConfiguration().getPrefetchingMode(), PrefetchMode.ALL);
use assertj
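i.e. roughly this (the description text is just a suggestion):

  Assertions.assertThat(configuration.getLogicalIOConfiguration().getPrefetchingMode())
      .describedAs("prefetching mode from %s settings",
          ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX)
      .isEqualTo(PrefetchMode.ALL);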
conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", -1);

ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
assertThrows(IllegalArgumentException.class, () ->
Use AssertJ with a message.
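Something like the following; assertThatThrownBy plus describedAs carries the message (the description text is just a suggestion):

  Assertions.assertThatThrownBy(() ->
          S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration))
      .describedAs("building a configuration with a negative blobstore capacity")
      .isInstanceOf(IllegalArgumentException.class);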
removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);

String testFile = "s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz";
there is a config fs.s3a.scale.test.csvfile for this file. use that.
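i.e. resolve it from configuration so it is overridable per test setup (sketch; the literal default here just mirrors the currently hard-coded object, and the test code may already have a constant for this key):

  // resolve the test object from fs.s3a.scale.test.csvfile rather than hard-coding it
  String testFile = conf.getTrimmed("fs.s3a.scale.test.csvfile",
      "s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz");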
💔 -1 overall
This message was automatically generated.
@Override
public void setup() throws Exception {
  super.setup();
  skipIfAnalyticsAcceleratorEnabled(getContract().getConf());
Rather than skipping, can we not configure these tests to run with S3ASeekableStream as well as S3AInputStream? That would give huge test coverage for the new read flow.
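e.g. replace the skip in setup() with a configuration override along these lines (sketch; removeBaseAndBucketOverrides is the existing S3ATestUtils helper):

  @Override
  protected Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    // Force the analytics stream on so the whole suite exercises the new read path.
    removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
    conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
    return conf;
  }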
Mostly looks good, thank you!
I've created JIRAs for some follow up work:
https://issues.apache.org/jira/browse/HADOOP-19369 - Move input stream creation to S3AStore.
https://issues.apache.org/jira/browse/HADOOP-19368 - Move client creation to S3AStore.
My main comment is to add reasons for skipped tests; we have this in some places but not everywhere. We can do that either via comments or a "reason" field in the skip code.
<version>0.0.1</version>
<scope>compile</scope>
</dependency>
<dependency>
I think we discussed that the CRT should be provided scope? (not sure though)
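If so, that would be something like the following in the POM (a sketch; exact coordinates and version management belong to the real build):

  <dependency>
    <groupId>software.amazon.awssdk.crt</groupId>
    <artifactId>aws-crt</artifactId>
    <scope>provided</scope>
  </dependency>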
LOG.info("Using S3SeekableInputStream"); | ||
if(this.analyticsAcceleratorCRTEnabled) { | ||
LOG.info("Using S3CrtClient"); | ||
this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build(); |
We will need to move this to S3AStoreImpl and configure it properly as a follow-up. Ticket to track this work: https://issues.apache.org/jira/browse/HADOOP-19368
@@ -317,6 +324,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 */
private S3Client s3Client;

/**
 * CRT-Based S3Client created if analytics accelerator library is enabled
 * and managed by the ClientManager. Analytics accelerator library can be
nit: Not true as of now, S3AFileSystem is managing the CRT as we are not creating it in the ClientManager/S3AStoreImpl.
    DEFAULT_MULTIPART_UPLOAD_ENABLED);

if (this.analyticsAcceleratorEnabled && !analyticsAcceleratorCRTEnabled) {
  // Temp change: Analytics Accelerator with S3AsyncClient does not support multi-part upload.
removal of this is also tracked in: https://issues.apache.org/jira/browse/HADOOP-19368
@@ -1892,6 +1949,14 @@ private FSDataInputStream executeOpen(
  fileInformation.applyOptions(readContext);
  LOG.debug("Opening '{}'", readContext);

  if (this.analyticsAcceleratorEnabled) {
    return new FSDataInputStream(
input stream creation to be moved to S3AStore, work tracked here: https://issues.apache.org/jira/browse/HADOOP-19369
LOG.info("Using S3CrtClient"); | ||
this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build(); | ||
} else { | ||
LOG.info("Using S3Client"); |
Same here, "Using S3 async client for of analytics accelerator S3"
We will eventually either get rid of these/move them to debug or have them at log only once. Can decide when we update client creation code.
}

@Override
public void close() throws IOException {
Looking at existing implementations, they maintain a volatile variable
private volatile boolean closed;
and then update it in a synchronized block in close() and check it in throwIfClosed(). Should we do the same?
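A sketch of that pattern applied to this stream (FSExceptionMessages is the existing Hadoop constants class):

  private volatile boolean closed;

  @Override
  public synchronized void close() throws IOException {
    if (!closed) {
      closed = true;
      inputStream.close();
    }
  }

  private void throwIfClosed() throws IOException {
    if (closed) {
      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
    }
  }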
@@ -88,6 +87,12 @@ protected Configuration createConfiguration() {
  return conf;
}

@Override
public void testOverwriteExistingFile() throws Throwable {
  skipIfAnalyticsAcceleratorEnabled(this.createConfiguration());
We intend to remove this, right? Will work with you offline to create a list of tests we're skipping right now but won't skip before trunk merge, and create a JIRA for it.
Would be great if we could add some temporary comments wherever we skip. Explain the reason for skipping and if this skip will be removed or not in the future.
 * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled.
 * @param configuration configuration to probe
 */
public static void skipIfAnalyticsAcceleratorEnabled(
actually you could add a reason field here, and then wherever you skip, pass in the reason.
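i.e. something like this (a sketch, assuming the existing S3ATestUtils assume(message, condition) helper):

  /**
   * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled.
   * @param configuration configuration to probe
   * @param reason why the test is incompatible with the analytics stream
   */
  public static void skipIfAnalyticsAcceleratorEnabled(
      Configuration configuration, String reason) {
    assume(reason,
        !configuration.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false));
  }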
@@ -203,6 +204,9 @@ private void abortActiveStream() throws IOException {
@Test
public void testCostOfCreatingMagicFile() throws Throwable {
  describe("Files created under magic paths skip existence checks and marker deletes");

  // Assertions will fail as {@link S3ASeekableInputStream} do not support InputStreamStatistics yes
nit: typo yet*
🎊 +1 overall
This message was automatically generated.
+1, LGTM. Merging to feature branch.
Merged commit 3d8f4a4 into apache:feature-HADOOP-19363-analytics-accelerator-s3
Description of PR
Integrate Analytics Accelerator Library for Amazon S3
This PR is the initial integration of the Analytics Accelerator Library
for Amazon S3 into S3A. It performs the integration by introducing a new
S3ASeekableStream and modifying S3AFileSystem. Use of the Analytics
Accelerator Library is controlled by a configuration option and is off by default.
How was this patch tested?
Added new integration tests in ITestS3AS3SeekableStream and ran all hadoop-aws tests in us-east-1 with
mvn -Dparallel-tests -DtestsThreadCount=16 clean verify.
The following are the current failures:
[org.apache.hadoop.fs.s3a.commit.terasort] 21 errors: failing on an EC2 instance, passing when executed from a developer machine.
[ITestS3ACannedACLs] 1 error: my personal bucket does not allow object ACLs. Should pass in CI/CD.
[ITestS3ARequesterPays] 1 error: the CRT client does not handle requester pays. Will track with the relevant team.
[ITestS3ATemporaryCredentials] 1 error: my personal instance is already set up with temporary credentials. Should pass in CI/CD.
[ITestS3AContractSeek] 2 failures: found a bug in the Analytics Accelerator Library for Amazon S3. We are working on fixing it. https://github.com/awslabs/analytics-accelerator-s3/issues/201
For code changes: Have the LICENSE, LICENSE-binary, and NOTICE-binary files been updated for any new dependencies?