From d169aa79444f858decbfcb37b5110f442a19e190 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simen=20Heggest=C3=B8yl?= Date: Tue, 16 Jul 2024 08:40:35 +0200 Subject: [PATCH] Add job for deleting obsolete editions Add a batch job that deletes editions considered "obsolete". "Obsolete" editions are editions that fulfill both of the following criteria: 1. They are more than 90 days old. 2. They are past the third generation of editions (meaning that we always keep at least three editions of a dataset version). When an edition is deleted, any distributions (and their files) belonging to it are deleted as well. --- jobs/delete_obsolete_editions/README.md | 12 +++++ jobs/delete_obsolete_editions/__init__.py | 0 jobs/delete_obsolete_editions/editions.py | 49 +++++++++++++++++++ jobs/delete_obsolete_editions/handler.py | 28 +++++++++++ metadata/distribution/repository.py | 9 +++- metadata/edition/repository.py | 15 ++++-- serverless.yml | 11 +++++ .../jobs/delete_obsolete_editions/__init__.py | 0 .../jobs/delete_obsolete_editions/conftest.py | 32 ++++++++++++ .../delete_obsolete_editions/test_editions.py | 48 ++++++++++++++++++ 10 files changed, 199 insertions(+), 5 deletions(-) create mode 100644 jobs/delete_obsolete_editions/README.md create mode 100644 jobs/delete_obsolete_editions/__init__.py create mode 100644 jobs/delete_obsolete_editions/editions.py create mode 100644 jobs/delete_obsolete_editions/handler.py create mode 100644 tests/jobs/delete_obsolete_editions/__init__.py create mode 100644 tests/jobs/delete_obsolete_editions/conftest.py create mode 100644 tests/jobs/delete_obsolete_editions/test_editions.py diff --git a/jobs/delete_obsolete_editions/README.md b/jobs/delete_obsolete_editions/README.md new file mode 100644 index 0000000..4b7f891 --- /dev/null +++ b/jobs/delete_obsolete_editions/README.md @@ -0,0 +1,12 @@ +# Delete obsolete editions + +This batch job deletes editions considered "obsolete". 
+ +"Obsolete" editions are editions that fulfill both of the following criteria: + +1. They are more than 90 days old. +2. They are past the third generation of editions (meaning that we always keep at +least three editions of a dataset version). + +When an edition is deleted, any distributions (and their files) belonging to it +are deleted as well. diff --git a/jobs/delete_obsolete_editions/__init__.py b/jobs/delete_obsolete_editions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jobs/delete_obsolete_editions/editions.py b/jobs/delete_obsolete_editions/editions.py new file mode 100644 index 0000000..6b1f124 --- /dev/null +++ b/jobs/delete_obsolete_editions/editions.py @@ -0,0 +1,49 @@ +from datetime import datetime, timedelta, timezone +from itertools import groupby + +from metadata.edition.repository import EditionRepository + +MIN_EDITION_AGE_DAYS = 90 +MIN_EDITIONS_TO_KEEP = 3 + + +def _edition_timestamp(edition): + """Return the timestamp part of `edition`.""" + return datetime.fromisoformat(edition["edition"]).astimezone(timezone.utc) + + +def _old_enough_to_delete(edition): + """Return true if `edition` is old enough to be automatically deleted.""" + return _edition_timestamp(edition) < datetime.now(timezone.utc) - timedelta( + days=MIN_EDITION_AGE_DAYS + ) + + +def _edition_dataset(edition): + """Return the dataset/version part of `edition`.""" + return "/".join(edition["Id"].split("/")[:2]) + + +def _prunable_editions(editions): + """Return the editions in `editions` that are prunable. + + "Prunable" being more than 90 days old, and past the third generation of + editions. 
+ """ + return filter( + _old_enough_to_delete, + sorted(editions, key=_edition_timestamp, reverse=True)[MIN_EDITIONS_TO_KEEP:], + ) + + +def obsolete_editions(): + """Return a generator of obsolete editions.""" + edition_repository = EditionRepository() + editions = edition_repository.get_editions() + editions_by_dataset = groupby( + sorted(editions, key=_edition_dataset), _edition_dataset + ) + + for dataset, dataset_editions in editions_by_dataset: + for edition in _prunable_editions(dataset_editions): + yield edition diff --git a/jobs/delete_obsolete_editions/handler.py b/jobs/delete_obsolete_editions/handler.py new file mode 100644 index 0000000..74006ee --- /dev/null +++ b/jobs/delete_obsolete_editions/handler.py @@ -0,0 +1,28 @@ +import logging +import os +from operator import itemgetter + +from aws_xray_sdk.core import patch_all, xray_recorder +from okdata.aws.logging import log_add, logging_wrapper + +from jobs.delete_obsolete_editions.editions import obsolete_editions +from metadata.edition.repository import EditionRepository + +logger = logging.getLogger() +logger.setLevel(os.environ.get("LOG_LEVEL", logging.INFO)) + +patch_all() + + +@logging_wrapper +@xray_recorder.capture("handler") +def handler(event, context): + edition_repository = EditionRepository() + num_deleted_editions = 0 + + for edition_id in map(itemgetter("Id"), obsolete_editions()): + logger.info(f"Deleting edition {edition_id}") + edition_repository.delete_item(edition_id, cascade=True) + num_deleted_editions += 1 + + log_add(num_deleted_editions=num_deleted_editions) diff --git a/metadata/distribution/repository.py b/metadata/distribution/repository.py index b64c9de..b056986 100644 --- a/metadata/distribution/repository.py +++ b/metadata/distribution/repository.py @@ -107,18 +107,23 @@ def _delete_data(self, dataset_id, version, edition, distribution): bucket = getenv("DATA_BUCKET_NAME") dataset = DatasetRepository().get_dataset(dataset_id) + + if not dataset: + logger.warning(f"Unknown 
dataset '{dataset_id}'; skipping data deletion") + return + access_rights = dataset.get("accessRights") confidentiality = CONFIDENTIALITY_MAP.get(access_rights) filenames = distribution_.get("filenames") if not confidentiality: - logger.info( + logger.warning( f"Unknown confidentiality for dataset '{dataset_id}'; skipping data deletion" ) return if not filenames: - logger.info( + logger.warning( f"No filenames listed for distribution '{distribution_id}'; skipping data deletion" ) return diff --git a/metadata/edition/repository.py b/metadata/edition/repository.py index 3ad83f0..9a40272 100644 --- a/metadata/edition/repository.py +++ b/metadata/edition/repository.py @@ -29,9 +29,18 @@ def get_edition(self, dataset_id, version, edition, consistent_read=False): edition_id = f"{dataset_id}/{version}/{edition}" return self.get_item(edition_id, consistent_read) - def get_editions(self, dataset_id, version, exclude_latest=True): - version_id = f"{dataset_id}/{version}" - editions = self.get_items(version_id) + def get_editions(self, dataset_id=None, version=None, exclude_latest=True): + """Return editions belonging to `dataset_id`/`version`. + + If `dataset_id` and `version` aren't both provided, return every + edition instead. + + When `exclude_latest` is true, the `latest` edition is removed from the + results. + """ + editions = self.get_items( + f"{dataset_id}/{version}" if dataset_id and version else None + ) if exclude_latest: # Remove 'latest' edition diff --git a/serverless.yml b/serverless.yml index 28ab2e5..fb5cd70 100644 --- a/serverless.yml +++ b/serverless.yml @@ -125,6 +125,9 @@ functions: delete-distribution: ${file(serverless/functions/delete_distribution.yaml)} get_distributions: ${file(serverless/functions/get_distributions.yaml)} get_distribution: ${file(serverless/functions/get_distribution.yaml)} + # + # Jobs + # update-last-read: image: name: okdata-metadata-api @@ -133,3 +136,11 @@ functions: events: - schedule: cron(30 * * * ? 
*) timeout: 300 + delete-obsolete-editions: + image: + name: okdata-metadata-api + command: + - jobs.delete_obsolete_editions.handler.handler + events: + - schedule: cron(0 3 * * ? *) + timeout: 300 diff --git a/tests/jobs/delete_obsolete_editions/__init__.py b/tests/jobs/delete_obsolete_editions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/jobs/delete_obsolete_editions/conftest.py b/tests/jobs/delete_obsolete_editions/conftest.py new file mode 100644 index 0000000..7c87014 --- /dev/null +++ b/tests/jobs/delete_obsolete_editions/conftest.py @@ -0,0 +1,32 @@ +import pytest + + +@pytest.fixture() +def editions(): + return [ + { + "edition": "2023-01-01T08:00:00+00:00", + "Id": "foo/1/20230101T080000", + "Type": "Edition", + }, + { + "edition": "2023-01-05T08:00:00+00:00", + "Id": "foo/1/20230105T080000", + "Type": "Edition", + }, + { + "edition": "2023-01-10T08:00:00+00:00", + "Id": "foo/1/20230110T080000", + "Type": "Edition", + }, + { + "edition": "2023-01-15T08:00:00+00:00", + "Id": "foo/1/20230115T080000", + "Type": "Edition", + }, + { + "edition": "2023-01-20T08:00:00+00:00", + "Id": "foo/1/20230120T080000", + "Type": "Edition", + }, + ] diff --git a/tests/jobs/delete_obsolete_editions/test_editions.py b/tests/jobs/delete_obsolete_editions/test_editions.py new file mode 100644 index 0000000..3b37700 --- /dev/null +++ b/tests/jobs/delete_obsolete_editions/test_editions.py @@ -0,0 +1,48 @@ +from unittest.mock import patch +from datetime import datetime, timezone + +from freezegun import freeze_time + +from jobs.delete_obsolete_editions.editions import ( + _edition_dataset, + _edition_timestamp, + _old_enough_to_delete, + _prunable_editions, + obsolete_editions, +) + + +def test_edition_timestamp(editions): + assert _edition_timestamp(editions[0]) == datetime( + 2023, 1, 1, 8, 0, 0, 0, timezone.utc + ) + + +@freeze_time("2023-04-07") +def test_old_enough_to_delete(editions): + assert list(map(_old_enough_to_delete, editions)) == [True] 
* 2 + [False] * 3 + + +def test_edition_dataset(editions): + assert _edition_dataset(editions[0]) == "foo/1" + + +@freeze_time("2023-04-03") +def test_prunable_editions(editions): + assert len(list(_prunable_editions(editions))) == 1 + + +@freeze_time("2023-04-03") +@patch("jobs.delete_obsolete_editions.editions.EditionRepository") +def test_obsolete_editions_age(edition_repository, editions): + edition_repository.return_value.get_editions.return_value = editions + + assert len(list(obsolete_editions())) == 1 + + +@freeze_time("2024-01-01") +@patch("jobs.delete_obsolete_editions.editions.EditionRepository") +def test_obsolete_editions_number(edition_repository, editions): + edition_repository.return_value.get_editions.return_value = editions + + assert len(list(obsolete_editions())) == 2