Skip to content

Commit

Permalink
Add better testing
Browse files Browse the repository at this point in the history
  • Loading branch information
nateprewitt committed Nov 26, 2023
1 parent abc90a2 commit 8eaef59
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 19 deletions.
23 changes: 17 additions & 6 deletions boto3/crt.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
_CRT_S3_CLIENT = None
_BOTOCORE_CRT_SERIALIZER = None

_CLIENT_CREATION_LOCK = threading.Lock()
CLIENT_CREATION_LOCK = threading.Lock()
PROCESS_LOCK_NAME = 'boto3'


def _create_crt_client(session, config, region_name, cred_provider):
Expand All @@ -59,7 +60,7 @@ def _create_crt_request_serializer(session, region_name):

def _create_crt_s3_client(session, config, region_name, credentials, **kwargs):
"""Create boto3 wrapper class to manage crt lock reference and S3 client."""
lock = acquire_crt_s3_process_lock('boto3')
lock = acquire_crt_s3_process_lock(PROCESS_LOCK_NAME)
if lock is None:
# If we're unable to acquire the lock, we cannot
# use the CRT in this process and should default to
Expand All @@ -72,6 +73,7 @@ def _create_crt_s3_client(session, config, region_name, credentials, **kwargs):
_create_crt_client(session, config, region_name, cred_provider),
lock,
region_name,
cred_provider,
)


Expand All @@ -91,7 +93,7 @@ def get_crt_s3_client(client, config):
global _CRT_S3_CLIENT
global _BOTOCORE_CRT_SERIALIZER

with _CLIENT_CREATION_LOCK:
with CLIENT_CREATION_LOCK:
if _CRT_S3_CLIENT is None:
serializer, s3_client = _initialize_crt_transfer_primatives(
client, config
Expand All @@ -112,17 +114,26 @@ class CRTS3Client:
ensure we don't use the CRT client when a successful request cannot be made.
"""

def __init__(self, crt_client, process_lock, region):
def __init__(self, crt_client, process_lock, region, cred_provider):
self.crt_client = crt_client
self.process_lock = process_lock
self.region = region
self.cred_provider = cred_provider


def is_crt_compatible_request(client, crt_s3_client):
    """Report whether ``client`` may route its transfer through the CRT.

    :param client: Client whose ``meta.region_name`` is compared.
    :param crt_s3_client: The CRT client wrapper (exposing ``region``),
        or ``None`` when no CRT client is available.
    :returns: ``False`` when there is no CRT client wrapper; otherwise
        whether the client's region equals the wrapper's region.
    """
    if crt_s3_client is not None:
        regions_match = client.meta.region_name == crt_s3_client.region
        # The identity comparison is currently hard-coded to pass.
        identities_match = True
        return regions_match and identities_match

    return False


def create_crt_transfer_manager(client, config):
"""Create a CRTTransferManager for optimized data transfer."""
crt_s3_client = get_crt_s3_client(client, config)
called_region = client.meta.region_name
if crt_s3_client is not None and crt_s3_client.region == called_region:
if is_crt_compatible_request(client, crt_s3_client):
return CRTTransferManager(
crt_s3_client.crt_client, _BOTOCORE_CRT_SERIALIZER
)
Expand Down
8 changes: 7 additions & 1 deletion boto3/s3/inject.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import copy as python_copy

from botocore.exceptions import ClientError

from boto3 import utils
Expand Down Expand Up @@ -432,7 +434,11 @@ def copy(
if config is None:
config = TransferConfig()

with create_transfer_manager(self, config) as manager:
# copy is not supported in the CRT
new_config = python_copy.copy(config)
new_config.preferred_transfer_client = "classic"

with create_transfer_manager(self, new_config) as manager:
future = manager.copy(
copy_source=CopySource,
bucket=Bucket,
Expand Down
40 changes: 39 additions & 1 deletion boto3/s3/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ def __call__(self, bytes_amount):
from boto3.exceptions import RetriesExceededError, S3UploadFailedError

if HAS_CRT:
import awscrt.s3

from boto3.crt import create_crt_transfer_manager

KB = 1024
Expand All @@ -163,7 +165,7 @@ def create_transfer_manager(client, config, osutil=None):
:rtype: s3transfer.manager.TransferManager
:returns: A transfer manager based on parameters provided
"""
if HAS_CRT:
if _should_use_crt(config):
crt_transfer_manager = create_crt_transfer_manager(client, config)
if crt_transfer_manager is not None:
logger.debug(
Expand All @@ -178,6 +180,31 @@ def create_transfer_manager(client, config, osutil=None):
return _create_default_transfer_manager(client, config, osutil)


def _should_use_crt(config):
    """Decide whether a CRT transfer manager should be attempted.

    :param config: Transfer configuration object. Only its
        ``preferred_transfer_client`` string attribute is read, and it is
        compared case-insensitively.

    :rtype: bool
    :returns: ``True`` only when ``awscrt`` reports the system as
        optimized and the preferred transfer client is ``"auto"``;
        ``False`` in every other case.
    """
    # ``awscrt`` is only imported when HAS_CRT is true, so it must not be
    # referenced on the other branch.
    if HAS_CRT:
        is_optimized_instance = awscrt.s3.is_optimized_for_system()
    else:
        is_optimized_instance = False
    pref_transfer_manager = config.preferred_transfer_client.lower()

    if is_optimized_instance and pref_transfer_manager == "auto":
        logger.debug(
            "Opting into CRT Transfer Manager. Some config settings "
            "may be ignored."
        )
        return True

    # logging interpolates lazily with ``msg % args``; the placeholders must
    # therefore be %-style. Brace-style names combined with positional
    # arguments fail to format and the message is never emitted.
    logger.debug(
        "Opting out of CRT Transfer Manager. Preferred client: "
        "%s, CRT available: %s, Instance Optimized: %s.",
        pref_transfer_manager,
        HAS_CRT,
        is_optimized_instance,
    )
    return False


def _create_default_transfer_manager(client, config, osutil):
"""Create the default TransferManager implementation for s3transfer."""
executor_cls = None
Expand All @@ -202,6 +229,7 @@ def __init__(
io_chunksize=256 * KB,
use_threads=True,
max_bandwidth=None,
preferred_transfer_client="auto",
):
"""Configuration object for managed S3 transfers
Expand Down Expand Up @@ -242,6 +270,15 @@ def __init__(
:param max_bandwidth: The maximum bandwidth that will be consumed
in uploading and downloading file content. The value is an integer
in terms of bytes per second.
:param preferred_transfer_client: String specifying preferred transfer
client for transfer operations.
Current supported settings are:
* auto (default) - Use the CRTTransferManager when calls
are made with supported environment and settings.
* classic - Only use the original S3TransferManager with
requests. Disables possible CRT upgrade on requests.
"""
super().__init__(
multipart_threshold=multipart_threshold,
Expand All @@ -258,6 +295,7 @@ def __init__(
for alias in self.ALIAS:
setattr(self, alias, getattr(self, self.ALIAS[alias]))
self.use_threads = use_threads
self.preferred_transfer_client = preferred_transfer_client

def __setattr__(self, name, value):
# If the alias name is used, make sure we set the name that it points
Expand Down
58 changes: 58 additions & 0 deletions tests/functional/test_crt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# https://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

from contextlib import ContextDecorator

from botocore.compat import HAS_CRT

from boto3.s3.transfer import TransferConfig, create_transfer_manager
from tests import mock, requires_crt

if HAS_CRT:
from s3transfer.crt import CRTTransferManager


class MockOptimizedInstance(ContextDecorator):
    """Helper class to simulate a CRT optimized EC2 instance.

    Usable as a context manager or as a decorator. While active,
    ``boto3.crt.acquire_crt_s3_process_lock`` is patched to return ``lock``
    and ``awscrt.s3.is_optimized_for_system`` is patched to return
    ``optimized``.

    :param lock: Value the patched process-lock acquisition returns.
    :param optimized: Value the patched optimization check returns.
    """

    def __init__(self, lock="lock", optimized=True):
        # ``return_value`` has to be handed to ``mock.patch`` so that it is
        # configured on the replacement mock. Assigning it on the patcher
        # object afterwards never reaches the mock that callers invoke.
        self.acquire_process_lock = mock.patch(
            'boto3.crt.acquire_crt_s3_process_lock', return_value=lock
        )
        self.is_optimized = mock.patch(
            'awscrt.s3.is_optimized_for_system', return_value=optimized
        )

    def __enter__(self, *args, **kwargs):
        # Start both patches; the configured return values are left intact.
        self.acquire_process_lock.start()
        self.is_optimized.start()

    def __exit__(self, *args, **kwargs):
        # Undo both patches. Nothing is returned, so exceptions raised in
        # the managed body propagate.
        self.acquire_process_lock.stop()
        self.is_optimized.stop()


def create_mock_client(region_name='us-west-2'):
    """Build a mock client whose ``meta.region_name`` is ``region_name``."""
    fake_client = mock.Mock()
    fake_client.meta.region_name = region_name
    return fake_client


@requires_crt()
@MockOptimizedInstance()
def test_create_transfer_manager_on_optimized_instance():
    """A default config under the optimized-instance mock yields a
    CRTTransferManager.
    """
    manager = create_transfer_manager(create_mock_client(), TransferConfig())
    assert isinstance(manager, CRTTransferManager)
76 changes: 67 additions & 9 deletions tests/unit/s3/test_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import copy
import pathlib
from tempfile import NamedTemporaryFile

Expand All @@ -32,25 +33,45 @@
from tests import mock, unittest


def create_mock_client(region_name='us-west-2'):
    """Return a mock client reporting ``region_name`` as its region."""
    stub = mock.Mock()
    stub.meta.region_name = region_name
    return stub


class TestCreateTransferManager(unittest.TestCase):
def test_create_transfer_manager(self):
client = object()
config = TransferConfig()
client = create_mock_client()
config = TransferConfig(preferred_transfer_client="classic")
osutil = OSUtils()
with mock.patch('boto3.s3.transfer.TransferManager') as manager:
create_transfer_manager(client, config, osutil)
assert manager.call_args == mock.call(client, config, osutil, None)

def test_create_transfer_manager_with_no_threads(self):
client = object()
config = TransferConfig()
client = create_mock_client()
config = TransferConfig(preferred_transfer_client="classic")
config.use_threads = False
with mock.patch('boto3.s3.transfer.TransferManager') as manager:
create_transfer_manager(client, config)
assert manager.call_args == mock.call(
client, config, None, NonThreadedExecutor
)

def test_create_transfer_manager_with_default_config(self):
"""Ensure we still default to classic transfer manager when CRT
is disabled.
"""
with mock.patch('boto3.s3.transfer.HAS_CRT', False):
client = create_mock_client()
config = TransferConfig()
assert config.preferred_transfer_client == "auto"
with mock.patch('boto3.s3.transfer.TransferManager') as manager:
create_transfer_manager(client, config)
assert manager.call_args == mock.call(
client, config, None, None
)


class TestTransferConfig(unittest.TestCase):
def assert_value_of_actual_and_alias(
Expand Down Expand Up @@ -104,6 +125,7 @@ def test_transferconfig_parameters(self):
io_chunksize=256 * KB,
use_threads=True,
max_bandwidth=1024 * KB,
preferred_transfer_client="classic",
)
assert config.multipart_threshold == 8 * MB
assert config.multipart_chunksize == 8 * MB
Expand All @@ -113,6 +135,40 @@ def test_transferconfig_parameters(self):
assert config.io_chunksize == 256 * KB
assert config.use_threads is True
assert config.max_bandwidth == 1024 * KB
assert config.preferred_transfer_client == "classic"

def test_transferconfig_copy(self):
    """``copy.copy`` yields a distinct config carrying the same settings."""
    source_config = TransferConfig(
        multipart_threshold=8 * MB,
        max_concurrency=10,
        multipart_chunksize=8 * MB,
        num_download_attempts=5,
        max_io_queue=100,
        io_chunksize=256 * KB,
        use_threads=True,
        max_bandwidth=1024 * KB,
        preferred_transfer_client="classic",
    )
    duplicate = copy.copy(source_config)

    # The copy must be a separate object...
    assert source_config is not duplicate

    # ...whose settings all compare equal to the source's.
    compared_attrs = (
        'multipart_threshold',
        'multipart_chunksize',
        'max_request_concurrency',
        'num_download_attempts',
        'max_io_queue_size',
        'io_chunksize',
        'use_threads',
        'max_bandwidth',
        'preferred_transfer_client',
    )
    for attr in compared_attrs:
        assert getattr(source_config, attr) == getattr(duplicate, attr)


class TestProgressCallbackInvoker(unittest.TestCase):
Expand All @@ -125,7 +181,7 @@ def test_on_progress(self):

class TestS3Transfer(unittest.TestCase):
def setUp(self):
self.client = mock.Mock()
self.client = create_mock_client()
self.manager = mock.Mock(TransferManager(self.client))
self.transfer = S3Transfer(manager=self.manager)
self.callback = mock.Mock()
Expand Down Expand Up @@ -242,12 +298,14 @@ def test_propogation_s3_upload_failed_error(self):
self.transfer.upload_file('smallfile', 'bucket', 'key')

def test_can_create_with_just_client(self):
transfer = S3Transfer(client=mock.Mock())
transfer = S3Transfer(client=create_mock_client())
assert isinstance(transfer, S3Transfer)

def test_can_create_with_extra_configurations(self):
transfer = S3Transfer(
client=mock.Mock(), config=TransferConfig(), osutil=OSUtils()
client=create_mock_client(),
config=TransferConfig(),
osutil=OSUtils(),
)
assert isinstance(transfer, S3Transfer)

Expand All @@ -268,12 +326,12 @@ def test_osutil_and_manager_are_mutually_exclusive(self):
S3Transfer(osutil=mock.Mock(), manager=self.manager)

def test_upload_requires_string_filename(self):
transfer = S3Transfer(client=mock.Mock())
transfer = S3Transfer(client=create_mock_client())
with pytest.raises(ValueError):
transfer.upload_file(filename=object(), bucket='foo', key='bar')

def test_download_requires_string_filename(self):
transfer = S3Transfer(client=mock.Mock())
transfer = S3Transfer(client=create_mock_client())
with pytest.raises(ValueError):
transfer.download_file(bucket='foo', key='bar', filename=object())

Expand Down
3 changes: 1 addition & 2 deletions tests/unit/test_crt.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

import boto3
import s3transfer
from botocore.compat import HAS_CRT

import boto3
from tests import mock, requires_crt

if HAS_CRT:
Expand Down

0 comments on commit 8eaef59

Please sign in to comment.