From cd600a4fd60a16ffd553677b315b6bcde2ce96e2 Mon Sep 17 00:00:00 2001 From: ddelange <14880945+ddelange@users.noreply.github.com> Date: Fri, 22 Sep 2023 09:04:56 +0200 Subject: [PATCH 01/11] Retry finalizing multipart s3 upload --- smart_open/s3.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index 32c55202..f7f4dc25 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -834,7 +834,13 @@ def close(self): UploadId=self._upload_id, MultipartUpload={'Parts': self._parts}, ) - _retry_if_failed(partial) + _retry_if_failed( + partial, + exceptions=( + botocore.exceptions.EndpointConnectionError, + botocore.errorfactory.NoSuchUpload, + ), + ) logger.debug('%s: completed multipart upload', self) elif self._upload_id: # From 4fd8d6bc1fe98ca57ee6df3ff985db101a23ceb1 Mon Sep 17 00:00:00 2001 From: ddelange <14880945+ddelange@users.noreply.github.com> Date: Fri, 22 Sep 2023 09:51:44 +0200 Subject: [PATCH 02/11] Fix catching NoSuchUpload --- smart_open/s3.py | 42 +++++++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index f7f4dc25..882eaa42 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -834,13 +834,7 @@ def close(self): UploadId=self._upload_id, MultipartUpload={'Parts': self._parts}, ) - _retry_if_failed( - partial, - exceptions=( - botocore.exceptions.EndpointConnectionError, - botocore.errorfactory.NoSuchUpload, - ), - ) + _retry_if_failed(partial) logger.debug('%s: completed multipart upload', self) elif self._upload_id: # @@ -1120,22 +1114,40 @@ def _retry_if_failed( partial, attempts=_UPLOAD_ATTEMPTS, sleep_seconds=_SLEEP_SECONDS, - exceptions=None): + exceptions=None, # Dict[Exception, str] + client_error_codes=None): # Dict[str, str] if exceptions is None: - exceptions = (botocore.exceptions.EndpointConnectionError, ) + exceptions = { + botocore.exceptions.EndpointConnectionError: 'Unable to connect to the endpoint. Check your network connection.', + } + if client_error_codes is None: + client_error_codes = {'NoSuchUpload': 'Server-side flaky error (NoSuchUpload).'} for attempt in range(attempts): try: return partial() - except exceptions: + except tuple(exceptions) as exc: + msg = exceptions[type(exc)] + logger.critical( + '%s Sleeping and retrying %d more times before giving up.', + msg, + attempts - attempt - 1, + ) + time.sleep(sleep_seconds) + except botocore.exceptions.ClientError as exc: + error_code = exc['Error']['Code'] + if error_code not in client_error_codes: + raise + msg = client_error_codes[error_code] logger.critical( - 'Unable to connect to the endpoint. Check your network connection. ' - 'Sleeping and retrying %d more times ' - 'before giving up.' % (attempts - attempt - 1) + '%s Sleeping and retrying %d more times before giving up.', + msg, + attempts - attempt - 1, ) time.sleep(sleep_seconds) + else: - logger.critical('Unable to connect to the endpoint. Giving up.') - raise IOError('Unable to connect to the endpoint after %d attempts' % attempts) + logger.critical('%s Giving up.', msg) + raise IOError('%s failed after %d attempts', partial.func.__name__, attempts) def _accept_all(key): From 9ea3866ea944dcdca441ba9b496d19c3d71c47ef Mon Sep 17 00:00:00 2001 From: ddelange <14880945+ddelange@users.noreply.github.com> Date: Fri, 22 Sep 2023 10:02:08 +0200 Subject: [PATCH 03/11] Make lint --- smart_open/s3.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index 882eaa42..7f69ce4c 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -1118,7 +1118,9 @@ def _retry_if_failed( client_error_codes=None): # Dict[str, str] if exceptions is None: exceptions = { - botocore.exceptions.EndpointConnectionError: 'Unable to connect to the endpoint. Check your network connection.', + botocore.exceptions.EndpointConnectionError: ( + 'Unable to connect to the endpoint. Check your network connection.' + ), } if client_error_codes is None: client_error_codes = {'NoSuchUpload': 'Server-side flaky error (NoSuchUpload).'} @@ -1144,7 +1146,6 @@ def _retry_if_failed( attempts - attempt - 1, ) time.sleep(sleep_seconds) - else: logger.critical('%s Giving up.', msg) raise IOError('%s failed after %d attempts', partial.func.__name__, attempts) From e28a813007a13c0dfc18fb93991dd2ee712a390b Mon Sep 17 00:00:00 2001 From: ddelange <14880945+ddelange@users.noreply.github.com> Date: Fri, 22 Sep 2023 10:26:51 +0200 Subject: [PATCH 04/11] Fix retrieving error_code --- smart_open/s3.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index 7f69ce4c..b20a947a 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -1127,16 +1127,16 @@ def _retry_if_failed( for attempt in range(attempts): try: return partial() - except tuple(exceptions) as exc: - msg = exceptions[type(exc)] + except tuple(exceptions) as err: + msg = exceptions[type(err)] logger.critical( '%s Sleeping and retrying %d more times before giving up.', msg, attempts - attempt - 1, ) time.sleep(sleep_seconds) - except botocore.exceptions.ClientError as exc: - error_code = exc['Error']['Code'] + except botocore.exceptions.ClientError as err: + error_code = err.response['Error'].get('Code') if error_code not in client_error_codes: raise msg = client_error_codes[error_code] From 1478b9547832d29a32c398339f77a68cee083352 Mon Sep 17 00:00:00 2001 From: ddelange <14880945+ddelange@users.noreply.github.com> Date: Fri, 22 Sep 2023 13:21:40 +0300 Subject: [PATCH 05/11] Make lint --- smart_open/s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index b20a947a..1860cb4b 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -1136,7 +1136,7 @@ def _retry_if_failed( ) time.sleep(sleep_seconds) except botocore.exceptions.ClientError as err: - error_code = err.response['Error'].get('Code') + error_code = err.response['Error'].get('Code') if error_code not in client_error_codes: raise msg = client_error_codes[error_code] From af17d95fd97327bbc1962a400de638a5f770df30 Mon Sep 17 00:00:00 2001 From: ddelange <14880945+ddelange@users.noreply.github.com> Date: Sat, 23 Sep 2023 08:03:52 +0300 Subject: [PATCH 06/11] Amend test --- smart_open/tests/test_s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smart_open/tests/test_s3.py b/smart_open/tests/test_s3.py index fa907101..43dfce3c 100644 --- a/smart_open/tests/test_s3.py +++ b/smart_open/tests/test_s3.py @@ -954,7 +954,7 @@ def test_success(self): def test_failure(self): partial = mock.Mock(side_effect=ValueError) - exceptions = (ValueError, ) + exceptions = {ValueError: 'Let us retry ValueError'} with self.assertRaises(IOError): smart_open.s3._retry_if_failed(partial, attempts=3, sleep_seconds=0, exceptions=exceptions) From 590a31a08a1b1fcfd521f410fe0e09c2d5331453 Mon Sep 17 00:00:00 2001 From: ddelange <14880945+ddelange@users.noreply.github.com> Date: Sat, 23 Sep 2023 08:09:49 +0300 Subject: [PATCH 07/11] Simplify --- smart_open/s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index 1860cb4b..dc43024e 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -1148,7 +1148,7 @@ def _retry_if_failed( time.sleep(sleep_seconds) else: logger.critical('%s Giving up.', msg) - raise IOError('%s failed after %d attempts', partial.func.__name__, attempts) + raise IOError('%s failed after %d attempts', partial.func, attempts) def _accept_all(key): From c173ff36b294853cf6800cf26898df7395ac00e3 Mon Sep 17 00:00:00 2001 From: ddelange <14880945+ddelange@users.noreply.github.com> Date: Sat, 23 Sep 2023 08:19:35 +0300 Subject: [PATCH 08/11] Add test --- smart_open/tests/test_s3.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/smart_open/tests/test_s3.py b/smart_open/tests/test_s3.py index 43dfce3c..4a798f68 100644 --- a/smart_open/tests/test_s3.py +++ b/smart_open/tests/test_s3.py @@ -21,6 +21,7 @@ import boto3 import botocore.client import botocore.endpoint +import botocore.exceptions import moto import pytest @@ -961,6 +962,17 @@ def test_failure(self): self.assertEqual(partial.call_count, 3) + partial = mock.Mock( + side_effect=botocore.exceptions.ClientError( + {'Error': {'Code': 'NoSuchUpload'}}, 'NoSuchUpload' + ) + ) + + with self.assertRaises(IOError): + smart_open.s3._retry_if_failed(partial, attempts=3, sleep_seconds=0) + + self.assertEqual(partial.call_count, 3) + @moto.mock_s3() def test_client_propagation_singlepart(): From 5b2d2663e00f9739cb00b082ea5f6fb18cc8dce0 Mon Sep 17 00:00:00 2001 From: ddelange <14880945+ddelange@users.noreply.github.com> Date: Wed, 21 Feb 2024 08:22:15 +0100 Subject: [PATCH 09/11] Remove unused import --- smart_open/tests/test_s3.py | 1 - 1 file changed, 1 deletion(-) diff --git a/smart_open/tests/test_s3.py b/smart_open/tests/test_s3.py index 8abad40b..11a09997 100644 --- a/smart_open/tests/test_s3.py +++ b/smart_open/tests/test_s3.py @@ -22,7 +22,6 @@ import botocore.client import botocore.endpoint import botocore.exceptions -import moto import pytest # See https://github.com/piskvorky/smart_open/issues/800 From 468ac14fdf5e2bae50853dc05be298662bf4bb38 Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Wed, 21 Feb 2024 16:52:39 +0900 Subject: [PATCH 10/11] refactor retry mechanism to expose parameters --- smart_open/s3.py | 97 +++++++++++++++++++------------------ smart_open/tests/test_s3.py | 21 ++++---- 2 files changed, 62 insertions(+), 56 deletions(-) diff --git a/smart_open/s3.py b/smart_open/s3.py index 9cff9fc1..7b6dfe91 100644 --- a/smart_open/s3.py +++ b/smart_open/s3.py @@ -28,6 +28,11 @@ from smart_open import constants +from typing import ( + Callable, + List, +) + logger = logging.getLogger(__name__) DEFAULT_MIN_PART_SIZE = 50 * 1024**2 @@ -47,13 +52,52 @@ 's3://my_key:my_secret@my_server:my_port@my_bucket/my_key', ) -_UPLOAD_ATTEMPTS = 6 -_SLEEP_SECONDS = 10 - # Returned by AWS when we try to seek beyond EOF. _OUT_OF_RANGE = 'InvalidRange' +class Retry: + def __init__(self): + self.attempts: int = 6 + self.sleep_seconds: int = 10 + self.exceptions: List[Exception] = [botocore.exceptions.EndpointConnectionError] + self.client_error_codes: List[str] = ['NoSuchUpload'] + + def _do(self, fn: Callable): + for attempt in range(self.attempts): + try: + return fn() + except tuple(self.exceptions) as err: + logger.critical( + 'Caught non-fatal %s, retrying %d more times', + err, + self.attempts - attempt - 1, + ) + logger.exception(err) + time.sleep(self.sleep_seconds) + except botocore.exceptions.ClientError as err: + error_code = err.response['Error'].get('Code') + if error_code not in self.client_error_codes: + raise + logger.critical( + 'Caught non-fatal ClientError (%s), retrying %d more times', + error_code, + self.attempts - attempt - 1, + ) + logger.exception(err) + time.sleep(self.sleep_seconds) + else: + logger.critical('encountered too many non-fatal errors, giving up') + raise IOError('%s failed after %d attempts', fn.func, self.attempts) + + +# +# The retry mechanism for this submodule. Client code may modify it, e.g. by +# updating RETRY.sleep_seconds and friends. +# +RETRY = Retry() + + class _ClientWrapper: """Wraps a client to inject the appropriate keyword args into each method call. @@ -803,7 +847,7 @@ def __init__( Bucket=bucket, Key=key, ) - self._upload_id = _retry_if_failed(partial)['UploadId'] + self._upload_id = RETRY._do(partial)['UploadId'] except botocore.client.ClientError as error: raise ValueError( 'the bucket %r does not exist, or is forbidden for access (%r)' % ( @@ -843,7 +887,7 @@ def close(self): UploadId=self._upload_id, MultipartUpload={'Parts': self._parts}, ) - _retry_if_failed(partial) + RETRY._do(partial) logger.debug('%s: completed multipart upload', self) elif self._upload_id: # @@ -954,7 +998,7 @@ def _upload_next_part(self): # of a temporary connection problem, so this part needs to be # especially robust. # - upload = _retry_if_failed( + upload = RETRY._do( functools.partial( self._client.upload_part, Bucket=self._bucket, @@ -1119,47 +1163,6 @@ def __repr__(self): return "smart_open.s3.SinglepartWriter(bucket=%r, key=%r)" % (self._bucket, self._key) -def _retry_if_failed( - partial, - attempts=_UPLOAD_ATTEMPTS, - sleep_seconds=_SLEEP_SECONDS, - exceptions=None, # Dict[Exception, str] - client_error_codes=None): # Dict[str, str] - if exceptions is None: - exceptions = { - botocore.exceptions.EndpointConnectionError: ( - 'Unable to connect to the endpoint. Check your network connection.' - ), - } - if client_error_codes is None: - client_error_codes = {'NoSuchUpload': 'Server-side flaky error (NoSuchUpload).'} - for attempt in range(attempts): - try: - return partial() - except tuple(exceptions) as err: - msg = exceptions[type(err)] - logger.critical( - '%s Sleeping and retrying %d more times before giving up.', - msg, - attempts - attempt - 1, - ) - time.sleep(sleep_seconds) - except botocore.exceptions.ClientError as err: - error_code = err.response['Error'].get('Code') - if error_code not in client_error_codes: - raise - msg = client_error_codes[error_code] - logger.critical( - '%s Sleeping and retrying %d more times before giving up.', - msg, - attempts - attempt - 1, - ) - time.sleep(sleep_seconds) - else: - logger.critical('%s Giving up.', msg) - raise IOError('%s failed after %d attempts', partial.func, attempts) - - def _accept_all(key): return True diff --git a/smart_open/tests/test_s3.py b/smart_open/tests/test_s3.py index 11a09997..60e9ca22 100644 --- a/smart_open/tests/test_s3.py +++ b/smart_open/tests/test_s3.py @@ -953,30 +953,33 @@ def populate_bucket(num_keys=10): class RetryIfFailedTest(unittest.TestCase): + def setUp(self): + self.retry = smart_open.s3.Retry() + self.retry.attempts = 3 + self.retry.sleep_seconds = 0 + def test_success(self): partial = mock.Mock(return_value=1) - result = smart_open.s3._retry_if_failed(partial, attempts=3, sleep_seconds=0) + result = self.retry._do(partial) self.assertEqual(result, 1) self.assertEqual(partial.call_count, 1) - def test_failure(self): + def test_failure_exception(self): partial = mock.Mock(side_effect=ValueError) - exceptions = {ValueError: 'Let us retry ValueError'} - + self.retry.exceptions = {ValueError: 'Let us retry ValueError'} with self.assertRaises(IOError): - smart_open.s3._retry_if_failed(partial, attempts=3, sleep_seconds=0, exceptions=exceptions) - + self.retry._do(partial) self.assertEqual(partial.call_count, 3) + + def test_failure_client_error(self): partial = mock.Mock( side_effect=botocore.exceptions.ClientError( {'Error': {'Code': 'NoSuchUpload'}}, 'NoSuchUpload' ) ) - with self.assertRaises(IOError): - smart_open.s3._retry_if_failed(partial, attempts=3, sleep_seconds=0) - + self.retry._do(partial) self.assertEqual(partial.call_count, 3) From 52329809847fbfaa2b2ce1b47c90680d95ab1add Mon Sep 17 00:00:00 2001 From: Michael Penkov Date: Wed, 21 Feb 2024 17:02:38 +0900 Subject: [PATCH 11/11] flake8 --- smart_open/tests/test_s3.py | 1 - 1 file changed, 1 deletion(-) diff --git a/smart_open/tests/test_s3.py b/smart_open/tests/test_s3.py index 60e9ca22..d6e8c76a 100644 --- a/smart_open/tests/test_s3.py +++ b/smart_open/tests/test_s3.py @@ -971,7 +971,6 @@ def test_failure_exception(self): self.retry._do(partial) self.assertEqual(partial.call_count, 3) - def test_failure_client_error(self): partial = mock.Mock( side_effect=botocore.exceptions.ClientError(