From 807b72558938789e3647a30f418d200623b4b51f Mon Sep 17 00:00:00 2001
From: Matteo De Wint
Date: Wed, 19 May 2021 17:15:08 +0200
Subject: [PATCH 1/9] Locking; first draft (WIP)

---
 s3pypi/__main__.py                |  9 +++++
 s3pypi/core.py                    | 60 ++++++++++++++++++++++---------
 s3pypi/locking.py                 | 47 ++++++++++++++++++++++++
 s3pypi/storage.py                 | 13 ++++---
 terraform/modules/s3pypi/main.tf  |  2 ++
 tests/integration/conftest.py     |  5 +++
 tests/integration/test_storage.py |  8 ++---
 7 files changed, 118 insertions(+), 26 deletions(-)
 create mode 100644 s3pypi/locking.py

diff --git a/s3pypi/__main__.py b/s3pypi/__main__.py
index 4e4f551..d32a333 100644
--- a/s3pypi/__main__.py
+++ b/s3pypi/__main__.py
@@ -47,6 +47,15 @@ def get_arg_parser():
             "It's recommended to instead use a private S3 bucket with a CloudFront Origin Access Identity."
         ),
     )
+    p.add_argument(
+        "-l",
+        "--lock-indexes",
+        action="store_true",
+        help=(
+            "Lock index objects in S3 using a DynamoDB table. "
+            "This ensures that concurrent invocations of s3pypi do not overwrite each other's changes."
+        ),
+    )
     p.add_argument("-f", "--force", action="store_true", help="Overwrite files.")
     p.add_argument("-v", "--verbose", action="store_true", help="Verbose output.")
     p.add_argument("-V", "--version", action="version", version=__version__)
diff --git a/s3pypi/core.py b/s3pypi/core.py
index 2c74aea..957c4df 100644
--- a/s3pypi/core.py
+++ b/s3pypi/core.py
@@ -5,10 +5,12 @@
 from itertools import groupby
 from operator import attrgetter
 from pathlib import Path
-from typing import List
+from typing import List, Optional
 from zipfile import ZipFile
 
-from s3pypi import __prog__
+import boto3
+
+from s3pypi import __prog__, locking
 from s3pypi.exceptions import S3PyPiError
 from s3pypi.storage import S3Storage
 
@@ -28,27 +30,51 @@ def normalize_package_name(name: str) -> str:
     return re.sub(r"[-_.]+", "-", name.lower())
 
 
-def upload_packages(dist: List[Path], bucket: str, force: bool = False, **kwargs):
-    storage = S3Storage(bucket, **kwargs)
+def upload_packages(
+    dist: List[Path],
+    bucket: str,
+    force: bool = False,
+    lock_indexes: bool = False,
+    profile: Optional[str] = None,
+    region: Optional[str] = None,
+    **kwargs,
+):
+    session = boto3.Session(profile_name=profile, region_name=region)
+    storage = S3Storage(session, bucket, **kwargs)
+    lock = (
+        # TODO: Make table name customizable?
+        locking.DynamoDbLocker(session, table=bucket)
+        if lock_indexes
+        else locking.DummyLocker()
+    )
 
     distributions = [parse_distribution(path) for path in dist]
     get_name = attrgetter("name")
 
     for name, group in groupby(sorted(distributions, key=get_name), get_name):
         directory = normalize_package_name(name)
-        index = storage.get_index(directory)
-
-        for distr in group:
-            filename = distr.local_path.name
-
-            if not force and filename in index.filenames:
-                log.warning("%s already exists! (use --force to overwrite)", filename)
-            else:
-                log.info("Uploading %s", distr.local_path)
-                storage.put_distribution(directory, distr.local_path)
-                index.filenames.add(filename)
-
-        storage.put_index(directory, index)
+        with lock(directory):
+            index = storage.get_index(directory)
+
+            for distr in group:
+                filename = distr.local_path.name
+
+                if not force and filename in index.filenames:
+                    log.warning(
+                        "%s already exists! (use --force to overwrite)", filename
+                    )
+                else:
+                    log.info("Uploading %s", distr.local_path)
+                    storage.put_distribution(directory, distr.local_path)
+                    index.filenames.add(filename)
+
+            storage.put_index(directory, index)
+
+    root = storage.root
+    with lock(root):
+        index = storage.get_index(root)
+        # TODO: Update root index
+        storage.put_index(root, index)
 
 
 def parse_distribution(path: Path) -> Distribution:
diff --git a/s3pypi/locking.py b/s3pypi/locking.py
new file mode 100644
index 0000000..a1f87e5
--- /dev/null
+++ b/s3pypi/locking.py
@@ -0,0 +1,47 @@
+import abc
+from contextlib import contextmanager
+
+import boto3
+
+
+class Locker(abc.ABC):
+    @contextmanager
+    def __call__(self, key: str):
+        self._lock(key)
+        try:
+            yield
+        finally:
+            self._unlock(key)
+
+    @abc.abstractmethod
+    def _lock(self, key: str):
+        ...
+
+    @abc.abstractmethod
+    def _unlock(self, key: str):
+        ...
+
+
+class DummyLocker(Locker):
+    def _lock(self, key: str):
+        pass
+
+    _unlock = _lock
+
+
+class DynamoDbLocker(Locker):
+    def __init__(
+        self,
+        session: boto3.session.Session,
+        table: str,
+        timeout: int = 10,
+    ):
+        db = session.resource("dynamodb")
+        self.table = db.Table(table)
+        self.timeout = timeout
+
+    def _lock(self, key: str):
+        raise NotImplementedError
+
+    def _unlock(self, key: str):
+        raise NotImplementedError
diff --git a/s3pypi/storage.py b/s3pypi/storage.py
index 421d899..ae1fa09 100644
--- a/s3pypi/storage.py
+++ b/s3pypi/storage.py
@@ -8,21 +8,22 @@
 
 
 class S3Storage:
+    root = "/"
+    _index = "index.html"
+
     def __init__(
         self,
+        session: boto3.session.Session,
         bucket: str,
-        profile: Optional[str] = None,
-        region: Optional[str] = None,
         prefix: Optional[str] = None,
         acl: Optional[str] = None,
         s3_put_args: Optional[dict] = None,
         unsafe_s3_website: bool = False,
     ):
-        session = boto3.Session(profile_name=profile, region_name=region)
        self.s3 = session.resource("s3")
         self.bucket = bucket
         self.prefix = prefix
-        self.index_name = "index.html" if unsafe_s3_website else ""
+        self.index_name = self._index if unsafe_s3_website else ""
         self.put_kwargs = dict(
             ACL=acl or "private",
             **(s3_put_args or {}),
@@ -30,9 +31,11 @@ def __init__(
 
     def _object(self, directory: str, filename: str):
         parts = [directory, filename]
+        if parts == [self.root, self.index_name]:
+            parts = [self._index]
         if self.prefix:
             parts.insert(0, self.prefix)
-        return self.s3.Object(self.bucket, "/".join(parts))
+        return self.s3.Object(self.bucket, key="/".join(parts))
 
     def get_index(self, directory: str) -> Index:
         try:
diff --git a/terraform/modules/s3pypi/main.tf b/terraform/modules/s3pypi/main.tf
index 6e5b0d1..4b16388 100644
--- a/terraform/modules/s3pypi/main.tf
+++ b/terraform/modules/s3pypi/main.tf
@@ -67,6 +67,8 @@ resource "aws_cloudfront_distribution" "cdn" {
     error_caching_min_ttl = 0
   }
 
+  default_root_object = "index.html"
+
   default_cache_behavior {
     target_origin_id       = "s3"
     viewer_protocol_policy = "redirect-to-https"
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 8b66ade..e9b769b 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -35,3 +35,8 @@ def s3_bucket(aws_credentials):
     bucket = conn.Bucket("s3pypi-test")
     bucket.create()
     yield bucket
+
+
+@pytest.fixture
+def boto3_session(s3_bucket):
+    return boto3.session.Session()
diff --git a/tests/integration/test_storage.py b/tests/integration/test_storage.py
index 5ba6e7a..8c9acc8 100644
--- a/tests/integration/test_storage.py
+++ b/tests/integration/test_storage.py
@@ -2,21 +2,21 @@
 from s3pypi.storage import S3Storage
 
 
-def test_index_storage_roundtrip(s3_bucket):
+def test_index_storage_roundtrip(boto3_session, s3_bucket):
     directory = "foo"
     index = Index({"bar"})
 
-    storage = S3Storage(s3_bucket.name)
+    storage = S3Storage(boto3_session, s3_bucket.name)
 
     storage.put_index(directory, index)
     got = storage.get_index(directory)
 
     assert got == index
 
 
-def test_prefix_in_s3_key():
+def test_prefix_in_s3_key(boto3_session):
     prefix = "1234567890"
 
-    storage = S3Storage(bucket="example", prefix=prefix)
+    storage = S3Storage(boto3_session, bucket="example", prefix=prefix)
     obj = storage._object(directory="foo", filename="bar")
     assert obj.key.startswith(prefix + "/")

From 06b60f5396782c2ef04bae503ca5cd932fa6b8ee Mon Sep 17 00:00:00 2001
From: Matteo De Wint
Date: Wed, 19 May 2021 20:41:04 +0200
Subject: [PATCH 2/9] Locking; implement lock and unlock methods

---
 s3pypi/core.py    | 11 ++++------
 s3pypi/locking.py | 54 +++++++++++++++++++++++++++++++++++------------
 2 files changed, 45 insertions(+), 20 deletions(-)

diff --git a/s3pypi/core.py b/s3pypi/core.py
index 957c4df..6ad0645 100644
--- a/s3pypi/core.py
+++ b/s3pypi/core.py
@@ -10,8 +10,9 @@
 
 import boto3
 
-from s3pypi import __prog__, locking
+from s3pypi import __prog__
 from s3pypi.exceptions import S3PyPiError
+from s3pypi.locking import DummyLocker, DynamoDBLocker
 from s3pypi.storage import S3Storage
 
 log = logging.getLogger(__prog__)
@@ -41,12 +42,8 @@ def upload_packages(
 ):
     session = boto3.Session(profile_name=profile, region_name=region)
     storage = S3Storage(session, bucket, **kwargs)
-    lock = (
-        # TODO: Make table name customizable?
-        locking.DynamoDbLocker(session, table=bucket)
-        if lock_indexes
-        else locking.DummyLocker()
-    )
+    # TODO: Make table name customizable?
+    lock = DynamoDBLocker(session, table=bucket) if lock_indexes else DummyLocker()
 
     distributions = [parse_distribution(path) for path in dist]
     get_name = attrgetter("name")
diff --git a/s3pypi/locking.py b/s3pypi/locking.py
index a1f87e5..41f55e3 100644
--- a/s3pypi/locking.py
+++ b/s3pypi/locking.py
@@ -1,47 +1,75 @@
 import abc
+import datetime as dt
+import getpass
+import time
 from contextlib import contextmanager
 
 import boto3
 
 
+class LockTimeoutError(Exception):
+    def __init__(self, lock_id: str):
+        super().__init__(f"Timed out trying to acquire lock '{lock_id}'")
+
+
 class Locker(abc.ABC):
     @contextmanager
-    def __call__(self, key: str):
-        self._lock(key)
+    def __call__(self, lock_id: str):
+        self._lock(lock_id)
         try:
             yield
         finally:
-            self._unlock(key)
+            self._unlock(lock_id)
 
     @abc.abstractmethod
-    def _lock(self, key: str):
+    def _lock(self, lock_id: str):
         ...
 
     @abc.abstractmethod
-    def _unlock(self, key: str):
+    def _unlock(self, lock_id: str):
         ...
 
 
 class DummyLocker(Locker):
-    def _lock(self, key: str):
+    def _lock(self, lock_id: str):
         pass
 
     _unlock = _lock
 
 
-class DynamoDbLocker(Locker):
+class DynamoDBLocker(Locker):
     def __init__(
         self,
         session: boto3.session.Session,
         table: str,
-        timeout: int = 10,
+        poll_interval: int = 1,
+        max_attempts: int = 30,
     ):
         db = session.resource("dynamodb")
         self.table = db.Table(table)
-        self.timeout = timeout
+        self.exc = self.table.meta.client.exceptions
+        self.poll_interval = poll_interval
+        self.max_attempts = max_attempts
+        self.user = getpass.getuser()
+
+    def _lock(self, lock_id: str):
+        for attempt in range(1, self.max_attempts + 1):
+            now = dt.datetime.now(dt.timezone.utc)
+            try:
+                self.table.put_item(
+                    Item={
+                        "LockID": lock_id,
+                        "AcquiredAt": now.isoformat(),
+                        "Username": self.user,
+                    },
+                    ConditionExpression="attribute_not_exists(LockID)",
+                )
+                return
+            except self.exc.ConditionalCheckFailedException:
+                if attempt < self.max_attempts:
+                    time.sleep(self.poll_interval)
 
-    def _lock(self, key: str):
-        raise NotImplementedError
+        raise LockTimeoutError(lock_id)
 
-    def _unlock(self, key: str):
-        raise NotImplementedError
+    def _unlock(self, lock_id: str):
+        self.table.delete_item(Item={"LockID": lock_id})

From 8e6b55134931fbfb8584d03b7c2c7a0da1eeca92 Mon Sep 17 00:00:00 2001
From: Matteo De Wint
Date: Wed, 19 May 2021 22:15:07 +0200
Subject: [PATCH 3/9] Add --put-root-index option

---
 CHANGELOG.md                   |  8 ++++++++
 s3pypi/__main__.py             |  1 +
 s3pypi/core.py                 | 10 +++++-----
 s3pypi/index.py                |  2 +-
 s3pypi/storage.py              | 11 +++++++++++
 tests/integration/test_main.py |  5 ++++-
 6 files changed, 30 insertions(+), 7 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 889b086..cb7ddd6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,14 @@
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [PEP 440](https://www.python.org/dev/peps/pep-0440/).
 
+## Unreleased
+
+### Added
+
+- `--lock-indexes` option to lock index objects in S3 using a DynamoDB table.
+- `--put-root-index` option to write a root index that lists all package names.
+
+
 ## 1.0.0rc1 - 2021-05-19
 
 ### Added
diff --git a/s3pypi/__main__.py b/s3pypi/__main__.py
index d32a333..30acf9c 100644
--- a/s3pypi/__main__.py
+++ b/s3pypi/__main__.py
@@ -56,6 +56,7 @@ def get_arg_parser():
             "This ensures that concurrent invocations of s3pypi do not overwrite each other's changes."
         ),
     )
+    p.add_argument("--put-root-index", action="store_true", help="Write a root index.")
     p.add_argument("-f", "--force", action="store_true", help="Overwrite files.")
     p.add_argument("-v", "--verbose", action="store_true", help="Verbose output.")
     p.add_argument("-V", "--version", action="version", version=__version__)
diff --git a/s3pypi/core.py b/s3pypi/core.py
index 6ad0645..d280ee8 100644
--- a/s3pypi/core.py
+++ b/s3pypi/core.py
@@ -36,6 +36,7 @@ def upload_packages(
     bucket: str,
     force: bool = False,
     lock_indexes: bool = False,
+    put_root_index: bool = False,
     profile: Optional[str] = None,
     region: Optional[str] = None,
     **kwargs,
@@ -67,11 +68,10 @@
 
             storage.put_index(directory, index)
 
-    root = storage.root
-    with lock(root):
-        index = storage.get_index(root)
-        # TODO: Update root index
-        storage.put_index(root, index)
+    if put_root_index:
+        with lock(storage.root):
+            index = storage.build_root_index()
+            storage.put_index(storage.root, index)
 
 
 def parse_distribution(path: Path) -> Distribution:
diff --git a/s3pypi/index.py b/s3pypi/index.py
index f2c7fdc..513bd6e 100644
--- a/s3pypi/index.py
+++ b/s3pypi/index.py
@@ -16,7 +16,7 @@ def parse(cls, html: str) -> "Index":
 
     def to_html(self) -> str:
         links = "<br>\n".join(
-            f'<a href="{fname}">{fname}</a>'
+            f'<a href="{fname}">{fname.rstrip("/")}</a>'
             for fname in sorted(self.filenames)
         )
         return index_html.format(body=indent(links, " " * 4))
diff --git a/s3pypi/storage.py b/s3pypi/storage.py
index ae1fa09..415190e 100644
--- a/s3pypi/storage.py
+++ b/s3pypi/storage.py
@@ -44,6 +44,17 @@ def get_index(self, directory: str) -> Index:
             return Index()
         return Index.parse(html.decode())
 
+    def build_root_index(self) -> Index:
+        paginator = self.s3.meta.client.get_paginator("list_objects_v2")
+        result = paginator.paginate(
+            Bucket=self.bucket,
+            Prefix=self.prefix or "",
+            Delimiter="/",
+        )
+        n = len(self.prefix) + 1 if self.prefix else 0
+        dirs = set(p.get("Prefix")[n:] for p in result.search("CommonPrefixes"))
+        return Index(dirs)
+
     def put_index(self, directory: str, index: Index):
         self._object(directory, self.index_name).put(
             Body=index.to_html(),
diff --git a/tests/integration/test_main.py b/tests/integration/test_main.py
index e65a418..feb08c5 100644
--- a/tests/integration/test_main.py
+++ b/tests/integration/test_main.py
@@ -23,14 +23,17 @@ def test_string_dict(text, expected):
 def test_main_upload_package(chdir, data_dir, s3_bucket):
     with chdir(data_dir):
         dist = sorted(glob.glob("dists/*"))
-        s3pypi(*dist, "--bucket", s3_bucket.name)
+        s3pypi(*dist, "--bucket", s3_bucket.name, "--put-root-index")
 
     def read(key: str) -> bytes:
         return s3_bucket.Object(key).get()["Body"].read()
 
+    root_index = read("index.html").decode()
+
     def assert_pkg_exists(prefix: str, filename: str):
         assert read(prefix + filename)
         assert f">{filename}" in read(prefix).decode()
+        assert f">{prefix.rstrip('/')}" in root_index
 
     assert_pkg_exists("foo/", "foo-0.1.0.tar.gz")
     assert_pkg_exists("hello-world/", "hello_world-0.1.0-py3-none-any.whl")

From 27d5d5f11722139f27d7171cfd925a6c8a47064b Mon Sep 17 00:00:00 2001
From: Matteo De Wint
Date: Wed, 19 May 2021 22:37:11 +0200
Subject: [PATCH 4/9] Update changelog

---
 CHANGELOG.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index cb7ddd6..59e6b2e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,10 @@ and this project adheres to [PEP 440](https://www.python.org/dev/peps/pep-0440/)
 - `--lock-indexes` option to lock index objects in S3 using a DynamoDB table.
 - `--put-root-index` option to write a root index that lists all package names.
 
+### Changed
+
+- Set the default root object to `index.html` in CloudFront.
+
 
 ## 1.0.0rc1 - 2021-05-19
 

From b2100d5c4e143163e4c47fd0c38ab3158f7c6467 Mon Sep 17 00:00:00 2001
From: Matteo De Wint
Date: Wed, 19 May 2021 23:05:37 +0200
Subject: [PATCH 5/9] Locking; add Terraform config for DynamoDB table

---
 CHANGELOG.md                     |  3 ++-
 README.md                        |  8 ++++++++
 s3pypi/core.py                   |  7 +++++--
 s3pypi/locking.py                |  2 +-
 terraform/modules/s3pypi/main.tf | 19 +++++++++++++++++++
 5 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 59e6b2e..5ebc8b8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,7 +10,8 @@ and this project adheres to [PEP 440](https://www.python.org/dev/peps/pep-0440/)
 
 ### Added
 
-- `--lock-indexes` option to lock index objects in S3 using a DynamoDB table.
+- Terraform config for an optional DynamoDB table used for distributed locking.
+- `--lock-indexes` option to lock index objects in S3 using said DynamoDB table.
 - `--put-root-index` option to write a root index that lists all package names.
 
 ### Changed
diff --git a/README.md b/README.md
index af31186..8eee162 100644
--- a/README.md
+++ b/README.md
@@ -49,6 +49,13 @@ your domain, with a matching (wildcard) certificate in [AWS Certificate
 Manager]. If your certificate is a wildcard certificate, add
 `use_wildcard_certificate = true` to `config.auto.tfvars`.
 
+#### Distributed locking with DynamoDB
+
+To ensure that concurrent invocations of `s3pypi` do not overwrite each other's
+changes, the objects in S3 can be locked via an optional DynamoDB table (using
+the `--lock-indexes` option). To create this table, add `enable_dynamodb_locking
+= true` to `config.auto.tfvars`.
+
 #### Basic authentication
 
 To enable basic authentication, add `enable_basic_auth = true` to
@@ -94,6 +101,7 @@ module "s3pypi" {
   domain = "pypi.example.com"
 
   use_wildcard_certificate = true
+  enable_dynamodb_locking  = true
   enable_basic_auth        = true
 
   providers = {
diff --git a/s3pypi/core.py b/s3pypi/core.py
index d280ee8..db48323 100644
--- a/s3pypi/core.py
+++ b/s3pypi/core.py
@@ -43,8 +43,11 @@ def upload_packages(
 ):
     session = boto3.Session(profile_name=profile, region_name=region)
     storage = S3Storage(session, bucket, **kwargs)
-    # TODO: Make table name customizable?
-    lock = DynamoDBLocker(session, table=bucket) if lock_indexes else DummyLocker()
+    lock = (
+        DynamoDBLocker(session, table=f"{bucket}-locks")
+        if lock_indexes
+        else DummyLocker()
+    )
 
     distributions = [parse_distribution(path) for path in dist]
     get_name = attrgetter("name")
diff --git a/s3pypi/locking.py b/s3pypi/locking.py
index 41f55e3..4b9131f 100644
--- a/s3pypi/locking.py
+++ b/s3pypi/locking.py
@@ -72,4 +72,4 @@ def _lock(self, lock_id: str):
         raise LockTimeoutError(lock_id)
 
     def _unlock(self, lock_id: str):
-        self.table.delete_item(Item={"LockID": lock_id})
+        self.table.delete_item(Key={"LockID": lock_id})
diff --git a/terraform/modules/s3pypi/main.tf b/terraform/modules/s3pypi/main.tf
index 4b16388..8a493f5 100644
--- a/terraform/modules/s3pypi/main.tf
+++ b/terraform/modules/s3pypi/main.tf
@@ -14,6 +14,12 @@ variable "use_wildcard_certificate" {
   description = "Use a wildcard certificate (*.example.com)"
 }
 
+variable "enable_dynamodb_locking" {
+  type        = bool
+  default     = false
+  description = "Create a DynamoDB table for locking"
+}
+
 variable "enable_basic_auth" {
   type        = bool
   default     = false
@@ -143,6 +149,19 @@ data "aws_iam_policy_document" "s3_policy" {
   }
 }
 
+resource "aws_dynamodb_table" "locks" {
+  count = var.enable_dynamodb_locking ? 1 : 0
+
+  name         = "${var.bucket}-locks"
+  billing_mode = "PAY_PER_REQUEST"
+  hash_key     = "LockID"
+
+  attribute {
+    name = "LockID"
+    type = "S"
+  }
+}
+
 module "basic_auth" {
   count = var.enable_basic_auth ? 1 : 0
 

From e36c168fcf8ca8ea527677ef40b3b8962a559e9a Mon Sep 17 00:00:00 2001
From: Matteo De Wint
Date: Wed, 19 May 2021 23:32:03 +0200
Subject: [PATCH 6/9] Update changelog

---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5ebc8b8..e2e1fd9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,7 +16,7 @@ and this project adheres to [PEP 440](https://www.python.org/dev/peps/pep-0440/)
 
 ### Changed
 
-- Set the default root object to `index.html` in CloudFront.
+- Set CloudFront default root object to `index.html`.
 
 
 ## 1.0.0rc1 - 2021-05-19

From bb27c063302d9903a22f222b59154d3c74325f28 Mon Sep 17 00:00:00 2001
From: Matteo De Wint
Date: Thu, 20 May 2021 01:29:08 +0200
Subject: [PATCH 7/9] Locking; hash LockID + improved timeout error message

---
 s3pypi/locking.py | 31 ++++++++++++++++++++-----------
 1 file changed, 20 insertions(+), 11 deletions(-)

diff --git a/s3pypi/locking.py b/s3pypi/locking.py
index 4b9131f..cbc810a 100644
--- a/s3pypi/locking.py
+++ b/s3pypi/locking.py
@@ -1,20 +1,17 @@
 import abc
 import datetime as dt
-import getpass
+import hashlib
+import json
 import time
 from contextlib import contextmanager
 
 import boto3
 
 
-class LockTimeoutError(Exception):
-    def __init__(self, lock_id: str):
-        super().__init__(f"Timed out trying to acquire lock '{lock_id}'")
-
-
 class Locker(abc.ABC):
     @contextmanager
-    def __call__(self, lock_id: str):
+    def __call__(self, key: str):
+        lock_id = hashlib.sha1(key.encode()).hexdigest()
         self._lock(lock_id)
         try:
             yield
@@ -43,14 +40,14 @@ def __init__(
         session: boto3.session.Session,
         table: str,
         poll_interval: int = 1,
-        max_attempts: int = 30,
+        max_attempts: int = 10,
     ):
         db = session.resource("dynamodb")
         self.table = db.Table(table)
         self.exc = self.table.meta.client.exceptions
         self.poll_interval = poll_interval
         self.max_attempts = max_attempts
-        self.user = getpass.getuser()
+        self.caller_id = session.client("sts").get_caller_identity()["Arn"]
 
     def _lock(self, lock_id: str):
         for attempt in range(1, self.max_attempts + 1):
@@ -60,7 +57,7 @@ def _lock(self, lock_id: str):
                     Item={
                         "LockID": lock_id,
                         "AcquiredAt": now.isoformat(),
-                        "Username": self.user,
+                        "Owner": self.caller_id,
                     },
                     ConditionExpression="attribute_not_exists(LockID)",
                 )
@@ -69,7 +66,19 @@ def _lock(self, lock_id: str):
                 if attempt < self.max_attempts:
                     time.sleep(self.poll_interval)
 
-        raise LockTimeoutError(lock_id)
+        item = self.table.get_item(Key={"LockID": lock_id})["Item"]
+        raise DynamoDBLockTimeoutError(self.table.name, item)
 
     def _unlock(self, lock_id: str):
         self.table.delete_item(Key={"LockID": lock_id})
+
+
+class DynamoDBLockTimeoutError(Exception):
+    def __init__(self, table: str, item: dict):
+        key = json.dumps({"LockID": {"S": item["LockID"]}})
+        super().__init__(
+            f"Timed out trying to acquire lock:\n\n{json.dumps(item, indent=2)}\n\n"
+            "Another instance of s3pypi may currently be holding the lock.\n"
+            "If this is not the case, you may release the lock as follows:\n\n"
+            f"$ aws dynamodb delete-item --table-name {table} --key '{key}'\n"
+        )

From a21e5f138aa5ef645e25c759108406602f549b59 Mon Sep 17 00:00:00 2001
From: Matteo De Wint
Date: Thu, 20 May 2021 08:20:09 +0200
Subject: [PATCH 8/9] Locking; improved logging and help messages

---
 s3pypi/__main__.py | 8 ++++++--
 s3pypi/core.py     | 5 ++---
 s3pypi/locking.py  | 9 ++++++++-
 3 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/s3pypi/__main__.py b/s3pypi/__main__.py
index 30acf9c..411aeb7 100644
--- a/s3pypi/__main__.py
+++ b/s3pypi/__main__.py
@@ -52,11 +52,15 @@ def get_arg_parser():
         "--lock-indexes",
         action="store_true",
         help=(
-            "Lock index objects in S3 using a DynamoDB table. "
+            "Lock index objects in S3 using a DynamoDB table named `<bucket>-locks`. "
             "This ensures that concurrent invocations of s3pypi do not overwrite each other's changes."
         ),
     )
-    p.add_argument("--put-root-index", action="store_true", help="Write a root index.")
+    p.add_argument(
+        "--put-root-index",
+        action="store_true",
+        help="Write a root index that lists all available package names.",
+    )
     p.add_argument("-f", "--force", action="store_true", help="Overwrite files.")
     p.add_argument("-v", "--verbose", action="store_true", help="Verbose output.")
     p.add_argument("-V", "--version", action="version", version=__version__)
diff --git a/s3pypi/core.py b/s3pypi/core.py
index db48323..d810da8 100644
--- a/s3pypi/core.py
+++ b/s3pypi/core.py
@@ -61,9 +61,8 @@ def upload_packages(
                 filename = distr.local_path.name
 
                 if not force and filename in index.filenames:
-                    log.warning(
-                        "%s already exists! (use --force to overwrite)", filename
-                    )
+                    msg = "%s already exists! (use --force to overwrite)"
+                    log.warning(msg, filename)
                 else:
                     log.info("Uploading %s", distr.local_path)
                     storage.put_distribution(directory, distr.local_path)
diff --git a/s3pypi/locking.py b/s3pypi/locking.py
index cbc810a..6f5e60a 100644
--- a/s3pypi/locking.py
+++ b/s3pypi/locking.py
@@ -2,11 +2,16 @@
 import datetime as dt
 import hashlib
 import json
+import logging
 import time
 from contextlib import contextmanager
 
 import boto3
 
+from s3pypi import __prog__
+
+log = logging.getLogger(__prog__)
+
 
 class Locker(abc.ABC):
     @contextmanager
@@ -63,6 +68,8 @@ def _lock(self, lock_id: str):
                 )
                 return
             except self.exc.ConditionalCheckFailedException:
+                if attempt == 1:
+                    log.info("Waiting to acquire lock... (%s)", lock_id)
                 if attempt < self.max_attempts:
                     time.sleep(self.poll_interval)
 
@@ -77,7 +84,7 @@ class DynamoDBLockTimeoutError(Exception):
     def __init__(self, table: str, item: dict):
         key = json.dumps({"LockID": {"S": item["LockID"]}})
         super().__init__(
-            f"Timed out trying to acquire lock:\n\n{json.dumps(item, indent=2)}\n\n"
+            f"Timed out trying to acquire lock:\n{json.dumps(item, indent=2)}\n\n"
             "Another instance of s3pypi may currently be holding the lock.\n"
            "If this is not the case, you may release the lock as follows:\n\n"
             f"$ aws dynamodb delete-item --table-name {table} --key '{key}'\n"

From 4c2e1bbd3fab98f7f286ca46f6fb2384e1780756 Mon Sep 17 00:00:00 2001
From: Matteo De Wint
Date: Thu, 20 May 2021 09:24:40 +0200
Subject: [PATCH 9/9] Locking; add integration tests

---
 tests/integration/conftest.py     | 22 ++++++++++++++++++----
 tests/integration/test_locking.py | 18 ++++++++++++++++++
 tests/integration/test_main.py    |  4 ++--
 3 files changed, 38 insertions(+), 6 deletions(-)
 create mode 100644 tests/integration/test_locking.py

diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index e9b769b..8e6b65c 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -2,8 +2,8 @@
 from contextlib import contextmanager
 
 import boto3
+import moto
 import pytest
-from moto import mock_s3
 
 
 @pytest.fixture(scope="session")
@@ -26,17 +26,31 @@ def aws_credentials():
     os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
     os.environ["AWS_SECURITY_TOKEN"] = "testing"
     os.environ["AWS_SESSION_TOKEN"] = "testing"
+    os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
 
 
 @pytest.fixture
 def s3_bucket(aws_credentials):
-    with mock_s3():
-        conn = boto3.resource("s3", region_name="us-east-1")
-        bucket = conn.Bucket("s3pypi-test")
+    with moto.mock_s3():
+        s3 = boto3.resource("s3")
+        bucket = s3.Bucket("s3pypi-test")
         bucket.create()
         yield bucket
 
 
+@pytest.fixture
+def dynamodb_table(s3_bucket):
+    name = f"{s3_bucket.name}-locks"
+    with moto.mock_dynamodb2(), moto.mock_sts():
+        db = boto3.resource("dynamodb")
+        db.create_table(
+            TableName=name,
+            AttributeDefinitions=[{"AttributeName": "LockID", "AttributeType": "S"}],
+            KeySchema=[{"AttributeName": "LockID", "KeyType": "HASH"}],
+        )
+        yield db.Table(name)
+
+
 @pytest.fixture
 def boto3_session(s3_bucket):
     return boto3.session.Session()
diff --git a/tests/integration/test_locking.py b/tests/integration/test_locking.py
new file mode 100644
index 0000000..6ef5710
--- /dev/null
+++ b/tests/integration/test_locking.py
@@ -0,0 +1,18 @@
+import pytest
+
+from s3pypi.locking import DynamoDBLocker, DynamoDBLockTimeoutError
+
+
+def test_dynamodb_lock_timeout(boto3_session, dynamodb_table):
+    lock = DynamoDBLocker(
+        boto3_session,
+        dynamodb_table.name,
+        poll_interval=0.01,
+        max_attempts=3,
+    )
+    key = "example"
+
+    with lock(key):
+        with pytest.raises(DynamoDBLockTimeoutError):
+            with lock(key):
+                pass
diff --git a/tests/integration/test_main.py b/tests/integration/test_main.py
index feb08c5..c94e0e4 100644
--- a/tests/integration/test_main.py
+++ b/tests/integration/test_main.py
@@ -20,10 +20,10 @@ def test_string_dict(text, expected):
     assert string_dict(text) == expected
 
 
-def test_main_upload_package(chdir, data_dir, s3_bucket):
+def test_main_upload_package(chdir, data_dir, s3_bucket, dynamodb_table):
     with chdir(data_dir):
         dist = sorted(glob.glob("dists/*"))
-        s3pypi(*dist, "--bucket", s3_bucket.name, "--put-root-index")
+        s3pypi(*dist, "--bucket", s3_bucket.name, "--lock-indexes", "--put-root-index")
 
     def read(key: str) -> bytes:
         return s3_bucket.Object(key).get()["Body"].read()
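
Usage sketch (illustrative only, not part of the patch series above): once the series is applied, `upload_packages` wraps every index read/write in a locker; with `--lock-indexes` that locker is a `DynamoDBLocker` backed by the `<bucket>-locks` table (created via `enable_dynamodb_locking = true`), otherwise a no-op `DummyLocker`. The bucket name below is a placeholder.

    import boto3

    from s3pypi.locking import DynamoDBLocker
    from s3pypi.storage import S3Storage

    # Placeholder bucket; the "<bucket>-locks" table is assumed to exist,
    # e.g. created by the aws_dynamodb_table.locks resource in patch 5.
    bucket = "example-pypi-bucket"

    session = boto3.Session()
    storage = S3Storage(session, bucket)
    lock = DynamoDBLocker(session, table=f"{bucket}-locks")

    directory = "hello-world"
    with lock(directory):
        # _lock() performs a conditional PutItem on LockID and polls until it
        # succeeds or raises DynamoDBLockTimeoutError; _unlock() deletes the item.
        index = storage.get_index(directory)
        index.filenames.add("hello_world-0.1.0-py3-none-any.whl")
        storage.put_index(directory, index)

The equivalent CLI invocation, as exercised by the integration test above, would be `s3pypi dists/* --bucket example-pypi-bucket --lock-indexes --put-root-index`.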