Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job for deleting obsolete editions #91

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions jobs/delete_obsolete_editions/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Delete obsolete editions

This batch job deletes editions considered "obsolete".

"Obsolete" editions are editions that fulfill both of the following criteria:

1. They are more than 90 days old.
2. The are past the third generation of editions (meaning that we always keep at
least three editions of a dataset version).

When an edition is deleted, any distributions (and their files) belonging to it
is deleted as well.
Empty file.
49 changes: 49 additions & 0 deletions jobs/delete_obsolete_editions/editions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from datetime import datetime, timedelta, timezone
from itertools import groupby

from metadata.edition.repository import EditionRepository

MIN_EDITION_AGE_DAYS = 90
MIN_EDITIONS_TO_KEEP = 3


def _edition_timestamp(edition):
"""Return the timestamp part of `edition`."""
return datetime.fromisoformat(edition["edition"]).astimezone(timezone.utc)


def _old_enough_to_delete(edition):
"""Return true if `edition` is old enough to be automatically deleted."""
return _edition_timestamp(edition) < datetime.now(timezone.utc) - timedelta(
days=MIN_EDITION_AGE_DAYS
)


def _edition_dataset(edition):
"""Return the dataset/version part of `edition`."""
return "/".join(edition["Id"].split("/")[:2])


def _prunable_editions(editions):
"""Return the editions in `editions` that are prunable.

"Prunable" being more than 90 days old, and past the third generation of
editions.
"""
return filter(
_old_enough_to_delete,
sorted(editions, key=_edition_timestamp, reverse=True)[MIN_EDITIONS_TO_KEEP:],
)


def obsolete_editions():
"""Return a generator of obsolete editions."""
edition_repository = EditionRepository()
editions = edition_repository.get_editions()
editions_by_dataset = groupby(
sorted(editions, key=_edition_dataset), _edition_dataset
)

for dataset, dataset_editions in editions_by_dataset:
for edition in _prunable_editions(dataset_editions):
yield edition
28 changes: 28 additions & 0 deletions jobs/delete_obsolete_editions/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import logging
import os
from operator import itemgetter

from aws_xray_sdk.core import patch_all, xray_recorder
from okdata.aws.logging import log_add, logging_wrapper

from jobs.delete_obsolete_editions.editions import obsolete_editions
from metadata.edition.repository import EditionRepository

logger = logging.getLogger()
logger.setLevel(os.environ.get("LOG_LEVEL", logging.INFO))

patch_all()


@logging_wrapper
@xray_recorder.capture("handler")
def handler(event, context):
edition_repository = EditionRepository()
num_deleted_editions = 0

for edition_id in map(itemgetter("Id"), obsolete_editions()):
logger.info(f"Deleting edition {edition_id}")
edition_repository.delete_item(edition_id, cascade=True)
num_deleted_editions += 1

log_add(num_deleted_editions=num_deleted_editions)
9 changes: 7 additions & 2 deletions metadata/distribution/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,23 @@ def _delete_data(self, dataset_id, version, edition, distribution):

bucket = getenv("DATA_BUCKET_NAME")
dataset = DatasetRepository().get_dataset(dataset_id)

if not dataset:
logger.warning(f"Unknown dataset '{dataset_id}'; skipping data deletion")
return

access_rights = dataset.get("accessRights")
confidentiality = CONFIDENTIALITY_MAP.get(access_rights)
filenames = distribution_.get("filenames")

if not confidentiality:
logger.info(
logger.warning(
f"Unknown confidentiality for dataset '{dataset_id}'; skipping data deletion"
)
return

if not filenames:
logger.info(
logger.warning(
f"No filenames listed for distribution '{distribution_id}'; skipping data deletion"
)
return
Expand Down
15 changes: 12 additions & 3 deletions metadata/edition/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,18 @@ def get_edition(self, dataset_id, version, edition, consistent_read=False):
edition_id = f"{dataset_id}/{version}/{edition}"
return self.get_item(edition_id, consistent_read)

def get_editions(self, dataset_id, version, exclude_latest=True):
version_id = f"{dataset_id}/{version}"
editions = self.get_items(version_id)
def get_editions(self, dataset_id=None, version=None, exclude_latest=True):
"""Return editions belonging to `dataset_id`/`version`.

If `dataset_id` and `version` aren't both provided, return every
edition instead.

When `exclude_latest` is true, the `latest` edition is removed from the
results.
"""
editions = self.get_items(
f"{dataset_id}/{version}" if dataset_id and version else None
)

if exclude_latest:
# Remove 'latest' edition
Expand Down
11 changes: 11 additions & 0 deletions serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ functions:
delete-distribution: ${file(serverless/functions/delete_distribution.yaml)}
get_distributions: ${file(serverless/functions/get_distributions.yaml)}
get_distribution: ${file(serverless/functions/get_distribution.yaml)}
#
# Jobs
#
update-last-read:
image:
name: okdata-metadata-api
Expand All @@ -133,3 +136,11 @@ functions:
events:
- schedule: cron(30 * * * ? *)
timeout: 300
delete-obsolete-editions:
image:
name: okdata-metadata-api
command:
- jobs.delete_obsolete_editions.handler.handler
events:
- schedule: cron(0 3 * * ? *)
timeout: 300
Empty file.
32 changes: 32 additions & 0 deletions tests/jobs/delete_obsolete_editions/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import pytest


@pytest.fixture()
def editions():
return [
{
"edition": "2023-01-01T08:00:00+00:00",
"Id": "foo/1/20230101T080000",
"Type": "Edition",
},
{
"edition": "2023-01-05T08:00:00+00:00",
"Id": "foo/1/20230105T080000",
"Type": "Edition",
},
{
"edition": "2023-01-10T08:00:00+00:00",
"Id": "foo/1/20230110T080000",
"Type": "Edition",
},
{
"edition": "2023-01-15T08:00:00+00:00",
"Id": "foo/1/20230115T080000",
"Type": "Edition",
},
{
"edition": "2023-01-20T08:00:00+00:00",
"Id": "foo/1/20230120T080000",
"Type": "Edition",
},
]
48 changes: 48 additions & 0 deletions tests/jobs/delete_obsolete_editions/test_editions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from unittest.mock import patch
from datetime import datetime, timezone

from freezegun import freeze_time

from jobs.delete_obsolete_editions.editions import (
_edition_dataset,
_edition_timestamp,
_old_enough_to_delete,
_prunable_editions,
obsolete_editions,
)


def test_edition_timestamp(editions):
assert _edition_timestamp(editions[0]) == datetime(
2023, 1, 1, 8, 0, 0, 0, timezone.utc
)


@freeze_time("2023-04-07")
def test_old_enough_to_delete(editions):
assert list(map(_old_enough_to_delete, editions)) == [True] * 2 + [False] * 3


def test_edition_dataset(editions):
assert _edition_dataset(editions[0]) == "foo/1"


@freeze_time("2023-04-03")
def test_prunable_editions(editions):
assert len(list(_prunable_editions(editions))) == 1


@freeze_time("2023-04-03")
@patch("jobs.delete_obsolete_editions.editions.EditionRepository")
def test_obsolete_editions_age(edition_repository, editions):
edition_repository.return_value.get_editions.return_value = editions

assert len(list(obsolete_editions())) == 1


@freeze_time("2024-01-01")
@patch("jobs.delete_obsolete_editions.editions.EditionRepository")
def test_obsolete_editions_number(edition_repository, editions):
edition_repository.return_value.get_editions.return_value = editions

assert len(list(obsolete_editions())) == 2
Loading