optionally delete from S3 what was NOT uploaded #3117

Merged
2 changes: 1 addition & 1 deletion .github/workflows/dev-build.yml
@@ -203,7 +203,7 @@ jobs:
# XXX would be nice to validate here that $DEPLOYER_BUCKET_PREFIX is truthy
echo "DEPLOYER_BUCKET_PREFIX=$DEPLOYER_BUCKET_PREFIX"

-          poetry run deployer upload ../client/build
+          poetry run deployer upload --prune ../client/build
poetry run deployer update-lambda-functions ./aws-lambda
# TODO
# Execute command to tell the Dev CloudFront distribution to use the
2 changes: 1 addition & 1 deletion .github/workflows/stage-build.yml
@@ -236,7 +236,7 @@ jobs:
# XXX would be nice to validate here that $DEPLOYER_BUCKET_PREFIX is truthy
echo "DEPLOYER_BUCKET_PREFIX=$DEPLOYER_BUCKET_PREFIX"

-          poetry run deployer upload ../client/build
+          poetry run deployer upload --prune ../client/build
poetry run deployer update-lambda-functions ./aws-lambda

# TODO: Depending on how long the upload takes, consider switching to
8 changes: 8 additions & 0 deletions deployer/src/deployer/main.py
@@ -143,6 +143,14 @@ def whatsdeployed(ctx, directory: Path, output: str):
show_default=True,
is_flag=True,
)
@click.option(
"--prune",
help="Delete keys that were not uploaded this time (including those that didn't "
"need to be uploaded)",
default=False,
show_default=True,
is_flag=True,
)
@click.argument("directory", type=click.Path(), callback=validate_directory)
@click.pass_context
def upload(ctx, directory: Path, **kwargs):
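For illustration, a minimal standalone sketch of how a boolean `is_flag` option like `--prune` reaches the command body as a plain `True`/`False`. The wiring below is simplified and hypothetical; in the real deployer the option presumably ends up in the config dict that `upload_content()` reads (e.g. `config["prune"]`).

import click

@click.command()
@click.option(
    "--prune",
    help="Delete keys that were not uploaded this time",
    default=False,
    show_default=True,
    is_flag=True,
)
@click.argument("directory", type=click.Path())
def upload(directory, **kwargs):
    # is_flag=True means the option is either present (True) or absent (False);
    # click hands it to the callback as a keyword argument.
    click.echo(f"prune={kwargs['prune']} directory={directory}")

if __name__ == "__main__":
    upload()

Running this sketch as `python sketch.py --prune some/dir` would print `prune=True directory=some/dir`.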
130 changes: 130 additions & 0 deletions deployer/src/deployer/upload.py
@@ -1,4 +1,5 @@
import concurrent.futures
import datetime
import hashlib
import mimetypes
import re
@@ -10,6 +11,7 @@
import boto3
import click
from boto3.s3.transfer import S3TransferConfig
from dateutil.tz import UTC

from .constants import (
DEFAULT_CACHE_CONTROL,
@@ -49,6 +51,7 @@ class Totals:
uploaded_files: int = 0
uploaded_redirects: int = 0
uploaded_files_size: int = 0
deleted_files: int = 0

def count(self, task):
if task.skipped:
@@ -57,6 +60,8 @@ def count(self, task):
self.failed += 1
elif task.is_redirect:
self.uploaded_redirects += 1
elif task.is_deletion:
self.deleted_files += 1
else:
self.uploaded_files += 1
self.uploaded_files_size += task.size
@@ -109,6 +114,7 @@ class UploadTask:
error = None
skipped = False
is_redirect = False
is_deletion = False

def upload(self):
raise NotImplementedError()
@@ -249,6 +255,31 @@ def upload(self, bucket_manager):
)


class DeleteTask(UploadTask):
"""
Task that deletes a single key from the bucket.
"""

is_deletion = True

def __init__(self, key, dry_run=False):
self.key = key
self.dry_run = dry_run

def __repr__(self):
return f"{self.__class__.__name__}({self.key})"

def __str__(self):
return self.key

def delete(self, bucket_manager):
if not self.dry_run:
bucket_manager.client.delete_object(
Key=str(self.key),
Bucket=bucket_manager.bucket_name,
)


class BucketManager:
def __init__(self, bucket_name, bucket_prefix):
self.bucket_name = bucket_name
@@ -290,6 +321,15 @@ def get_bucket_objects(self):
result = {}
continuation_token = None
while True:
# Note! You can set a `MaxKeys` parameter here.
# The default is 1,000, and any number larger than 1,000 is ignored;
# it just falls back to 1,000.
# (Peterbe's note) I've experimented with different numbers
# (e.g. 500 or 100) and the total time difference is insignificant.
# A large MaxKeys means larger batches and fewer network requests,
# which reduces the risk of network failures (automatically retried),
# and there doesn't appear to be any benefit in setting it to a lower
# number. So leave it at 1,000, which is what you get when it's not set.
kwargs = dict(Bucket=self.bucket_name)
if self.key_prefix:
kwargs["Prefix"] = self.key_prefix
@@ -364,6 +404,10 @@ def iter_redirect_tasks(
dry_run=dry_run,
)

def iter_delete_tasks(self, keys, dry_run=False):
for key in keys:
yield DeleteTask(key, dry_run=dry_run)

def count_file_tasks(self, build_directory):
return sum(self.iter_file_tasks(build_directory, for_counting_only=True))

@@ -400,7 +444,19 @@ def upload(
task.skipped = True
if on_task_complete:
on_task_complete(task)

# Before continuing, pop it from the existing dict because
# we no longer need it after the ETag comparison has been
# done.
existing_bucket_objects.pop(task.key, None)
continue

if existing_bucket_objects:
# Whether or not we benefited from knowing the key already
# existed, remove it from the dict so we can figure out
# what remains later.
existing_bucket_objects.pop(task.key, None)

future = executor.submit(task.upload, self)
futures[future] = task
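To make that bookkeeping concrete, here is a small self-contained sketch (keys invented for illustration): every key this deployment touches, whether uploaded or skipped, is popped from the mapping of what is already in S3, so whatever remains afterwards is a pruning candidate.

existing_bucket_objects = {
    "main/index.html": {"ETag": '"aaa"'},
    "main/old-page/index.html": {"ETag": '"bbb"'},
    "main/static/js/8.0b83949c.chunk.js": {"ETag": '"ccc"'},
}

# Keys produced by this build, whether uploaded or skipped as unchanged.
this_build = ["main/index.html", "main/static/js/9.1c2d3e4f.chunk.js"]

for key in this_build:
    existing_bucket_objects.pop(key, None)

# Whatever is left existed in S3 before but is not part of this build.
print(sorted(existing_bucket_objects))
# ['main/old-page/index.html', 'main/static/js/8.0b83949c.chunk.js']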

@@ -416,6 +472,31 @@

return timer

def delete(self, keys, on_task_complete=None, dry_run=False):
"""Delete doesn't care if it's a redirect or a regular file."""
with concurrent.futures.ThreadPoolExecutor(
max_workers=MAX_WORKERS_PARALLEL_UPLOADS
) as executor, StopWatch() as timer:
# Submit every deletion to the thread pool; unlike upload(),
# there is no ordering concern between redirects and files.
task_iter = self.iter_delete_tasks(keys, dry_run=dry_run)
futures = {}
for task in task_iter:
future = executor.submit(task.delete, self)
futures[future] = task

for future in concurrent.futures.as_completed(futures):
task = futures[future]
try:
task.error = future.exception()
except concurrent.futures.CancelledError as cancelled:
task.error = cancelled

if on_task_complete:
on_task_complete(task)

return timer
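As an aside on the design choice: the PR deletes keys one delete_object call at a time, fanned out over the thread pool. S3 also offers a batched delete_objects call that accepts up to 1,000 keys per request; a hedged sketch of that alternative (not what this PR does) might look like the following.

import boto3

def delete_keys_batched(bucket_name, keys, dry_run=False):
    """Delete keys in batches of up to 1,000 per request (S3's per-call limit)."""
    if dry_run:
        return
    client = boto3.client("s3")
    for start in range(0, len(keys), 1000):
        batch = keys[start : start + 1000]
        client.delete_objects(
            Bucket=bucket_name,
            Delete={"Objects": [{"Key": str(key)} for key in batch], "Quiet": True},
        )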


def upload_content(build_directory, content_roots, config):
full_timer = StopWatch().start()
@@ -426,6 +507,7 @@ def upload_content(build_directory, content_roots, config):
force_refresh = config["force_refresh"]
show_progress_bar = not config["no_progressbar"]
upload_redirects = not config["no_redirects"]
prune = config["prune"]

log.info(f"Upload files from: {build_directory}")
if upload_redirects:
@@ -492,6 +574,54 @@ def on_task_complete(task):
log.info(f"Total uploaded redirects: {totals.uploaded_redirects:,} ")
log.info(f"Total skipped files: {totals.skipped:,} matched existing S3 objects")
log.info(f"Total upload/skip time: {upload_timer}")

if prune:
# By now `existing_bucket_objects` has mutated to contain only the keys
# that this deployment never touched: nothing was uploaded to them and
# nothing was skipped because they already matched.
# In other words, all the S3 keys that existed before but are
# unrecognized now. For example, things that were once built but have
# since been deleted.
now = datetime.datetime.utcnow().replace(tzinfo=UTC)
delete_keys = []
for key in existing_bucket_objects:
if key.startswith(f"{bucket_prefix}/_whatsdeployed/"):
# These are special and wouldn't have been uploaded
continue

if key.startswith(f"{bucket_prefix}/static/"):
# Careful with these!
# Static assets such as `main/static/js/8.0b83949c.chunk.js`
# are aggressively cached and they might still be referenced
# from within HTML pages that are still in the CDN cache.
# Suppose someone gets a copy of yesterday's HTML from the CDN
# and it refers to `/static/js/foo.abc123.js`, which is not in
# their browser cache or the CDN's cache; their browser will
# still request it even though `/static/js/foo.def456.js` is
# now the latest and greatest.
# To be safe, only delete if it's considered "old".
delta = now - existing_bucket_objects[key]["LastModified"]
if delta.days < 30:
continue

assert key.startswith(bucket_prefix)

delete_keys.append(key)

log.info(f"Total pending task deletions: {len(delete_keys):,}")

with DisplayProgress(len(delete_keys), show_progress_bar) as progress:

def on_task_complete(task):
progress.update(task)
totals.count(task)

mgr.delete(delete_keys, on_task_complete=on_task_complete, dry_run=dry_run)

if dry_run:
log.info("No deletions. Dry run!")
else:
log.info(f"Total deleted keys: {totals.deleted_files:,}")

log.info(f"Done in {full_timer.stop()}.")

if totals.failed:
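Finally, the 30-day guard on `/static/` keys reduces to a timezone-aware age comparison. A minimal sketch of that check, with the prefix and dates invented for illustration:

import datetime
from dateutil.tz import UTC

def is_safe_to_prune(key, last_modified, bucket_prefix="main", min_age_days=30):
    """Mirror the pruning guards above for a single leftover key."""
    if key.startswith(f"{bucket_prefix}/_whatsdeployed/"):
        return False  # special-cased: these wouldn't have been uploaded by this run
    if key.startswith(f"{bucket_prefix}/static/"):
        # Hashed assets may still be referenced by cached HTML, so only
        # delete them once they are comfortably old.
        now = datetime.datetime.utcnow().replace(tzinfo=UTC)
        return (now - last_modified).days >= min_age_days
    return True

# Example: a hashed chunk last modified 45 days ago is old enough to prune.
last_modified = datetime.datetime.utcnow().replace(tzinfo=UTC) - datetime.timedelta(days=45)
print(is_safe_to_prune("main/static/js/8.0b83949c.chunk.js", last_modified))  # True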