Skip to content

Commit

Permalink
Merge pull request #1 from rajdchak/feature/analytics-accelerator-client
Browse files Browse the repository at this point in the history
Make crt configurable
  • Loading branch information
fuatbasik authored Dec 13, 2024
2 parents 22d4e8d + 6a1a660 commit 208d6cc
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1767,12 +1767,23 @@ private Constants() {
*/
public static final String ANALYTICS_ACCELERATOR_ENABLED_KEY = "fs.s3a.analytics.accelerator.enabled";

/**
* Config to specify usage of crt client with Analytics Accelerator Library for Amazon S3 and it is by default true
*/
public static final String USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR = "fs.s3a.analytics.accelerator.crt.client";

/**
* Default value for {@link #ANALYTICS_ACCELERATOR_ENABLED_KEY }
* Value {@value}.
*/
public static final boolean ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false;

/**
* Default value for {@link #USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR }
* Value {@value}.
*/
public static final boolean USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR_DEFAULT = true;

/**
* Prefix to configure Analytics Accelerator Library
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
* and managed by the ClientManager. Analytics accelerator library can be
* enabled with {@link Constants#ANALYTICS_ACCELERATOR_ENABLED_KEY}
*/
private S3AsyncClient crtClient;
private S3AsyncClient s3AsyncClient;

// initial callback policy is fail-once; it's there just to assist
// some mock tests and other codepaths trying to call the low level
Expand Down Expand Up @@ -694,7 +694,6 @@ public void initialize(URI name, Configuration originalConf)
s3ExpressStore);

this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
this.analyticsAcceleratorEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
long prefetchBlockSizeLong =
longBytesOption(conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE, 1);
if (prefetchBlockSizeLong > (long) Integer.MAX_VALUE) {
Expand All @@ -703,8 +702,15 @@ public void initialize(URI name, Configuration originalConf)
this.prefetchBlockSize = (int) prefetchBlockSizeLong;
this.prefetchBlockCount =
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);

this.analyticsAcceleratorEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);

if(!analyticsAcceleratorEnabled) {
this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);
} else {
this.isMultipartUploadEnabled = false;
}
// multipart copy and upload are the same; this just makes it explicit
this.isMultipartCopyEnabled = isMultipartUploadEnabled;

Expand Down Expand Up @@ -741,17 +747,6 @@ public void initialize(URI name, Configuration originalConf)
// the encryption algorithms)
ClientManager clientManager = createClientManager(name, delegationTokensEnabled);

if (this.analyticsAcceleratorEnabled) {
LOG.info("Using S3SeekableInputStream");
this.crtClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
ConnectorConfiguration configuration = new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
S3SeekableInputStreamConfiguration seekableInputStreamConfiguration =
S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
this.s3SeekableInputStreamFactory =
new S3SeekableInputStreamFactory(
new S3SdkObjectClient(this.crtClient), seekableInputStreamConfiguration);
}

inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE,
Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT),
Expand Down Expand Up @@ -843,6 +838,25 @@ public void initialize(URI name, Configuration originalConf)
// directly through the client manager.
// this is to aid mocking.
s3Client = store.getOrCreateS3Client();

if (this.analyticsAcceleratorEnabled) {
LOG.info("Using S3SeekableInputStream");
if(conf.getBoolean(USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR, USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR_DEFAULT)) {
LOG.info("Using S3CrtClient");
this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
} else {
LOG.info("Using S3Client");
this.s3AsyncClient = store.getOrCreateAsyncClient();
}

ConnectorConfiguration configuration = new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
S3SeekableInputStreamConfiguration seekableInputStreamConfiguration =
S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
this.s3SeekableInputStreamFactory =
new S3SeekableInputStreamFactory(
new S3SdkObjectClient(this.s3AsyncClient), seekableInputStreamConfiguration);
}

// The filesystem is now ready to perform operations against
// S3
// This initiates a probe against S3 for the bucket existing.
Expand Down Expand Up @@ -4468,7 +4482,7 @@ protected synchronized void stopAllServices() {
closeAutocloseables(LOG, store, s3SeekableInputStreamFactory);
store = null;
s3Client = null;
crtClient = null;
s3AsyncClient = null;
s3SeekableInputStreamFactory = null;

// At this point the S3A client is shut down,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.Constants.USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;

import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
Expand All @@ -41,13 +42,13 @@ public class ITestS3AS3SeekableStream extends AbstractS3ATestBase {
final String PHYSICAL_IO_PREFIX = "physicalio";
final String LOGICAL_IO_PREFIX = "logicalio";

@Test
public void testConnectorFrameWorkIntegration() throws IOException {
public void testConnectorFrameWorkIntegration(boolean useCrtClient) throws IOException {
describe("Verify S3 connector framework integration");

Configuration conf = getConfiguration();
removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
conf.setBoolean(USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR, useCrtClient);

String testFile = "s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz";
S3AFileSystem s3AFileSystem = (S3AFileSystem) FileSystem.newInstance(new Path(testFile).toUri(), conf);
Expand All @@ -60,9 +61,17 @@ public void testConnectorFrameWorkIntegration() throws IOException {

}

@Test
public void testConnectorFrameWorkIntegrationWithCrtClient() throws IOException {
testConnectorFrameWorkIntegration(true);
}

@Test
public void testConnectorFrameworkConfigurable() {
public void testConnectorFrameWorkIntegrationWithoutCrtClient() throws IOException {
testConnectorFrameWorkIntegration(false);
}

public void testConnectorFrameworkConfigurable(boolean useCrtClient) {
describe("Verify S3 connector framework reads configuration");

Configuration conf = getConfiguration();
Expand All @@ -74,13 +83,25 @@ public void testConnectorFrameworkConfigurable() {
//Set Blobstore Capacity
conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);

conf.setBoolean(USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR, useCrtClient);

ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);

S3SeekableInputStreamConfiguration configuration = S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
assertEquals(configuration.getLogicalIOConfiguration().getPrefetchingMode(), PrefetchMode.ALL);
assert configuration.getPhysicalIOConfiguration().getBlobStoreCapacity() == 1;
}

@Test
public void testConnectorFrameworkConfigurableWithoutCrtClient() throws IOException {
testConnectorFrameworkConfigurable(false);
}

@Test
public void testConnectorFrameworkConfigurableWithCrtClient() throws IOException {
testConnectorFrameworkConfigurable(true);
}

@Test
public void testInvalidConfigurationThrows() {
describe("Verify S3 connector framework throws with invalid configuration");
Expand Down

0 comments on commit 208d6cc

Please sign in to comment.