-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add S3 async upload utilities and models
Signed-off-by: Raghuvansh Raj <[email protected]>
- Loading branch information
1 parent
9ceae51
commit 4e5d98a
Showing
11 changed files
with
1,408 additions
and
1 deletion.
There are no files selected for viewing
37 changes: 37 additions & 0 deletions
37
...ns/repository-s3/src/main/java/org/opensearch/repositories/s3/AmazonAsyncS3Reference.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.repositories.s3; | ||
|
||
import org.opensearch.common.concurrent.RefCountedReleasable; | ||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; | ||
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
|
||
/** | ||
* Handles the shutdown of the wrapped {@link software.amazon.awssdk.services.s3.S3AsyncClient} using reference | ||
* counting. | ||
*/ | ||
public class AmazonAsyncS3Reference extends RefCountedReleasable<AmazonAsyncS3WithCredentials> { | ||
|
||
AmazonAsyncS3Reference(AmazonAsyncS3WithCredentials client) { | ||
super("AWS_S3_CLIENT", client, () -> { | ||
client.client().close(); | ||
client.priorityClient().close(); | ||
AwsCredentialsProvider credentials = client.credentials(); | ||
if (credentials instanceof Closeable) { | ||
try { | ||
((Closeable) credentials).close(); | ||
} catch (IOException e) { | ||
/* Do nothing here */ | ||
} | ||
} | ||
}); | ||
} | ||
} |
52 changes: 52 additions & 0 deletions
52
...ository-s3/src/main/java/org/opensearch/repositories/s3/AmazonAsyncS3WithCredentials.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.repositories.s3; | ||
|
||
import org.opensearch.common.Nullable; | ||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; | ||
import software.amazon.awssdk.services.s3.S3AsyncClient; | ||
|
||
/** | ||
* The holder of the AmazonS3 and AWSCredentialsProvider | ||
*/ | ||
final class AmazonAsyncS3WithCredentials { | ||
private final S3AsyncClient client; | ||
private final S3AsyncClient priorityClient; | ||
private final AwsCredentialsProvider credentials; | ||
|
||
private AmazonAsyncS3WithCredentials( | ||
final S3AsyncClient client, | ||
final S3AsyncClient priorityClient, | ||
@Nullable final AwsCredentialsProvider credentials | ||
) { | ||
this.client = client; | ||
this.credentials = credentials; | ||
this.priorityClient = priorityClient; | ||
} | ||
|
||
S3AsyncClient client() { | ||
return client; | ||
} | ||
|
||
S3AsyncClient priorityClient() { | ||
return priorityClient; | ||
} | ||
|
||
AwsCredentialsProvider credentials() { | ||
return credentials; | ||
} | ||
|
||
static AmazonAsyncS3WithCredentials create( | ||
final S3AsyncClient client, | ||
final S3AsyncClient priorityClient, | ||
@Nullable final AwsCredentialsProvider credentials | ||
) { | ||
return new AmazonAsyncS3WithCredentials(client, priorityClient, credentials); | ||
} | ||
} |
476 changes: 476 additions & 0 deletions
476
plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
46 changes: 46 additions & 0 deletions
46
...epository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncExecutorBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.repositories.s3.async; | ||
|
||
import java.util.concurrent.ExecutorService; | ||
|
||
/** | ||
* An encapsulation for the {@link TransferNIOGroup}, and the stream reader and future completion executor services | ||
*/ | ||
public class AsyncExecutorBuilder { | ||
|
||
private final ExecutorService futureCompletionExecutor; | ||
private final ExecutorService streamReader; | ||
private final TransferNIOGroup transferNIOGroup; | ||
|
||
/** | ||
* Construct a new AsyncExecutorBuilder object | ||
* | ||
* @param futureCompletionExecutor An {@link ExecutorService} to pass to {@link software.amazon.awssdk.services.s3.S3AsyncClient} for future completion | ||
* @param streamReader An {@link ExecutorService} to read streams for upload | ||
* @param transferNIOGroup A {@link TransferNIOGroup} which encapsulates the netty {@link io.netty.channel.EventLoopGroup} for async uploads | ||
*/ | ||
public AsyncExecutorBuilder(ExecutorService futureCompletionExecutor, ExecutorService streamReader, TransferNIOGroup transferNIOGroup) { | ||
this.transferNIOGroup = transferNIOGroup; | ||
this.streamReader = streamReader; | ||
this.futureCompletionExecutor = futureCompletionExecutor; | ||
} | ||
|
||
public ExecutorService getFutureCompletionExecutor() { | ||
return futureCompletionExecutor; | ||
} | ||
|
||
public TransferNIOGroup getTransferNIOGroup() { | ||
return transferNIOGroup; | ||
} | ||
|
||
public ExecutorService getStreamReader() { | ||
return streamReader; | ||
} | ||
} |
Oops, something went wrong.