Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add endpointOverride S3 support #5087

Merged
27 changes: 27 additions & 0 deletions extensions/s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,31 @@ dependencies {

Classpaths.inheritAutoService(project)
Classpaths.inheritImmutables(project)

Classpaths.inheritJUnitPlatform(project)
Classpaths.inheritAssertJ(project)
testImplementation 'org.junit.jupiter:junit-jupiter'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'

testImplementation "org.testcontainers:testcontainers:1.19.4"
testImplementation "org.testcontainers:junit-jupiter:1.19.4"
testImplementation "org.testcontainers:localstack:1.19.4"
testImplementation "org.testcontainers:minio:1.19.4"

testRuntimeOnly project(':test-configs')
testRuntimeOnly project(':log-to-slf4j')
Classpaths.inheritSlf4j(project, 'slf4j-simple', 'testRuntimeOnly')
}

test {
useJUnitPlatform {
excludeTags("testcontainers")
}
}

tasks.register('testOutOfBand', Test) {
useJUnitPlatform {
includeTags("testcontainers")
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,29 @@

public interface AwsCredentials {

/**
* AWS credentials that looks for credentials in this order:
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
* <ol>
* <li>Java System Properties - {@code aws.accessKeyId} and {@code aws.secretAccessKey}</li>
* <li>Environment Variables - {@code AWS_ACCESS_KEY_ID} and {@code AWS_SECRET_ACCESS_KEY}</li>
* <li>Web Identity Token credentials from system properties or environment variables</li>
* <li>Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS
* CLI</li>
* <li>Credentials delivered through the Amazon EC2 container service if AWS_CONTAINER_CREDENTIALS_RELATIVE_URI
* environment variable is set and security manager has permission to access the variable,</li>
* <li>Instance profile credentials delivered through the Amazon EC2 metadata service</li>
* </ol>
*/
static AwsCredentials defaultCredentials() {
return DefaultCredentials.DEFAULT_CREDENTIALS;
}

/**
* AWS credentials with the specified AWS access key and AWS secret key.
*
* @param awsAccessKeyId The AWS access key, used to identify the user interacting with AWS.
* @param awsSecretAccessKey The AWS secret access key, used to authenticate the user interacting with AWS.
*/
static AwsCredentials basicCredentials(final String awsAccessKeyId, final String awsSecretAccessKey) {
return BasicCredentials.of(awsAccessKeyId, awsSecretAccessKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@

import io.deephaven.annotations.BuildableStyle;
import io.deephaven.configuration.Configuration;
import org.immutables.value.Value;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import org.immutables.value.Value.Check;
import org.immutables.value.Value.Default;
import org.immutables.value.Value.Immutable;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

import java.net.URI;
import java.time.Duration;
import java.util.Optional;

/**
* This class provides instructions intended for reading and writing data to AWS S3 instances.
*/
@Value.Immutable
@Immutable
@BuildableStyle
public abstract class S3Instructions {

Expand All @@ -22,10 +26,10 @@ public abstract class S3Instructions {

private final static String MAX_FRAGMENT_SIZE_CONFIG_PARAM = "S3.maxFragmentSize";
final static int MAX_FRAGMENT_SIZE =
Configuration.getInstance().getIntegerWithDefault(MAX_FRAGMENT_SIZE_CONFIG_PARAM, 5 << 20); // 5 MB
Configuration.getInstance().getIntegerWithDefault(MAX_FRAGMENT_SIZE_CONFIG_PARAM, 5 << 20); // 5 MiB
private final static int DEFAULT_FRAGMENT_SIZE = MAX_FRAGMENT_SIZE;

private final static int MIN_FRAGMENT_SIZE = 8 << 10; // 8 KB
private final static int MIN_FRAGMENT_SIZE = 8 << 10; // 8 KiB
private final static int DEFAULT_MAX_CACHE_SIZE = 32;
private final static Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(2);
private final static Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(2);
Expand All @@ -42,7 +46,7 @@ public static Builder builder() {
/**
* The maximum number of concurrent requests to make to S3, defaults to {@value #DEFAULT_MAX_CONCURRENT_REQUESTS}.
*/
@Value.Default
@Default
public int maxConcurrentRequests() {
return DEFAULT_MAX_CONCURRENT_REQUESTS;
}
Expand All @@ -52,26 +56,28 @@ public int maxConcurrentRequests() {
* {@value #DEFAULT_READ_AHEAD_COUNT}, which means by default, we will fetch {@value #DEFAULT_READ_AHEAD_COUNT}
* fragments in advance when reading current fragment.
*/
@Value.Default
@Default
public int readAheadCount() {
return DEFAULT_READ_AHEAD_COUNT;
}

/**
* The maximum size of each fragment to read from S3, defaults to the value of config parameter
* {@value MAX_FRAGMENT_SIZE_CONFIG_PARAM}. If there are fewer bytes remaining in the file, the fetched fragment can
* be smaller.
* The byte size of each fragment to read from S3, defaults to the value of config parameter
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
* {@value MAX_FRAGMENT_SIZE_CONFIG_PARAM}, or 5 MiB if unset. Must be between 8 KiB and the value of config
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
* parameter {@value MAX_FRAGMENT_SIZE_CONFIG_PARAM}. If there are fewer bytes remaining in the file, the fetched
* fragment can be smaller.
*/
@Value.Default
@Default
public int fragmentSize() {
return DEFAULT_FRAGMENT_SIZE;
}

/**
* The maximum number of fragments to cache in memory, defaults to {@value #DEFAULT_MAX_CACHE_SIZE}. This caching is
* done at the deephaven layer for faster access to recently read fragments.
* done at the deephaven layer for faster access to recently read fragments. Must be greater than or equal to
* {@code 1 + readAheadCount()}.
*/
@Value.Default
@Default
public int maxCacheSize() {
return DEFAULT_MAX_CACHE_SIZE;
}
Expand All @@ -80,42 +86,84 @@ public int maxCacheSize() {
* The amount of time to wait when initially establishing a connection before giving up and timing out, defaults to
* 2 seconds.
*/
@Value.Default
@Default
public Duration connectionTimeout() {
return DEFAULT_CONNECTION_TIMEOUT;
}

/**
* The amount of time to wait when reading a fragment before giving up and timing out, defaults to 2 seconds
*/
@Value.Default
@Default
public Duration readTimeout() {
return DEFAULT_READ_TIMEOUT;
}

/**
* The credentials to use when reading or writing to S3. By default, uses {@link DefaultCredentialsProvider}.
* The credentials to use when reading or writing to S3. By default, uses
* {@link AwsCredentials#defaultCredentials()}.
*/
@Value.Default
@Default
public AwsCredentials credentials() {
return AwsCredentials.defaultCredentials();
}

@Value.Check
/**
* Configure the endpoint with which the SDK should communicate.
*/
public abstract Optional<URI> endpointOverride();

// If necessary, we _could_ plumb support for "S3-compatible" services which don't support virtual-host style
// requests via software.amazon.awssdk.services.s3.S3BaseClientBuilder.forcePathStyle. Originally, AWS planned to
// deprecate path-style requests, but that has been delayed an indefinite amount of time. In the meantime, we'll
// keep S3Instructions simpler.
// https://aws.amazon.com/blogs/storage/update-to-amazon-s3-path-deprecation-plan/
// @Default
// public boolean forcePathStyle() {
// return false;
// }

public interface Builder {
Builder awsRegionName(String awsRegionName);

Builder maxConcurrentRequests(int maxConcurrentRequests);

Builder readAheadCount(int readAheadCount);

Builder fragmentSize(int fragmentSize);

Builder maxCacheSize(int maxCacheSize);

Builder connectionTimeout(Duration connectionTimeout);

Builder readTimeout(Duration connectionTimeout);

Builder credentials(AwsCredentials credentials);

Builder endpointOverride(URI endpointOverride);

default Builder endpointOverride(String endpointOverride) {
return endpointOverride(URI.create(endpointOverride));
}

S3Instructions build();
}

@Check
final void boundsCheckMaxConcurrentRequests() {
if (maxConcurrentRequests() < 1) {
throw new IllegalArgumentException("maxConcurrentRequests(=" + maxConcurrentRequests() + ") must be >= 1");
}
}

@Value.Check
@Check
final void boundsCheckReadAheadCount() {
if (readAheadCount() < 0) {
throw new IllegalArgumentException("readAheadCount(=" + readAheadCount() + ") must be >= 0");
}
}

@Value.Check
@Check
final void boundsCheckMaxFragmentSize() {
if (fragmentSize() < MIN_FRAGMENT_SIZE) {
throw new IllegalArgumentException("fragmentSize(=" + fragmentSize() + ") must be >= " + MIN_FRAGMENT_SIZE +
Expand All @@ -127,31 +175,23 @@ final void boundsCheckMaxFragmentSize() {
}
}

@Value.Check
@Check
final void boundsCheckMaxCacheSize() {
if (maxCacheSize() < readAheadCount() + 1) {
throw new IllegalArgumentException("maxCacheSize(=" + maxCacheSize() + ") must be >= 1 + " +
"readAheadCount(=" + readAheadCount() + ")");
}
}

public interface Builder {
Builder awsRegionName(String awsRegionName);

Builder maxConcurrentRequests(int maxConcurrentRequests);

Builder readAheadCount(int readAheadCount);

Builder fragmentSize(int fragmentSize);

Builder maxCacheSize(int maxCacheSize);

Builder connectionTimeout(Duration connectionTimeout);

Builder readTimeout(Duration connectionTimeout);

Builder credentials(AwsCredentials credentials);
@Check
final void awsSdkV2Credentials() {
if (!(credentials() instanceof AwsSdkV2Credentials)) {
throw new IllegalArgumentException(
"credentials() must be created via provided io.deephaven.extensions.s3.AwsCredentials methods");
}
}

S3Instructions build();
final AwsCredentialsProvider awsCredentialsProvider() {
return ((AwsSdkV2Credentials) credentials()).awsCredentialsProvider();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
*/
package io.deephaven.extensions.s3;

import io.deephaven.base.verify.Assert;
import io.deephaven.util.channel.SeekableChannelContext;
import io.deephaven.util.channel.SeekableChannelsProvider;
import org.jetbrains.annotations.NotNull;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;

import java.net.URI;
import java.nio.channels.SeekableByteChannel;
Expand Down Expand Up @@ -40,12 +40,12 @@ final class S3SeekableChannelProvider implements SeekableChannelsProvider {
.build();
// TODO(deephaven-core#5062): Add support for async client recovery and auto-close
// TODO(deephaven-core#5063): Add support for caching clients for re-use
Assert.instanceOf(s3Instructions.credentials(), "credentials", AwsSdkV2Credentials.class);
this.s3AsyncClient = S3AsyncClient.builder()
final S3AsyncClientBuilder builder = S3AsyncClient.builder()
.region(Region.of(s3Instructions.awsRegionName()))
.httpClient(asyncHttpClient)
.credentialsProvider(((AwsSdkV2Credentials) s3Instructions.credentials()).awsCredentialsProvider())
.build();
.credentialsProvider(s3Instructions.awsCredentialsProvider());
s3Instructions.endpointOverride().ifPresent(builder::endpointOverride);
this.s3AsyncClient = builder.build();
this.s3Instructions = s3Instructions;
}

Expand Down
Loading
Loading