Skip to content

Commit

Permalink
feat: auto-enable locking if table exists
Browse files Browse the repository at this point in the history
  • Loading branch information
mdwint committed Jan 1, 2024
1 parent cf2d38a commit 7f18ae9
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 39 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [PEP 440](https://www.python.org/dev/peps/pep-0440/)
### Added

- `s3pypi delete` command to delete packages from S3.
- `--locks-table` to customise the DynamoDB table name used for locking.

### Changed

Expand All @@ -20,6 +21,7 @@ and this project adheres to [PEP 440](https://www.python.org/dev/peps/pep-0440/)
### Removed

- `--acl` option. Use `--s3-put-args='ACL=...'` instead. [The use of ACLs is discouraged].
- `--lock-indexes` option. Locking is enabled automatically if a DynamoDB table exists.

[The use of ACLs is discouraged]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html

Expand Down
17 changes: 16 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ s3pypi = "s3pypi.__main__:main"

[tool.poetry.dependencies]
boto3 = "^1.34.11"
boto3-stubs = {extras = ["s3"], version = "^1.34.11"}
boto3-stubs = {extras = ["dynamodb", "s3"], version = "^1.34.11"}
python = "^3.8"

[tool.poetry.group.dev.dependencies]
Expand Down
11 changes: 4 additions & 7 deletions s3pypi/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,9 @@ def build_s3_args(p: ArgumentParser) -> None:
),
)
p.add_argument(
"--lock-indexes",
action="store_true",
help=(
"Lock index objects in S3 using a DynamoDB table named `<bucket>-locks`. "
"This ensures that concurrent invocations of s3pypi do not overwrite each other's changes."
),
"--locks-table",
metavar="TABLE",
help="DynamoDB table to use for locking (default: `<bucket>-locks`).",
)


Expand Down Expand Up @@ -133,7 +130,7 @@ def main(*raw_args: str) -> None:
endpoint_url=args.s3_endpoint_url,
put_kwargs=args.s3_put_args,
index_html=args.index_html,
lock_indexes=args.lock_indexes,
locks_table=args.locks_table,
),
)

Expand Down
48 changes: 34 additions & 14 deletions s3pypi/locking.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import logging
import time
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Iterator

import boto3
from mypy_boto3_dynamodb.service_resource import Table

from s3pypi import __prog__, exceptions as exc

Expand Down Expand Up @@ -40,39 +42,57 @@ def _lock(self, lock_id: str) -> None:
_unlock = _lock


@dataclass
class LockerConfig:
retry_delay: int = 1
max_attempts: int = 10


class DynamoDBLocker(Locker):
def __init__(
self,
@staticmethod
def discover(
session: boto3.session.Session,
table: str,
retry_delay: int = 1,
max_attempts: int = 10,
):
table_name: str,
mandatory: bool = False,
cfg: LockerConfig = LockerConfig(),
) -> Locker:
db = session.resource("dynamodb")
self.table = db.Table(table)
table = db.Table(table_name)

if not mandatory:
try:
table.get_item(Key={"LockID": "?"})
except table.meta.client.exceptions.ClientError:
log.debug("No locks table found. Locking disabled.")
return DummyLocker()

owner = session.client("sts").get_caller_identity()["Arn"]
return DynamoDBLocker(table, owner, cfg)

def __init__(self, table: Table, owner: str, cfg: LockerConfig):
self.table = table
self.exc = self.table.meta.client.exceptions
self.retry_delay = retry_delay
self.max_attempts = max_attempts
self.caller_id = session.client("sts").get_caller_identity()["Arn"]
self.owner = owner
self.cfg = cfg

def _lock(self, lock_id: str) -> None:
for attempt in range(1, self.max_attempts + 1):
for attempt in range(1, self.cfg.max_attempts + 1):
now = dt.datetime.now(dt.timezone.utc)
try:
self.table.put_item(
Item={
"LockID": lock_id,
"AcquiredAt": now.isoformat(),
"Owner": self.caller_id,
"Owner": self.owner,
},
ConditionExpression="attribute_not_exists(LockID)",
)
return
except self.exc.ConditionalCheckFailedException:
if attempt == 1:
log.info("Waiting to acquire lock... (%s)", lock_id)
if attempt < self.max_attempts:
time.sleep(self.retry_delay)
if attempt < self.cfg.max_attempts:
time.sleep(self.cfg.retry_delay)

item = self.table.get_item(Key={"LockID": lock_id})["Item"]
raise DynamoDBLockTimeoutError(self.table.name, item)
Expand Down
12 changes: 6 additions & 6 deletions s3pypi/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from mypy_boto3_s3.service_resource import Object

from s3pypi.index import Index
from s3pypi.locking import DummyLocker, DynamoDBLocker
from s3pypi.locking import DynamoDBLocker


@dataclass
Expand All @@ -22,7 +22,7 @@ class S3Config:
endpoint_url: Optional[str] = None
put_kwargs: Dict[str, str] = field(default_factory=dict)
index_html: bool = False
lock_indexes: bool = False
locks_table: Optional[str] = None


class S3Storage:
Expand All @@ -40,10 +40,10 @@ def __init__(self, cfg: S3Config):
self.index_name = self._index if cfg.index_html else ""
self.cfg = cfg

self.lock = (
DynamoDBLocker(session, table=f"{cfg.bucket}-locks")
if cfg.lock_indexes
else DummyLocker()
self.lock = DynamoDBLocker.discover(
session,
table_name=cfg.locks_table or f"{cfg.bucket}-locks",
mandatory=bool(cfg.locks_table),
)

def _object(self, directory: str, filename: str) -> Object:
Expand Down
30 changes: 22 additions & 8 deletions tests/integration/test_locking.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
import pytest

from s3pypi.locking import DynamoDBLocker, DynamoDBLockTimeoutError
from s3pypi.locking import (
DummyLocker,
DynamoDBLocker,
DynamoDBLockTimeoutError,
LockerConfig,
)


def test_dynamodb_lock_timeout(boto3_session, dynamodb_table):
lock = DynamoDBLocker(
boto3_session,
dynamodb_table.name,
retry_delay=0,
max_attempts=3,
)
def test_dynamodb_discover_found(boto3_session, dynamodb_table):
lock = DynamoDBLocker.discover(boto3_session, dynamodb_table.name)

assert isinstance(lock, DynamoDBLocker)
assert lock.table == dynamodb_table


def test_dynamodb_discover_not_found(boto3_session):
lock = DynamoDBLocker.discover(boto3_session, table_name="does-not-exist")

assert isinstance(lock, DummyLocker)


def test_dynamodb_lock_timeout(dynamodb_table):
cfg = LockerConfig(retry_delay=0, max_attempts=3)
lock = DynamoDBLocker(dynamodb_table, owner="pytest", cfg=cfg)
key = "example"

with lock(key):
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def test_string_dict(text, expected):

@pytest.mark.parametrize("prefix", ["", "packages", "packages/abc"])
def test_main_upload_package(chdir, data_dir, s3_bucket, dynamodb_table, prefix):
args = ["dists/*", "--bucket", s3_bucket.name, "--lock-indexes", "--put-root-index"]
args = ["dists/*", "--bucket", s3_bucket.name, "--put-root-index"]
if prefix:
args.extend(["--prefix", prefix])

Expand All @@ -45,7 +45,7 @@ def assert_pkg_exists(pkg: str, filename: str):
assert_pkg_exists("xyz", "xyz-0.1.0.zip")


def test_main_upload_package_exists(chdir, data_dir, s3_bucket, caplog):
def test_main_upload_package_exists(chdir, data_dir, s3_bucket, dynamodb_table, caplog):
dist = "dists/foo-0.1.0.tar.gz"

with chdir(data_dir):
Expand Down

0 comments on commit 7f18ae9

Please sign in to comment.