Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add endpointOverride S3 support #5087

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import io.deephaven.engine.util.BigDecimalUtils;
import io.deephaven.engine.util.file.TrackedFileHandleFactory;
import io.deephaven.extensions.s3.AwsCredentials;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.parquet.base.NullStatistics;
import io.deephaven.parquet.base.InvalidParquetFileException;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
Expand Down Expand Up @@ -583,13 +583,13 @@ public void testArrayColumns() {
public void readSampleParquetFilesFromS3Test1() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.awsRegionName("us-east-1")
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(AwsCredentials.defaultCredentials())
.credentials(Credentials.defaultCredentials())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
Expand Down Expand Up @@ -621,7 +621,7 @@ public void readSampleParquetFilesFromS3Test1() {
public void readSampleParquetFilesFromS3Test2() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.awsRegionName("us-east-2")
.regionName("us-east-2")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
Expand Down
27 changes: 27 additions & 0 deletions extensions/s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,31 @@ dependencies {

Classpaths.inheritAutoService(project)
Classpaths.inheritImmutables(project)

Classpaths.inheritJUnitPlatform(project)
Classpaths.inheritAssertJ(project)
testImplementation 'org.junit.jupiter:junit-jupiter'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'

testImplementation "org.testcontainers:testcontainers:1.19.4"
testImplementation "org.testcontainers:junit-jupiter:1.19.4"
testImplementation "org.testcontainers:localstack:1.19.4"
testImplementation "org.testcontainers:minio:1.19.4"

testRuntimeOnly project(':test-configs')
testRuntimeOnly project(':log-to-slf4j')
Classpaths.inheritSlf4j(project, 'slf4j-simple', 'testRuntimeOnly')
}

// Default test task: runs on the JUnit Platform and skips tests tagged "testcontainers".
test {
useJUnitPlatform {
excludeTags("testcontainers")
}
}

// Separate Test task that runs only the tests tagged "testcontainers" on the JUnit Platform.
tasks.register('testOutOfBand', Test) {
useJUnitPlatform {
includeTags("testcontainers")
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

interface AwsSdkV2Credentials extends AwsCredentials {
interface AwsSdkV2Credentials extends Credentials {

/**
 * Returns the AWS SDK {@link AwsCredentialsProvider} that supplies these credentials.
 */
AwsCredentialsProvider awsCredentialsProvider();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;

/**
* AWS credentials provider that uses access key and secret key provided at construction.
* Basic credentials that use the access key id and secret access key provided at construction.
*/
@Immutable
@SimpleStyle
abstract class BasicCredentials implements AwsSdkV2Credentials {
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved

static BasicCredentials of(final String awsAccessKeyId, final String awsSecretAccessKey) {
return ImmutableBasicCredentials.of(awsAccessKeyId, awsSecretAccessKey);
static BasicCredentials of(final String accessKeyId, final String secretAccessKey) {
return ImmutableBasicCredentials.of(accessKeyId, secretAccessKey);
}

@Value.Parameter
abstract String awsAccessKeyId();
abstract String accessKeyId();

@Value.Redacted
@Value.Parameter
abstract String awsSecretAccessKey();
abstract String secretAccessKey();

public AwsCredentialsProvider awsCredentialsProvider() {
final AwsBasicCredentials awsCreds = AwsBasicCredentials.create(awsAccessKeyId(), awsSecretAccessKey());
final AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessKeyId(), secretAccessKey());
return StaticCredentialsProvider.create(awsCreds);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.extensions.s3;

/**
 * Credentials for use with {@link S3Instructions}. Instances are obtained through the static factory methods declared
 * on this interface.
 */
public interface Credentials {

/**
 * The default credentials.
 *
 * @return the shared default credentials instance
 * @see <a href="https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html">Default
 *      credentials provider chain</a>
 */
static Credentials defaultCredentials() {
return DefaultCredentials.DEFAULT_CREDENTIALS;
}

/**
 * Basic credentials with the specified access key id and secret access key.
 *
 * @param accessKeyId the access key id, used to identify the user
 * @param secretAccessKey the secret access key, used to authenticate the user
 * @return credentials wrapping the given access key id and secret access key
 */
static Credentials basicCredentials(final String accessKeyId, final String secretAccessKey) {
return BasicCredentials.of(accessKeyId, secretAccessKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@

import io.deephaven.annotations.BuildableStyle;
import io.deephaven.configuration.Configuration;
import org.immutables.value.Value;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import org.immutables.value.Value.Check;
import org.immutables.value.Value.Default;
import org.immutables.value.Value.Immutable;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

import java.net.URI;
import java.time.Duration;
import java.util.Optional;

/**
* This class provides instructions intended for reading and writing data to AWS S3 instances.
* This class provides instructions intended for reading and writing data to S3-compatible instances.
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
*/
@Value.Immutable
@Immutable
@BuildableStyle
public abstract class S3Instructions {

Expand All @@ -22,10 +26,10 @@ public abstract class S3Instructions {

private final static String MAX_FRAGMENT_SIZE_CONFIG_PARAM = "S3.maxFragmentSize";
final static int MAX_FRAGMENT_SIZE =
Configuration.getInstance().getIntegerWithDefault(MAX_FRAGMENT_SIZE_CONFIG_PARAM, 5 << 20); // 5 MB
Configuration.getInstance().getIntegerWithDefault(MAX_FRAGMENT_SIZE_CONFIG_PARAM, 5 << 20); // 5 MiB
private final static int DEFAULT_FRAGMENT_SIZE = MAX_FRAGMENT_SIZE;

private final static int MIN_FRAGMENT_SIZE = 8 << 10; // 8 KB
private final static int MIN_FRAGMENT_SIZE = 8 << 10; // 8 KiB
private final static int DEFAULT_MAX_CACHE_SIZE = 32;
private final static Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(2);
private final static Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(2);
Expand All @@ -35,14 +39,14 @@ public static Builder builder() {
}

/**
* The AWS region name to use when reading or writing to S3.
* The region name to use when reading or writing to S3.
*/
public abstract String awsRegionName();
public abstract String regionName();

/**
* The maximum number of concurrent requests to make to S3, defaults to {@value #DEFAULT_MAX_CONCURRENT_REQUESTS}.
*/
@Value.Default
@Default
public int maxConcurrentRequests() {
return DEFAULT_MAX_CONCURRENT_REQUESTS;
}
Expand All @@ -52,70 +56,107 @@ public int maxConcurrentRequests() {
* {@value #DEFAULT_READ_AHEAD_COUNT}, which means by default, we will fetch {@value #DEFAULT_READ_AHEAD_COUNT}
* fragments in advance when reading current fragment.
*/
@Value.Default
@Default
public int readAheadCount() {
return DEFAULT_READ_AHEAD_COUNT;
}

/**
* The maximum size of each fragment to read from S3, defaults to the value of config parameter
* {@value MAX_FRAGMENT_SIZE_CONFIG_PARAM}. If there are fewer bytes remaining in the file, the fetched fragment can
* be smaller.
* The maximum byte size of each fragment to read from S3, defaults to the value of config parameter
* {@value #MAX_FRAGMENT_SIZE_CONFIG_PARAM}, or 5 MiB if unset. Must be between 8 KiB and the value of config
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
* parameter {@value #MAX_FRAGMENT_SIZE_CONFIG_PARAM}. If there are fewer bytes remaining in the file, the fetched
* fragment can be smaller.
*/
@Value.Default
@Default
public int fragmentSize() {
return DEFAULT_FRAGMENT_SIZE;
}

/**
* The maximum number of fragments to cache in memory, defaults to {@value #DEFAULT_MAX_CACHE_SIZE}. This caching is
* done at the deephaven layer for faster access to recently read fragments.
* The maximum number of fragments to cache in memory, defaults to
* {@code Math.max(1 + readAheadCount(), DEFAULT_MAX_CACHE_SIZE)}, which is at least
* {@value #DEFAULT_MAX_CACHE_SIZE}. This caching is done at the deephaven layer for faster access to recently read
* fragments. Must be greater than or equal to {@code 1 + readAheadCount()}.
*/
@Value.Default
@Default
public int maxCacheSize() {
return DEFAULT_MAX_CACHE_SIZE;
return Math.max(1 + readAheadCount(), DEFAULT_MAX_CACHE_SIZE);
}

/**
* The amount of time to wait when initially establishing a connection before giving up and timing out, defaults to
* 2 seconds.
*/
@Value.Default
@Default
public Duration connectionTimeout() {
return DEFAULT_CONNECTION_TIMEOUT;
}

/**
* The amount of time to wait when reading a fragment before giving up and timing out, defaults to 2 seconds.
*/
@Value.Default
@Default
public Duration readTimeout() {
return DEFAULT_READ_TIMEOUT;
}

/**
* The credentials to use when reading or writing to S3. By default, uses {@link DefaultCredentialsProvider}.
* The credentials to use when reading or writing to S3. By default, uses {@link Credentials#defaultCredentials()}.
*/
@Value.Default
public AwsCredentials credentials() {
return AwsCredentials.defaultCredentials();
@Default
public Credentials credentials() {
return Credentials.defaultCredentials();
}

@Value.Check
/**
* The endpoint to connect to. Callers connecting to AWS do not typically need to set this; it is most useful when
* connecting to non-AWS, S3-compatible APIs.
*
* @see <a href="https://docs.aws.amazon.com/general/latest/gr/s3.html">Amazon Simple Storage Service endpoints</a>
*/
public abstract Optional<URI> endpointOverride();

/**
 * Builder for {@link S3Instructions}. Each setter corresponds to the attribute of the same name on
 * {@link S3Instructions}; see the attribute's documentation for defaults and valid ranges.
 */
public interface Builder {
    /** Sets {@link S3Instructions#regionName()}. */
    Builder regionName(String regionName);

    /** Sets {@link S3Instructions#maxConcurrentRequests()}. */
    Builder maxConcurrentRequests(int maxConcurrentRequests);

    /** Sets {@link S3Instructions#readAheadCount()}. */
    Builder readAheadCount(int readAheadCount);

    /** Sets {@link S3Instructions#fragmentSize()}. */
    Builder fragmentSize(int fragmentSize);

    /** Sets {@link S3Instructions#maxCacheSize()}. */
    Builder maxCacheSize(int maxCacheSize);

    /** Sets {@link S3Instructions#connectionTimeout()}. */
    Builder connectionTimeout(Duration connectionTimeout);

    /** Sets {@link S3Instructions#readTimeout()}. */
    Builder readTimeout(Duration readTimeout);

    /** Sets {@link S3Instructions#credentials()}. */
    Builder credentials(Credentials credentials);

    /** Sets {@link S3Instructions#endpointOverride()}. */
    Builder endpointOverride(URI endpointOverride);

    /**
     * Convenience overload that parses {@code endpointOverride} with {@link URI#create(String)} and delegates to
     * {@link #endpointOverride(URI)}.
     *
     * @param endpointOverride the endpoint, as a string
     * @return the result of {@link #endpointOverride(URI)}
     * @throws IllegalArgumentException if the string cannot be parsed as a URI
     * @throws NullPointerException if {@code endpointOverride} is null
     */
    default Builder endpointOverride(String endpointOverride) {
        return endpointOverride(URI.create(endpointOverride));
    }

    /** Builds the {@link S3Instructions}. */
    S3Instructions build();
}

/**
 * Validates that {@link #maxConcurrentRequests()} is at least 1.
 *
 * @throws IllegalArgumentException if the configured value is less than 1
 */
@Check
final void boundsCheckMaxConcurrentRequests() {
    final int requested = maxConcurrentRequests();
    if (requested >= 1) {
        return;
    }
    throw new IllegalArgumentException("maxConcurrentRequests(=" + requested + ") must be >= 1");
}

@Value.Check
@Check
final void boundsCheckReadAheadCount() {
    // Reject negative read-ahead counts; zero is accepted.
    final int readAhead = readAheadCount();
    if (readAhead >= 0) {
        return;
    }
    throw new IllegalArgumentException("readAheadCount(=" + readAhead + ") must be >= 0");
}

@Value.Check
@Check
final void boundsCheckMaxFragmentSize() {
if (fragmentSize() < MIN_FRAGMENT_SIZE) {
throw new IllegalArgumentException("fragmentSize(=" + fragmentSize() + ") must be >= " + MIN_FRAGMENT_SIZE +
Expand All @@ -127,31 +168,33 @@ final void boundsCheckMaxFragmentSize() {
}
}

@Value.Check
@Check
final void boundsCheckMaxCacheSize() {
    // The cache must be able to hold the current fragment plus every read-ahead fragment.
    final int cacheSize = maxCacheSize();
    final int readAhead = readAheadCount();
    if (cacheSize >= readAhead + 1) {
        return;
    }
    throw new IllegalArgumentException(
            "maxCacheSize(=" + cacheSize + ") must be >= 1 + readAheadCount(=" + readAhead + ")");
}

public interface Builder {
Builder awsRegionName(String awsRegionName);

Builder maxConcurrentRequests(int maxConcurrentRequests);

Builder readAheadCount(int readAheadCount);

Builder fragmentSize(int fragmentSize);

Builder maxCacheSize(int maxCacheSize);

Builder connectionTimeout(Duration connectionTimeout);

Builder readTimeout(Duration connectionTimeout);

Builder credentials(AwsCredentials credentials);
/**
 * Validates that {@link #credentials()} is an {@link AwsSdkV2Credentials} implementation.
 *
 * @throws IllegalArgumentException if the credentials are of any other type
 */
@Check
final void awsSdkV2Credentials() {
    final boolean supported = credentials() instanceof AwsSdkV2Credentials;
    if (supported) {
        return;
    }
    throw new IllegalArgumentException(
            "credentials() must be created via provided io.deephaven.extensions.s3.Credentials methods");
}

S3Instructions build();
/**
 * Returns the AWS SDK credentials provider obtained from {@link #credentials()}, which is cast to
 * {@link AwsSdkV2Credentials}.
 */
final AwsCredentialsProvider awsCredentialsProvider() {
    final AwsSdkV2Credentials sdkCredentials = (AwsSdkV2Credentials) credentials();
    return sdkCredentials.awsCredentialsProvider();
}

// If necessary, we _could_ plumb support for "S3-compatible" services which don't support virtual-host style
// requests via software.amazon.awssdk.services.s3.S3BaseClientBuilder.forcePathStyle. Originally, AWS planned to
// deprecate path-style requests, but that has been delayed an indefinite amount of time. In the meantime, we'll
// keep S3Instructions simpler.
// https://aws.amazon.com/blogs/storage/update-to-amazon-s3-path-deprecation-plan/
// @Default
// public boolean forcePathStyle() {
// return false;
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@


/**
* {@link SeekableByteChannel} class used to fetch objects from AWS S3 buckets using an async client with the ability to
* {@link SeekableByteChannel} class used to fetch objects from S3 buckets using an async client with the ability to
* read ahead and cache fragments of the object.
*/
final class S3SeekableByteChannel implements SeekableByteChannel, CachedChannelProvider.ContextHolder {
Expand Down
Loading
Loading