Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add endpointOverride S3 support #5087

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import io.deephaven.engine.util.BigDecimalUtils;
import io.deephaven.engine.util.file.TrackedFileHandleFactory;
import io.deephaven.extensions.s3.AwsCredentials;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.parquet.base.NullStatistics;
import io.deephaven.parquet.base.InvalidParquetFileException;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
Expand Down Expand Up @@ -583,13 +583,13 @@ public void testArrayColumns() {
public void readSampleParquetFilesFromS3Test1() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.awsRegionName("us-east-1")
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(AwsCredentials.defaultCredentials())
.credentials(Credentials.defaultCredentials())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
Expand Down Expand Up @@ -621,7 +621,7 @@ public void readSampleParquetFilesFromS3Test1() {
public void readSampleParquetFilesFromS3Test2() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.awsRegionName("us-east-2")
.regionName("us-east-2")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

interface AwsSdkV2Credentials extends AwsCredentials {
interface AwsSdkV2Credentials extends Credentials {

AwsCredentialsProvider awsCredentialsProvider();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;

/**
* AWS credentials provider that uses access key and secret key provided at construction.
* Basic credentials that use the access key id and secret access key provided at construction.
*/
@Immutable
@SimpleStyle
abstract class BasicCredentials implements AwsSdkV2Credentials {
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved

static BasicCredentials of(final String awsAccessKeyId, final String awsSecretAccessKey) {
return ImmutableBasicCredentials.of(awsAccessKeyId, awsSecretAccessKey);
static BasicCredentials of(final String accessKeyId, final String secretAccessKey) {
return ImmutableBasicCredentials.of(accessKeyId, secretAccessKey);
}

@Value.Parameter
abstract String awsAccessKeyId();
abstract String accessKeyId();

@Value.Redacted
@Value.Parameter
abstract String awsSecretAccessKey();
abstract String secretAccessKey();

public AwsCredentialsProvider awsCredentialsProvider() {
final AwsBasicCredentials awsCreds = AwsBasicCredentials.create(awsAccessKeyId(), awsSecretAccessKey());
final AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessKeyId(), secretAccessKey());
return StaticCredentialsProvider.create(awsCreds);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.extensions.s3;

/**
* Credentials for use with {@code S3Instructions}. Instances are obtained from the static factory methods on this
* interface: {@link #defaultCredentials()} and {@link #basicCredentials(String, String)}.
*/
public interface Credentials {

/**
* The default credentials.
*
* @return the shared default credentials instance, {@code DefaultCredentials.DEFAULT_CREDENTIALS}
* @see <a href="https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html">Default
* credentials provider chain</a>
*/
static Credentials defaultCredentials() {
return DefaultCredentials.DEFAULT_CREDENTIALS;
}

/**
* Basic credentials with the specified access key id and secret access key.
*
* @param accessKeyId the access key id, used to identify the user
* @param secretAccessKey the secret access key, used to authenticate the user
* @return credentials built from the given access key id and secret access key via {@code BasicCredentials.of}
*/
static Credentials basicCredentials(final String accessKeyId, final String secretAccessKey) {
return BasicCredentials.of(accessKeyId, secretAccessKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import java.util.Optional;

/**
* This class provides instructions intended for reading and writing data to AWS S3 instances.
* This class provides instructions intended for reading and writing data to S3-compatible instances.
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
*/
@Immutable
@BuildableStyle
Expand All @@ -39,9 +39,9 @@ public static Builder builder() {
}

/**
* The AWS region name to use when reading or writing to S3.
* The region name to use when reading or writing to S3.
*/
public abstract String awsRegionName();
public abstract String regionName();

/**
* The maximum number of concurrent requests to make to S3, defaults to {@value #DEFAULT_MAX_CONCURRENT_REQUESTS}.
Expand All @@ -62,7 +62,7 @@ public int readAheadCount() {
}

/**
* The byte size of each fragment to read from S3, defaults to the value of config parameter
* The maximum byte size of each fragment to read from S3, defaults to the value of config parameter
* {@value MAX_FRAGMENT_SIZE_CONFIG_PARAM}, or 5 MiB if unset. Must be between 8 KiB and the value of config
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
* parameter {@value MAX_FRAGMENT_SIZE_CONFIG_PARAM}. If there are fewer bytes remaining in the file, the fetched
* fragment can be smaller.
Expand All @@ -73,13 +73,14 @@ public int fragmentSize() {
}

/**
* The maximum number of fragments to cache in memory, defaults to {@value #DEFAULT_MAX_CACHE_SIZE}. This caching is
* done at the deephaven layer for faster access to recently read fragments. Must be greater than or equal to
* {@code 1 + readAheadCount()}.
* The maximum number of fragments to cache in memory, defaults to
* {@code Math.max(1 + readAheadCount(), DEFAULT_MAX_CACHE_SIZE)}, which is at least
* {@value #DEFAULT_MAX_CACHE_SIZE}. This caching is done at the Deephaven layer for faster access to recently read
* fragments. Must be greater than or equal to {@code 1 + readAheadCount()}.
*/
@Default
public int maxCacheSize() {
return DEFAULT_MAX_CACHE_SIZE;
return Math.max(1 + readAheadCount(), DEFAULT_MAX_CACHE_SIZE);
}

/**
Expand All @@ -100,31 +101,23 @@ public Duration readTimeout() {
}

/**
* The credentials to use when reading or writing to S3. By default, uses
* {@link AwsCredentials#defaultCredentials()}.
* The credentials to use when reading or writing to S3. By default, uses {@link Credentials#defaultCredentials()}.
*/
@Default
public AwsCredentials credentials() {
return AwsCredentials.defaultCredentials();
public Credentials credentials() {
return Credentials.defaultCredentials();
}

/**
* Configure the endpoint with which the SDK should communicate.
* The endpoint to connect to. Callers connecting to AWS do not typically need to set this; it is most useful when
* connecting to non-AWS, S3-compatible APIs.
*
* @see <a href="https://docs.aws.amazon.com/general/latest/gr/s3.html">Amazon Simple Storage Service endpoints</a>
*/
public abstract Optional<URI> endpointOverride();

// If necessary, we _could_ plumb support for "S3-compatible" services which don't support virtual-host style
// requests via software.amazon.awssdk.services.s3.S3BaseClientBuilder.forcePathStyle. Originally, AWS planned to
// deprecate path-style requests, but that has been delayed an indefinite amount of time. In the meantime, we'll
// keep S3Instructions simpler.
// https://aws.amazon.com/blogs/storage/update-to-amazon-s3-path-deprecation-plan/
// @Default
// public boolean forcePathStyle() {
// return false;
// }

public interface Builder {
Builder awsRegionName(String awsRegionName);
Builder regionName(String regionName);

Builder maxConcurrentRequests(int maxConcurrentRequests);

Expand All @@ -138,7 +131,7 @@ public interface Builder {

Builder readTimeout(Duration connectionTimeout);

Builder credentials(AwsCredentials credentials);
Builder credentials(Credentials credentials);

Builder endpointOverride(URI endpointOverride);

Expand Down Expand Up @@ -187,11 +180,21 @@ final void boundsCheckMaxCacheSize() {
final void awsSdkV2Credentials() {
if (!(credentials() instanceof AwsSdkV2Credentials)) {
throw new IllegalArgumentException(
"credentials() must be created via provided io.deephaven.extensions.s3.AwsCredentials methods");
"credentials() must be created via provided io.deephaven.extensions.s3.Credentials methods");
}
}

final AwsCredentialsProvider awsCredentialsProvider() {
return ((AwsSdkV2Credentials) credentials()).awsCredentialsProvider();
}

// If necessary, we _could_ plumb support for "S3-compatible" services which don't support virtual-host style
// requests via software.amazon.awssdk.services.s3.S3BaseClientBuilder.forcePathStyle. Originally, AWS planned to
// deprecate path-style requests, but that has been delayed an indefinite amount of time. In the meantime, we'll
// keep S3Instructions simpler.
// https://aws.amazon.com/blogs/storage/update-to-amazon-s3-path-deprecation-plan/
// @Default
// public boolean forcePathStyle() {
// return false;
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@


/**
* {@link SeekableByteChannel} class used to fetch objects from AWS S3 buckets using an async client with the ability to
* {@link SeekableByteChannel} class used to fetch objects from S3 buckets using an async client with the ability to
* read ahead and cache fragments of the object.
*/
final class S3SeekableByteChannel implements SeekableByteChannel, CachedChannelProvider.ContextHolder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import static io.deephaven.extensions.s3.S3Instructions.MAX_FRAGMENT_SIZE;

/**
* {@link SeekableChannelsProvider} implementation that is used to fetch objects from AWS S3 instances.
* {@link SeekableChannelsProvider} implementation that is used to fetch objects from S3 instances.
*/
final class S3SeekableChannelProvider implements SeekableChannelsProvider {

Expand All @@ -41,7 +41,7 @@ final class S3SeekableChannelProvider implements SeekableChannelsProvider {
// TODO(deephaven-core#5062): Add support for async client recovery and auto-close
// TODO(deephaven-core#5063): Add support for caching clients for re-use
final S3AsyncClientBuilder builder = S3AsyncClient.builder()
.region(Region.of(s3Instructions.awsRegionName()))
.region(Region.of(s3Instructions.regionName()))
.httpClient(asyncHttpClient)
.credentialsProvider(s3Instructions.awsCredentialsProvider());
s3Instructions.endpointOverride().ifPresent(builder::endpointOverride);
Expand Down
Loading
Loading