Skip to content

Commit

Permalink
Locking; implement lock and unlock methods
Browse files Browse the repository at this point in the history
  • Loading branch information
mdwint committed May 19, 2021
1 parent 807b725 commit 06b60f5
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 20 deletions.
11 changes: 4 additions & 7 deletions s3pypi/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@

import boto3

from s3pypi import __prog__, locking
from s3pypi import __prog__
from s3pypi.exceptions import S3PyPiError
from s3pypi.locking import DummyLocker, DynamoDBLocker
from s3pypi.storage import S3Storage

log = logging.getLogger(__prog__)
Expand Down Expand Up @@ -41,12 +42,8 @@ def upload_packages(
):
session = boto3.Session(profile_name=profile, region_name=region)
storage = S3Storage(session, bucket, **kwargs)
lock = (
# TODO: Make table name customizable?
locking.DynamoDbLocker(session, table=bucket)
if lock_indexes
else locking.DummyLocker()
)
# TODO: Make table name customizable?
lock = DynamoDBLocker(session, table=bucket) if lock_indexes else DummyLocker()

distributions = [parse_distribution(path) for path in dist]
get_name = attrgetter("name")
Expand Down
54 changes: 41 additions & 13 deletions s3pypi/locking.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,75 @@
import abc
import datetime as dt
import getpass
import time
from contextlib import contextmanager

import boto3


class LockTimeoutError(Exception):
def __init__(self, lock_id: str):
super().__init__(f"Timed out trying to acquire lock '{lock_id}'")


class Locker(abc.ABC):
@contextmanager
def __call__(self, key: str):
self._lock(key)
def __call__(self, lock_id: str):
self._lock(lock_id)
try:
yield
finally:
self._unlock(key)
self._unlock(lock_id)

@abc.abstractmethod
def _lock(self, key: str):
def _lock(self, lock_id: str):
...

@abc.abstractmethod
def _unlock(self, key: str):
def _unlock(self, lock_id: str):
...


class DummyLocker(Locker):
def _lock(self, key: str):
def _lock(self, lock_id: str):
pass

_unlock = _lock


class DynamoDbLocker(Locker):
class DynamoDBLocker(Locker):
def __init__(
self,
session: boto3.session.Session,
table: str,
timeout: int = 10,
poll_interval: int = 1,
max_attempts: int = 30,
):
db = session.resource("dynamodb")
self.table = db.Table(table)
self.timeout = timeout
self.exc = self.table.meta.client.exceptions
self.poll_interval = poll_interval
self.max_attempts = max_attempts
self.user = getpass.getuser()

def _lock(self, lock_id: str):
for attempt in range(1, self.max_attempts + 1):
now = dt.datetime.now(dt.timezone.utc)
try:
self.table.put_item(
Item={
"LockID": lock_id,
"AcquiredAt": now.isoformat(),
"Username": self.user,
},
ConditionExpression="attribute_not_exists(LockID)",
)
return
except self.exc.ConditionalCheckFailedException:
if attempt < self.max_attempts:
time.sleep(self.poll_interval)

def _lock(self, key: str):
raise NotImplementedError
raise LockTimeoutError(lock_id)

def _unlock(self, key: str):
raise NotImplementedError
def _unlock(self, lock_id: str):
self.table.delete_item(Item={"LockID": lock_id})

0 comments on commit 06b60f5

Please sign in to comment.