Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to lock indexes in S3 using DynamoDB & add option to put root index #80

Merged
merged 9 commits into from
May 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,19 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [PEP 440](https://www.python.org/dev/peps/pep-0440/).


## Unreleased

### Added

- Terraform config for an optional DynamoDB table used for distributed locking.
- `--lock-indexes` option to lock index objects in S3 using said DynamoDB table.
- `--put-root-index` option to write a root index that lists all package names.

### Changed

- Set CloudFront default root object to `index.html`.


## 1.0.0rc1 - 2021-05-19

### Added
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ your domain, with a matching (wildcard) certificate in [AWS Certificate
Manager]. If your certificate is a wildcard certificate, add
`use_wildcard_certificate = true` to `config.auto.tfvars`.

#### Distributed locking with DynamoDB

To ensure that concurrent invocations of `s3pypi` do not overwrite each other's
changes, the objects in S3 can be locked via an optional DynamoDB table (using
the `--lock-indexes` option). To create this table, add `enable_dynamodb_locking
= true` to `config.auto.tfvars`.

#### Basic authentication

To enable basic authentication, add `enable_basic_auth = true` to
Expand Down Expand Up @@ -94,6 +101,7 @@ module "s3pypi" {
domain = "pypi.example.com"

use_wildcard_certificate = true
enable_dynamodb_locking = true
enable_basic_auth = true

providers = {
Expand Down
14 changes: 14 additions & 0 deletions s3pypi/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,20 @@ def get_arg_parser():
"It's recommended to instead use a private S3 bucket with a CloudFront Origin Access Identity."
),
)
p.add_argument(
"-l",
"--lock-indexes",
action="store_true",
help=(
"Lock index objects in S3 using a DynamoDB table named `<bucket>-locks`. "
"This ensures that concurrent invocations of s3pypi do not overwrite each other's changes."
),
)
p.add_argument(
"--put-root-index",
action="store_true",
help="Write a root index that lists all available package names.",
)
p.add_argument("-f", "--force", action="store_true", help="Overwrite files.")
p.add_argument("-v", "--verbose", action="store_true", help="Verbose output.")
p.add_argument("-V", "--version", action="version", version=__version__)
Expand Down
57 changes: 41 additions & 16 deletions s3pypi/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
from itertools import groupby
from operator import attrgetter
from pathlib import Path
from typing import List
from typing import List, Optional
from zipfile import ZipFile

import boto3

from s3pypi import __prog__
from s3pypi.exceptions import S3PyPiError
from s3pypi.locking import DummyLocker, DynamoDBLocker
from s3pypi.storage import S3Storage

log = logging.getLogger(__prog__)
Expand All @@ -28,27 +31,49 @@ def normalize_package_name(name: str) -> str:
return re.sub(r"[-_.]+", "-", name.lower())


def upload_packages(
    dist: List[Path],
    bucket: str,
    force: bool = False,
    lock_indexes: bool = False,
    put_root_index: bool = False,
    profile: Optional[str] = None,
    region: Optional[str] = None,
    **kwargs,
):
    """Upload distribution files to an S3 bucket, updating the package indexes.

    :param dist: paths of distribution files (wheels / sdists) to upload.
    :param bucket: name of the target S3 bucket.
    :param force: overwrite files that already exist in the index.
    :param lock_indexes: lock each index via a DynamoDB table named
        ``<bucket>-locks`` while it is being read, modified, and written.
    :param put_root_index: also (re)write a root index listing all packages.
    :param profile: optional AWS profile name for the boto3 session.
    :param region: optional AWS region name for the boto3 session.
    :param kwargs: passed through to :class:`S3Storage`.
    """
    session = boto3.Session(profile_name=profile, region_name=region)
    storage = S3Storage(session, bucket, **kwargs)
    # With locking disabled, DummyLocker provides the same context-manager
    # interface but does nothing.
    lock = (
        DynamoDBLocker(session, table=f"{bucket}-locks")
        if lock_indexes
        else DummyLocker()
    )

    distributions = [parse_distribution(path) for path in dist]
    get_name = attrgetter("name")

    # Group the distributions by package name so each index is read and
    # written (and locked) only once.
    for name, group in groupby(sorted(distributions, key=get_name), get_name):
        directory = normalize_package_name(name)

        with lock(directory):
            index = storage.get_index(directory)

            for distr in group:
                filename = distr.local_path.name

                if not force and filename in index.filenames:
                    msg = "%s already exists! (use --force to overwrite)"
                    log.warning(msg, filename)
                else:
                    log.info("Uploading %s", distr.local_path)
                    storage.put_distribution(directory, distr.local_path)
                    index.filenames.add(filename)

            storage.put_index(directory, index)

    if put_root_index:
        with lock(storage.root):
            index = storage.build_root_index()
            storage.put_index(storage.root, index)


def parse_distribution(path: Path) -> Distribution:
Expand Down
2 changes: 1 addition & 1 deletion s3pypi/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def parse(cls, html: str) -> "Index":

def to_html(self) -> str:
    """Render the index as an HTML page with one link per filename.

    The href keeps the raw (quoted) name; a trailing slash is stripped
    from the link text only, so directory entries in the root index read
    as plain package names.
    """
    links = "<br>\n".join(
        f'<a href="{urllib.parse.quote(fname)}">{fname.rstrip("/")}</a>'
        for fname in sorted(self.filenames)
    )
    return index_html.format(body=indent(links, " " * 4))
Expand Down
91 changes: 91 additions & 0 deletions s3pypi/locking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import abc
import datetime as dt
import hashlib
import json
import logging
import time
from contextlib import contextmanager

import boto3

from s3pypi import __prog__

log = logging.getLogger(__prog__)


class Locker(abc.ABC):
    """Context-manager factory for named locks.

    Calling an instance with a key yields a context manager that holds
    the corresponding lock for the duration of the ``with`` block.
    Subclasses implement the actual acquire/release in ``_lock`` and
    ``_unlock``.
    """

    @contextmanager
    def __call__(self, key: str):
        # Hash the key so the lock ID has a fixed length and character set.
        lock_id = hashlib.sha1(key.encode()).hexdigest()
        self._lock(lock_id)
        try:
            yield
        finally:
            # Always release the lock, even if the locked block raised.
            self._unlock(lock_id)

    @abc.abstractmethod
    def _lock(self, lock_id: str):
        ...

    @abc.abstractmethod
    def _unlock(self, lock_id: str):
        ...


class DummyLocker(Locker):
    """No-op locker used when distributed locking is disabled."""

    def _lock(self, lock_id: str):
        pass

    def _unlock(self, lock_id: str):
        pass


class DynamoDBLocker(Locker):
def __init__(
self,
session: boto3.session.Session,
table: str,
poll_interval: int = 1,
max_attempts: int = 10,
):
db = session.resource("dynamodb")
self.table = db.Table(table)
self.exc = self.table.meta.client.exceptions
self.poll_interval = poll_interval
self.max_attempts = max_attempts
self.caller_id = session.client("sts").get_caller_identity()["Arn"]

def _lock(self, lock_id: str):
for attempt in range(1, self.max_attempts + 1):
now = dt.datetime.now(dt.timezone.utc)
try:
self.table.put_item(
Item={
"LockID": lock_id,
"AcquiredAt": now.isoformat(),
"Owner": self.caller_id,
},
ConditionExpression="attribute_not_exists(LockID)",
)
return
except self.exc.ConditionalCheckFailedException:
if attempt == 1:
log.info("Waiting to acquire lock... (%s)", lock_id)
if attempt < self.max_attempts:
time.sleep(self.poll_interval)

item = self.table.get_item(Key={"LockID": lock_id})["Item"]
raise DynamoDBLockTimeoutError(self.table.name, item)

def _unlock(self, lock_id: str):
self.table.delete_item(Key={"LockID": lock_id})


class DynamoDBLockTimeoutError(Exception):
    """Raised when a DynamoDB lock could not be acquired in time.

    The message shows the current lock item and the AWS CLI command
    that releases the lock manually.
    """

    def __init__(self, table: str, item: dict):
        item_json = json.dumps(item, indent=2)
        key_json = json.dumps({"LockID": {"S": item["LockID"]}})
        message = (
            f"Timed out trying to acquire lock:\n{item_json}\n\n"
            "Another instance of s3pypi may currently be holding the lock.\n"
            "If this is not the case, you may release the lock as follows:\n\n"
            f"$ aws dynamodb delete-item --table-name {table} --key '{key_json}'\n"
        )
        super().__init__(message)
24 changes: 19 additions & 5 deletions s3pypi/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,34 @@


class S3Storage:
root = "/"
_index = "index.html"

def __init__(
    self,
    session: boto3.session.Session,
    bucket: str,
    prefix: Optional[str] = None,
    acl: Optional[str] = None,
    s3_put_args: Optional[dict] = None,
    unsafe_s3_website: bool = False,
):
    """
    :param session: boto3 session used to access S3.
    :param bucket: name of the S3 bucket.
    :param prefix: optional key prefix under which all objects are stored.
    :param acl: canned ACL for uploaded objects (defaults to "private").
    :param s3_put_args: extra keyword arguments for S3 put_object calls.
    :param unsafe_s3_website: name index objects "index.html" for S3
        website hosting instead of using bare directory keys.
    """
    self.s3 = session.resource("s3")
    self.bucket = bucket
    self.prefix = prefix
    self.index_name = self._index if unsafe_s3_website else ""
    self.put_kwargs = dict(
        ACL=acl or "private",
        **(s3_put_args or {}),
    )

def _object(self, directory: str, filename: str):
    """Return the S3 Object for *filename* under *directory*.

    The root index maps to the bucket's top-level "index.html" object so
    it can serve as the CloudFront default root object.
    """
    parts = [directory, filename]
    if parts == [self.root, self.index_name]:
        parts = [self._index]
    if self.prefix:
        parts.insert(0, self.prefix)
    return self.s3.Object(self.bucket, key="/".join(parts))

def get_index(self, directory: str) -> Index:
try:
Expand All @@ -41,6 +44,17 @@ def get_index(self, directory: str) -> Index:
return Index()
return Index.parse(html.decode())

def build_root_index(self) -> Index:
    """Build a root index listing all package directories in the bucket.

    Lists top-level "directories" (common prefixes) under the configured
    prefix, stripping that prefix from each entry.
    """
    paginator = self.s3.meta.client.get_paginator("list_objects_v2")
    result = paginator.paginate(
        Bucket=self.bucket,
        Prefix=self.prefix or "",
        Delimiter="/",
    )
    n = len(self.prefix) + 1 if self.prefix else 0
    # search() yields None for pages without any CommonPrefixes (e.g. an
    # empty bucket), so filter those out before indexing.
    dirs = {p["Prefix"][n:] for p in result.search("CommonPrefixes") if p}
    return Index(dirs)

def put_index(self, directory: str, index: Index):
self._object(directory, self.index_name).put(
Body=index.to_html(),
Expand Down
21 changes: 21 additions & 0 deletions terraform/modules/s3pypi/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ variable "use_wildcard_certificate" {
description = "Use a wildcard certificate (*.example.com)"
}

# Optional DynamoDB table ("<bucket>-locks") used by the s3pypi
# --lock-indexes option for distributed locking.
variable "enable_dynamodb_locking" {
  type        = bool
  default     = false
  description = "Create a DynamoDB table for locking"
}

variable "enable_basic_auth" {
type = bool
default = false
Expand Down Expand Up @@ -67,6 +73,8 @@ resource "aws_cloudfront_distribution" "cdn" {
error_caching_min_ttl = 0
}

default_root_object = "index.html"

default_cache_behavior {
target_origin_id = "s3"
viewer_protocol_policy = "redirect-to-https"
Expand Down Expand Up @@ -141,6 +149,19 @@ data "aws_iam_policy_document" "s3_policy" {
}
}

# Locks table for s3pypi's --lock-indexes option. The name and the
# "LockID" string hash key must match what DynamoDBLocker expects.
resource "aws_dynamodb_table" "locks" {
  count = var.enable_dynamodb_locking ? 1 : 0

  name         = "${var.bucket}-locks"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "LockID"

  attribute {
    name = "LockID"
    type = "S"
  }
}

module "basic_auth" {
count = var.enable_basic_auth ? 1 : 0

Expand Down
27 changes: 23 additions & 4 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from contextlib import contextmanager

import boto3
import moto
import pytest
from moto import mock_s3


@pytest.fixture(scope="session")
Expand All @@ -26,12 +26,31 @@ def aws_credentials():
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
os.environ["AWS_SECURITY_TOKEN"] = "testing"
os.environ["AWS_SESSION_TOKEN"] = "testing"
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"


@pytest.fixture
def s3_bucket(aws_credentials):
    """Mock S3 for the duration of the test and yield an empty bucket."""
    with moto.mock_s3():
        s3 = boto3.resource("s3")
        bucket = s3.Bucket("s3pypi-test")
        bucket.create()
        yield bucket


@pytest.fixture
def dynamodb_table(s3_bucket):
    """Mock DynamoDB (and STS, used to identify the lock owner) and yield
    the locks table that s3pypi expects next to the bucket."""
    name = f"{s3_bucket.name}-locks"
    with moto.mock_dynamodb2(), moto.mock_sts():
        db = boto3.resource("dynamodb")
        db.create_table(
            TableName=name,
            AttributeDefinitions=[{"AttributeName": "LockID", "AttributeType": "S"}],
            KeySchema=[{"AttributeName": "LockID", "KeyType": "HASH"}],
            # Required by the real CreateTable API (either BillingMode or
            # ProvisionedThroughput); matches the Terraform table config.
            BillingMode="PAY_PER_REQUEST",
        )
        yield db.Table(name)


@pytest.fixture
def boto3_session(s3_bucket):
    """Default boto3 session, created after the mocked S3 bucket exists."""
    session = boto3.session.Session()
    return session
Loading