Skip to content

Commit

Permalink
Add better testing
Browse files Browse the repository at this point in the history
  • Loading branch information
nateprewitt committed Nov 26, 2023
1 parent abc90a2 commit 640e8d9
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 59 deletions.
44 changes: 38 additions & 6 deletions boto3/crt.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import threading

import botocore.exceptions
from botocore.session import Session
from s3transfer.crt import (
BotocoreCRTCredentialsWrapper,
Expand All @@ -34,7 +35,8 @@
_CRT_S3_CLIENT = None
_BOTOCORE_CRT_SERIALIZER = None

_CLIENT_CREATION_LOCK = threading.Lock()
CLIENT_CREATION_LOCK = threading.Lock()
PROCESS_LOCK_NAME = 'boto3'


def _create_crt_client(session, config, region_name, cred_provider):
Expand All @@ -59,7 +61,7 @@ def _create_crt_request_serializer(session, region_name):

def _create_crt_s3_client(session, config, region_name, credentials, **kwargs):
"""Create boto3 wrapper class to manage crt lock reference and S3 client."""
lock = acquire_crt_s3_process_lock('boto3')
lock = acquire_crt_s3_process_lock(PROCESS_LOCK_NAME)
if lock is None:
# If we're unable to acquire the lock, we cannot
# use the CRT in this process and should default to
Expand All @@ -72,6 +74,7 @@ def _create_crt_s3_client(session, config, region_name, credentials, **kwargs):
_create_crt_client(session, config, region_name, cred_provider),
lock,
region_name,
cred_wrapper,
)


Expand All @@ -91,7 +94,7 @@ def get_crt_s3_client(client, config):
global _CRT_S3_CLIENT
global _BOTOCORE_CRT_SERIALIZER

with _CLIENT_CREATION_LOCK:
with CLIENT_CREATION_LOCK:
if _CRT_S3_CLIENT is None:
serializer, s3_client = _initialize_crt_transfer_primatives(
client, config
Expand All @@ -112,17 +115,46 @@ class CRTS3Client:
ensure we don't use the CRT client when a successful request cannot be made.
"""

def __init__(self, crt_client, process_lock, region):
def __init__(self, crt_client, process_lock, region, cred_provider):
self.crt_client = crt_client
self.process_lock = process_lock
self.region = region
self.cred_provider = cred_provider


def is_crt_compatible_request(client, crt_s3_client):
"""
Boto3 client must use same signing region and credentials
as the _CRT_S3_CLIENT singleton. Otherwise fallback to classic.
"""
if crt_s3_client is None:
return False

is_same_region = client.meta.region_name == crt_s3_client.region
is_same_identity = compare_identity(
client._get_credentials(), crt_s3_client.cred_provider
)
return is_same_region and is_same_identity


def compare_identity(boto3_creds, crt_s3_creds):
try:
crt_creds = crt_s3_creds()
except botocore.exceptions.NoCredentialsError:
return False

is_matching_identity = (
boto3_creds.access_key == crt_creds.access_key_id
and boto3_creds.secret_key == crt_creds.secret_access_key
and boto3_creds.token == crt_creds.session_token
)
return is_matching_identity


def create_crt_transfer_manager(client, config):
"""Create a CRTTransferManager for optimized data transfer."""
crt_s3_client = get_crt_s3_client(client, config)
called_region = client.meta.region_name
if crt_s3_client is not None and crt_s3_client.region == called_region:
if is_crt_compatible_request(client, crt_s3_client):
return CRTTransferManager(
crt_s3_client.crt_client, _BOTOCORE_CRT_SERIALIZER
)
Expand Down
17 changes: 17 additions & 0 deletions boto3/s3/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# https://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.


# TransferConfig preferred_transfer_client settings
CLASSIC_TRANSFER_CLIENT = "classic"
AUTO_RESOLVE_TRANSFER_CLIENT = "auto"
8 changes: 7 additions & 1 deletion boto3/s3/inject.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import copy as python_copy

from botocore.exceptions import ClientError

from boto3 import utils
Expand Down Expand Up @@ -432,7 +434,11 @@ def copy(
if config is None:
config = TransferConfig()

with create_transfer_manager(self, config) as manager:
# copy is not supported in the CRT
new_config = python_copy.copy(config)
new_config.preferred_transfer_client = "classic"

with create_transfer_manager(self, new_config) as manager:
future = manager.copy(
copy_source=CopySource,
bucket=Bucket,
Expand Down
43 changes: 42 additions & 1 deletion boto3/s3/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,12 @@ def __call__(self, bytes_amount):
from s3transfer.subscribers import BaseSubscriber
from s3transfer.utils import OSUtils

import boto3.s3.constants as constants
from boto3.exceptions import RetriesExceededError, S3UploadFailedError

if HAS_CRT:
import awscrt.s3

from boto3.crt import create_crt_transfer_manager

KB = 1024
Expand All @@ -163,7 +166,7 @@ def create_transfer_manager(client, config, osutil=None):
:rtype: s3transfer.manager.TransferManager
:returns: A transfer manager based on parameters provided
"""
if HAS_CRT:
if _should_use_crt(config):
crt_transfer_manager = create_crt_transfer_manager(client, config)
if crt_transfer_manager is not None:
logger.debug(
Expand All @@ -178,6 +181,33 @@ def create_transfer_manager(client, config, osutil=None):
return _create_default_transfer_manager(client, config, osutil)


def _should_use_crt(config):
if HAS_CRT:
is_optimized_instance = awscrt.s3.is_optimized_for_system()
else:
is_optimized_instance = False
pref_transfer_client = config.preferred_transfer_client.lower()

if (
is_optimized_instance
and pref_transfer_client == constants.AUTO_RESOLVE_TRANSFER_CLIENT
):
logger.debug(
"Attempting to use CRTTransferManager. Config settings may be ignored."
)
return True

logger.debug(
"Opting out of CRT Transfer Manager. Preferred client: "
"{pref_transfer_client}, CRT available: {HAS_CRT}, "
"Instance Optimized: {is_optimized_instance}.",
pref_transfer_client,
HAS_CRT,
is_optimized_instance,
)
return False


def _create_default_transfer_manager(client, config, osutil):
"""Create the default TransferManager implementation for s3transfer."""
executor_cls = None
Expand All @@ -202,6 +232,7 @@ def __init__(
io_chunksize=256 * KB,
use_threads=True,
max_bandwidth=None,
preferred_transfer_client="auto",
):
"""Configuration object for managed S3 transfers
Expand Down Expand Up @@ -242,6 +273,15 @@ def __init__(
:param max_bandwidth: The maximum bandwidth that will be consumed
in uploading and downloading file content. The value is an integer
in terms of bytes per second.
:param preferred_transfer_client: String specifying preferred transfer
client for transfer operations.
Current supported settings are:
* auto (default) - Use the CRTTransferManager when calls
are made with supported environment and settings.
* classic - Only use the origin S3TransferManager with
requests. Disables possible CRT upgrade on requests.
"""
super().__init__(
multipart_threshold=multipart_threshold,
Expand All @@ -258,6 +298,7 @@ def __init__(
for alias in self.ALIAS:
setattr(self, alias, getattr(self, self.ALIAS[alias]))
self.use_threads = use_threads
self.preferred_transfer_client = preferred_transfer_client

def __setattr__(self, name, value):
# If the alias name is used, make sure we set the name that it points
Expand Down
58 changes: 58 additions & 0 deletions tests/functional/test_crt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# https://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

from contextlib import ContextDecorator

from botocore.compat import HAS_CRT

from boto3.s3.transfer import TransferConfig, create_transfer_manager
from tests import mock, requires_crt

if HAS_CRT:
from s3transfer.crt import CRTTransferManager


class MockOptimizedInstance(ContextDecorator):
"""Helper class to simulate a CRT optimized EC2 instance."""

def __init__(self, lock="lock", optimized=True):
self.acquire_process_lock = mock.patch(
'boto3.crt.acquire_crt_s3_process_lock'
)
self.acquire_process_lock.return_value = lock
self.is_optimized = mock.patch('awscrt.s3.is_optimized_for_system')
self.is_optimized.return_value = optimized

def __enter__(self, *args, **kwargs):
self.acquire_process_lock.start()
self.acquire_process_lock.return_value = None
self.is_optimized.start()

def __exit__(self, *args, **kwargs):
self.acquire_process_lock.stop()
self.is_optimized.stop()


def create_mock_client(region_name='us-west-2'):
client = mock.Mock()
client.meta.region_name = region_name
return client


@requires_crt()
@MockOptimizedInstance()
def test_create_transfer_manager_on_optimized_instance():
client = create_mock_client()
config = TransferConfig()
transfer_manager = create_transfer_manager(client, config)
assert isinstance(transfer_manager, CRTTransferManager)
Loading

0 comments on commit 640e8d9

Please sign in to comment.