diff --git a/.gitignore b/.gitignore
index 68bc17f..2dc53ca 100644
--- a/.gitignore
+++ b/.gitignore
@@ -157,4 +157,4 @@ cython_debug/
 # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
-#.idea/
+.idea/
diff --git a/.harbor_cleanup_policy.example.yaml b/.harbor_cleanup_policy.example.yaml
new file mode 100644
index 0000000..5645a30
--- /dev/null
+++ b/.harbor_cleanup_policy.example.yaml
@@ -0,0 +1,16 @@
+policies:
+  - name: Policies for cleanup docker images
+    rules:
+      - rule: SaveLastNProdTags
+        regexp: '^v\d+\.\d+\.\d+$'
+        limit: 2
+      - rule: SaveLastNStagingTags
+        regexp: '^v\d+\.\d+\.\d+.+'
+        limit: 2
+      - rule: DeleteOlderThan
+        days: 3
+      - rule: SaveLastNFeatureTags
+        limit: 3
+      - rule: IgnoreTags
+        tags:
+          - "feature-branch"
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..7d6a7bc
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,5 @@
+PyYAML==6.0
+docker-registry-client==0.5.2
+requests==2.29.0
+jsonpath-rw==1.4.0
+pytest==7.3.1
\ No newline at end of file
diff --git a/src/config.py b/src/config.py
new file mode 100644
index 0000000..cff72e1
--- /dev/null
+++ b/src/config.py
@@ -0,0 +1,91 @@
+import yaml
+import os
+
+# Default policies
+DEFAULT_POLICIES = [{
+    'name': 'Default Policy',
+    'rules': [
+        {'rule': 'DeleteOlderThan', 'days': 7},
+        {'rule': 'SaveLastNProdTags', 'regexp': r'^v\d+\.\d+\.\d+$', 'limit': 5},
+        {'rule': 'SaveLastNStagingTags', 'regexp': r'^v\d+\.\d+\.\d+.+', 'limit': 5},
+        {'rule': 'SaveLastNFeatureTags', 'limit': 10},
+        {'rule': 'IgnoreTags', 'tags': []}
+    ]
+}]
+
+
+def load_cleanup_policy():
+    """
+    Load Harbor cleanup policy from .harbor_cleanup_policy.yaml file or return default policies
+    """
+    # Check if .harbor_cleanup_policy.yaml file exists
+    if os.path.exists(".harbor_cleanup_policy.yaml"):
+        with open(".harbor_cleanup_policy.yaml", "r") as f:
+            cleanup_policy = yaml.safe_load(f)
+            # Check if policies key exists in cleanup_policy dict
+            if "policies" in cleanup_policy:
+                return cleanup_policy["policies"]
+    # Return default policies if file doesn't exist or policies key is missing
+    return DEFAULT_POLICIES
+
+
+def validate_rule(rule):
+    """
+    Validate that rule has the required fields
+    """
+    if "rule" not in rule:
+        raise ValueError("Missing 'rule' field in rule")
+    if rule["rule"] not in ["SaveLastNTags", "DeleteOlderThan", "SaveLastNProdTags", "SaveLastNStagingTags",
+                            "SaveLastNFeatureTags", "IgnoreTags"]:
+        raise ValueError(f"Unknown rule '{rule['rule']}'")
+    if rule["rule"] == "DeleteOlderThan" and "days" not in rule:
+        raise ValueError("Missing 'days' field in rule")
+    if rule["rule"] == "SaveLastNProdTags" and "regexp" not in rule:
+        raise ValueError("Missing 'regexp' field in rule")
+    if rule["rule"] == "SaveLastNProdTags" and "limit" not in rule:
+        rule["limit"] = next(
+            (d['limit'] for d in DEFAULT_POLICIES[0]["rules"] if 'SaveLastNProdTags' in d.get('rule', '')))
+    if rule["rule"] == "SaveLastNProdTags" and rule["limit"] < 1:
+        raise ValueError("'limit' field in rule must be at least 1")
+    if rule["rule"] == "SaveLastNStagingTags" and "regexp" not in rule:
+        raise ValueError("Missing 'regexp' field in rule")
+    if rule["rule"] == "SaveLastNStagingTags" and "limit" not in rule:
+        rule["limit"] = next(
+            (d['limit'] for d in DEFAULT_POLICIES[0]["rules"] if 'SaveLastNStagingTags' in d.get('rule', '')))
+    if rule["rule"] == "SaveLastNStagingTags" and rule["limit"] < 1:
+        raise ValueError("'limit' field in rule must be at least 1")
+    if rule["rule"] == "SaveLastNFeatureTags" and "limit" not in rule:
+        raise ValueError("Missing 'limit' field in rule")
+    if rule["rule"] == "IgnoreTags" and "tags" not in rule:
+        raise ValueError("Missing 'tags' field in rule")
+    if "limit" in rule and not isinstance(rule["limit"], int):
+        raise ValueError("'limit' field in rule must be an integer")
+    if "days" in rule and not isinstance(rule["days"], int):
+        raise ValueError("'days' field in rule must be an integer")
+
+
+def validate_policy(policy):
+    """
+    Validate that policy has the required fields
+    """
+    if "name" not in policy:
+        raise ValueError("Missing 'name' field in policy")
+    if "rules" not in policy:
+        raise ValueError("Missing 'rules' field in policy")
+    for rule in policy["rules"]:
+        validate_rule(rule)
+
+
+def merge_policies(policies):
+    # Merge policies with default policies
+    for policy in policies:
+        for default_policy in DEFAULT_POLICIES[0]['rules']:
+            if default_policy['rule'] not in [p['rule'] for p in policy['rules']]:
+                policy['rules'].append(default_policy)
+
+    return policies[0]
+
+
+def get_field_from_rule(policy, rule, field):
+    return next(
+        (d[f'{field}'] for d in policy["rules"] if rule in d.get('rule', '')))
diff --git a/src/harbor_client.py b/src/harbor_client.py
new file mode 100644
index 0000000..801e6bb
--- /dev/null
+++ b/src/harbor_client.py
@@ -0,0 +1,79 @@
+import logging
+
+import requests
+
+logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s')
+logger = logging.getLogger('logger')
+
+
+class HarborClient:
+    HEADERS = {'Content-Type': 'application/json', 'accept': 'application/json'}
+
+    def __init__(self, harbor_url, project_name, username, password, ssl_verify=False):
+        self._harbor_url = harbor_url
+        self._project_name = project_name
+        self._username = username
+        self._password = password
+        self._verify = ssl_verify
+
+    def _get_data_from_response(self, resp):
+        if resp.status_code == 200:
+            return resp.json()
+        else:
+            logger.error(f"ERROR: Request failed with status {resp.status_code}")
+            exit(1)
+
+    def _get_response(self, url):
+        responses = []
+        first_page = requests.get(url, headers=HarborClient.HEADERS, auth=(self._username, self._password),
+                                  verify=self._verify)
+        responses += self._get_data_from_response(first_page)
+        next_page = first_page
+        while next_page.links.get('next', None) is not None:
+            try:
+                next_page_url = next_page.links['next']['url']
+                next_page = requests.get(f'{self._harbor_url}/{next_page_url}', headers=HarborClient.HEADERS,
+                                         auth=(self._username, self._password),
+                                         verify=self._verify)
+                responses += self._get_data_from_response(next_page)
+            except KeyError:
+                logger.info("No data")
+                exit(1)
+
+        return responses
+
+    def _delete_image(self, url):
+        response = requests.delete(url, headers=HarborClient.HEADERS,
+                                   auth=(self._username, self._password), verify=self._verify)
+        if response.status_code != 200:
+            logger.error(f"ERROR: Failed to delete image. Status {response.status_code}")
+            exit(1)
+        else:
+            logger.info("Image deleted successfully")
+
+    def _get_images(self, artifacts, repo_name):
+        list_images = []
+        for artifact in artifacts:
+            if artifact["tags"]:
+                for tag in artifact["tags"]:
+                    list_images.append({"name": repo_name, "tag": tag['name'], "push_time": tag["push_time"],
+                                        "pull_time": tag["pull_time"]})
+
+        return list_images
+
+    def get_repositories(self):
+        url = f'{self._harbor_url}/api/v2.0/projects/{self._project_name}/repositories'
+        return self._get_response(url)
+
+    def get_images(self, repository_name):
+        rep_name_without_slash = repository_name.replace('/', '%2F')
+        url = f'{self._harbor_url}/api/v2.0/projects/{self._project_name}/repositories/' \
+              f'{rep_name_without_slash}/artifacts?with_tag=true'
+        return self._get_images(self._get_response(url), f"{self._project_name}/{repository_name}")
+
+    def delete_image(self, image):
+        image_name, tag = image.split(':')
+        rep_name_without_slash = image_name.replace(f'{self._project_name}/', '').replace('/', '%2F')
+        url = f'{self._harbor_url}/api/v2.0/projects/{self._project_name}/repositories/{rep_name_without_slash}' \
+              f'/artifacts/{tag}'
+        self._delete_image(url)
diff --git a/src/main.py b/src/main.py
new file mode 100644
index 0000000..b30ca27
--- /dev/null
+++ b/src/main.py
@@ -0,0 +1,219 @@
+import argparse
+import logging
+import os
+import re
+from datetime import datetime, timedelta
+from pprint import pformat
+
+import requests
+import yaml
+from requests.packages.urllib3.exceptions import InsecureRequestWarning
+
+from config import load_cleanup_policy, validate_policy, merge_policies, get_field_from_rule
+from harbor_client import HarborClient
+from utils import regexp_match, extract_semver
+
+requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
+
+logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s')
+logger = logging.getLogger('logger')
+
+
+def parse_args():
+    """Parse command-line arguments."""
+    parser = argparse.ArgumentParser(description='Delete Docker images from a Harbor registry.')
+    parser.add_argument('--harbor-url', required=True,
+                        help='URL of the Harbor registry, including protocol (e.g. https://harbor.example.com)')
+    parser.add_argument('--username', required=True, help='Username for the Harbor registry')
+    parser.add_argument('--password', required=True, help='Password for the Harbor registry')
+    parser.add_argument('--project-name', required=True, help='Name of the Harbor project')
+    parser.add_argument('--repository-name', default=None, help='Name of the Harbor repository')
+    parser.add_argument('--domain-name', required=True,
+                        help='Domain name to match against image names')
+    parser.add_argument('--ignore-tags', nargs='*', default=[], help='List of image tags to exclude from deletion')
+    parser.add_argument('--dry-run', action='store_true', help='Do a dry run (don\'t actually delete any images)')
+    return parser.parse_args()
+
+
+def get_kustomization_files():
+    """Get all kustomization files in the current directory and its subdirectories."""
+    kustomization_files = []
+    regex = re.compile('(kustomization.*)')
+    for root, dirs, files in os.walk('.'):
+        for file in files:
+            if regex.match(file):
+                kustomization_files.append(os.path.join(root, file))
+    return kustomization_files
+
+
+def get_harbor_images(kustomization_yaml, domain_name):
+    """Extract harbor images from kustomization yaml file."""
+    images = kustomization_yaml.get('images', [])
+    harbor_images = []
+    for image in images:
+        if 'name' not in image and 'newName' not in image:
+            logger.error(f"ERROR: Neither 'name' nor 'newName' found in images section. {images}")
+            exit(1)
+        if 'newName' in image:
+            if image['newName'].split('/')[0] == domain_name:
+                harbor_images.append({"name": f"{image['newName']}", "tag": f"{image['newTag']}"})
+        elif image['name'].split('/')[0] == domain_name:
+            harbor_images.append({"name": f"{image['name']}", "tag": f"{image['newTag']}"})
+
+    return harbor_images
+
+
+def get_tags_by_exclusion(tags: list, exclusion: list) -> list:
+    """Return tags that are not in the exclusion list."""
+    return [tag for tag in tags if tag not in exclusion]
+
+
+def sort_tag(tag) -> list:
+    """Sort tags by semantic versioning."""
+    semver = extract_semver(tag)
+    return list(map(int, semver.split('.')))
+
+
+def get_latest_n_tags(list_tags, limit):
+    """Return the latest n tags."""
+    if limit == 0:
+        return []
+    if len(list_tags) > limit:
+        return list_tags[-limit:]
+    return list_tags
+
+
+def get_latest_tags_by_regexp(list_tags: list, exp: str, limit: int) -> list:
+    """Return the latest tags that match the given regular expression."""
+    semver_images = [tag for tag in list_tags if regexp_match(exp, tag)]
+    sorted_tag_list = sorted(semver_images, key=sort_tag)
+    return get_latest_n_tags(sorted_tag_list, limit)
+
+
+def get_feature_tags_younger_than_n_days(list_harbor_images, list_tags_to_remove, prod_regexp, staging_regexp,
+                                         delete_docker_images_older_than):
+    """Return feature tags younger than n days."""
+    now = datetime.utcnow()
+    last_n_days = now - timedelta(days=delete_docker_images_older_than)
+    sorted_lst = sorted(list_harbor_images, key=lambda x: x['push_time'])
+    images_younger_than_n_days = [x for x in sorted_lst if datetime.fromisoformat(
+        x['push_time'].replace('Z', '')) >= last_n_days and not re.search(
+        prod_regexp, str(x['tag'])) and not re.search(staging_regexp, str(x['tag']))]
+    return [image["tag"] for image in images_younger_than_n_days if image["tag"] in list_tags_to_remove]
+
+
+def delete_images(harbor_client, list_images_to_delete, dry_run=None):
+    """Delete specified images from the Harbor registry."""
+    logger.info("#" * 10 + " Docker images to remove " + "#" * 10)
+    for image in list_images_to_delete:
+        if dry_run:
+            logger.info(f"DRY RUN: Deleting image {image}")
+        else:
+            logger.info(f"Deleting image {image}")
+            harbor_client.delete_image(image)
+
+
+def get_tags_to_delete(repository, list_harbor_images, kustomization_yaml_images, args, policy):
+    """Process images in a repository."""
+    list_harbor_tags = [image["tag"] for image in list_harbor_images]
+    list_kustomization_yaml_tags = [image["tag"] for image in kustomization_yaml_images if
+                                    image["name"] == f"{args.domain_name}/{repository['name']}"]
+    logger.info("#" * 45)
+    logger.info(f"List of all tags: {list_harbor_tags}")
+    logger.info(f"List of tags to save from kustomization yaml files: {list_kustomization_yaml_tags}")
+
+    prod_tags = get_latest_tags_by_regexp(list_harbor_tags, get_field_from_rule(policy, 'SaveLastNProdTags', 'regexp'),
+                                          get_field_from_rule(policy, 'SaveLastNProdTags', 'limit'))
+    logger.info(f"List of prod tags to save: {prod_tags}")
+    staging_tags = get_latest_tags_by_regexp(list_harbor_tags,
+                                             get_field_from_rule(policy, 'SaveLastNStagingTags', 'regexp'),
+                                             get_field_from_rule(policy, 'SaveLastNStagingTags', 'limit'))
+    logger.info(f"List of staging tags to save: {staging_tags}")
+    tags_to_remove = set(list_harbor_tags) - set(prod_tags) - set(staging_tags) - set(list_kustomization_yaml_tags)
+    ignore_tags = get_field_from_rule(policy, 'IgnoreTags', 'tags')
+    exclude_tags = ['develop', 'latest', 'master', 'main'] + ignore_tags
+    logger.info(f"List of exclude tags: {exclude_tags}")
+    list_tags_to_remove = get_tags_by_exclusion(list(tags_to_remove), exclude_tags)
+
+    list_feature_tags_younger_than_n_days = get_feature_tags_younger_than_n_days(list_harbor_images,
+                                                                                 list_tags_to_remove,
+                                                                                 get_field_from_rule(
+                                                                                     policy,
+                                                                                     'SaveLastNProdTags',
+                                                                                     'regexp'),
+                                                                                 get_field_from_rule(
+                                                                                     policy,
+                                                                                     'SaveLastNStagingTags',
+                                                                                     'regexp'),
+                                                                                 get_field_from_rule(
+                                                                                     policy,
+                                                                                     'DeleteOlderThan',
+                                                                                     'days'))
+    logger.info(
+        f"List of tags younger than {get_field_from_rule(policy, 'DeleteOlderThan', 'days')} "
+        f"days to save: {list_feature_tags_younger_than_n_days}")
+
+    list_tags_to_remove = list(set(list_tags_to_remove) - set(list_feature_tags_younger_than_n_days))
+
+    list_n_last_tags = get_latest_n_tags(list_tags_to_remove,
+                                         get_field_from_rule(policy, 'SaveLastNFeatureTags', 'limit'))
+    logger.info(
+        f"List of {get_field_from_rule(policy, 'SaveLastNFeatureTags', 'limit')} last tags to save: {list_n_last_tags}")
+
+    list_tags_to_remove = list(set(list_tags_to_remove) - set(list_n_last_tags))
+    logger.info(f"List of tags to remove: {list_tags_to_remove}\n")
+    return list_tags_to_remove
+
+
+if __name__ == '__main__':
+    args = parse_args()
+
+    cleanup_policy = load_cleanup_policy()
+    for policy in cleanup_policy:
+        try:
+            validate_policy(policy)
+        except ValueError as e:
+            logger.error(f"Error in policy '{policy['name']}': {str(e)}")
+            exit(1)
+
+    policy = merge_policies(cleanup_policy)
+    logger.info(f"Rules:\n{pformat(policy)}\n")
+
+    kustomization_files = get_kustomization_files()
+    kustomization_yaml_images = []
+    for kustomization_file in kustomization_files:
+        with open(kustomization_file, 'r') as f:
+            kustomization_yaml = yaml.safe_load(f)
+            kustomization_yaml_images += get_harbor_images(kustomization_yaml, args.domain_name)
+
+    logger.info(
+        f"List of images from kustomization.yaml files:\n" + "\n".join(map(str, kustomization_yaml_images)) + "\n")
+
+    harbor_client = HarborClient(harbor_url=args.harbor_url, project_name=args.project_name, username=args.username,
+                                 password=args.password)
+
+    repositories = harbor_client.get_repositories()
+    repositories_names = [repository_["name"] for repository_ in repositories]
+
+    list_images_to_delete = []
+    for repository in repositories:
+        if args.repository_name and f'{args.project_name}/{args.repository_name}' not in repositories_names:
+            logger.error(
+                f"The repository_name - '{args.repository_name}' not found. "
+                f"List of repositories names - {repositories_names}")
+            exit(1)
+
+        repository_name = repository["name"].replace(f'{args.project_name}/', '')
+        if args.repository_name and args.repository_name != repository_name:
+            continue
+        list_harbor_images = harbor_client.get_images(repository_name)
+
+        logger.info(
+            f"List of images from harbor for repo name "
+            f"{repository['name']}:\n" + "\n".join(map(str, list_harbor_images)) + "\n")
+
+        list_tags_to_delete = get_tags_to_delete(repository, list_harbor_images, kustomization_yaml_images, args,
+                                                 policy)
+        list_images_to_delete += [f"{repository['name']}:{tag}" for tag in list_tags_to_delete]
+
+    delete_images(harbor_client, list_images_to_delete, args.dry_run)
diff --git a/src/tests/test_config.py b/src/tests/test_config.py
new file mode 100644
index 0000000..bfe0190
--- /dev/null
+++ b/src/tests/test_config.py
@@ -0,0 +1,123 @@
+import pytest
+
+from config import *
+
+
+def test_create_file_with_policy():
+    # Create .harbor_cleanup_policy.yaml file
+    with open(".harbor_cleanup_policy.yaml", "w") as f:
+        f.write(r"""policies:
+- name: Test Policy
+  rules:
+    - rule: DeleteOlderThan
+      days: 14
+    - rule: SaveLastNProdTags
+      regexp: '^v\d+\.\d+\.\d+$'
+      limit: 10
+    - rule: SaveLastNStagingTags
+      regexp: '^v\d+\.\d+\.\d+.+'
+      limit: 10
+    - rule: SaveLastNFeatureTags
+      limit: 10
+""")
+
+    assert load_cleanup_policy() == [{
+        'name': 'Test Policy',
+        'rules': [
+            {'rule': 'DeleteOlderThan', 'days': 14},
+            {'rule': 'SaveLastNProdTags', 'regexp': '^v\\d+\\.\\d+\\.\\d+$', 'limit': 10},
+            {'rule': 'SaveLastNStagingTags', 'regexp': '^v\\d+\\.\\d+\\.\\d+.+', 'limit': 10},
+            {'rule': 'SaveLastNFeatureTags', 'limit': 10}
+        ]
+    }]
+
+    # Remove .harbor_cleanup_policy.yaml file
+    os.remove(".harbor_cleanup_policy.yaml")
+
+
+def test_default_policy_if_file_doesnt_exist():
+    # Test default policy if file doesn't exist
+    assert load_cleanup_policy() == DEFAULT_POLICIES
+
+
+def test_validate_rule_save_last_n_prod_tags():
+    # Test valid rule
+    rule = {'rule': 'SaveLastNProdTags', 'regexp': '^v\\d+\\.\\d+\\.\\d+$', 'limit': 5}
+    assert validate_rule(rule) is None
+
+
+def test_invalid_rule_with_missing_rule_field():
+    # Test invalid rule - missing rule field
+    rule = {'regexp': '^v\\d+\\.\\d+\\.\\d+$', 'limit': 5}
+    with pytest.raises(ValueError):
+        validate_rule(rule)
+
+
+def test_invalid_rule_wrong_rule_field():
+    # Test invalid rule - wrong rule field
+    rule = {'rule': 'WrongRule', 'regexp': '^v\\d+\\.\\d+\\.\\d+$', 'limit': 5}
+    with pytest.raises(ValueError):
+        validate_rule(rule)
+
+
+def test_invalid_rule_with_missing_days_field():
+    # Test invalid rule - missing days field in DeleteOlderThan rule
+    rule = {'rule': 'DeleteOlderThan'}
+    with pytest.raises(ValueError):
+        validate_rule(rule)
+
+
+def test_invalid_rule_with_missing_regexp_field():
+    # Test invalid rule - missing regexp field in SaveLastNProdTags rule
+    rule = {'rule': 'SaveLastNProdTags', 'limit': 5}
+    with pytest.raises(ValueError):
+        validate_rule(rule)
+
+
+def test_invalid_rule_with_missing_limit_field():
+    # Test invalid rule - missing limit field in SaveLastNProdTags rule
+    rule = {'rule': 'SaveLastNProdTags', 'regexp': '^v\\d+\\.\\d+\\.\\d+$'}
+    assert validate_rule(rule) is None
+
+
+def test_validate_policy_missing_name():
+    with pytest.raises(ValueError, match="Missing 'name' field in policy"):
+        validate_policy({"rules": []})
+
+
+def test_validate_policy_missing_rules():
+    with pytest.raises(ValueError, match="Missing 'rules' field in policy"):
+        validate_policy({"name": "policy name"})
+
+
+def test_merge_policies():
+    policy = [{
+        'name': 'Test Policy',
+        'rules': [
+            {'rule': 'DeleteOlderThan', 'days': 14},
+            {'rule': 'SaveLastNProdTags', 'regexp': '^v\\d+\\.\\d+\\.\\d+$', 'limit': 5},
+            {'rule': 'SaveLastNStagingTags', 'regexp': '^v\\d+\\.\\d+\\.\\d+.+', 'limit': 10},
+            {'rule': 'SaveLastNFeatureTags', 'limit': 10}
+        ]
+    }]
+    expected_policy = {
+        'name': 'Test Policy',
+        'rules': [
+            {'rule': 'DeleteOlderThan', 'days': 14},
+            {'rule': 'SaveLastNProdTags', 'regexp': '^v\\d+\\.\\d+\\.\\d+$', 'limit': 5},
+            {'rule': 'SaveLastNStagingTags', 'regexp': '^v\\d+\\.\\d+\\.\\d+.+', 'limit': 10},
+            {'rule': 'SaveLastNFeatureTags', 'limit': 10},
+            {'rule': 'IgnoreTags', 'tags': []}
+        ]
+    }
+    merged = merge_policies(policy)
+    assert merged == expected_policy
+
+
+def test_get_field_from_rule():
+    policy = {"name": "policy1",
+              "rules": [{"rule": "rule1", "field1": "value1"}, {"rule": "rule2", "field1": "value2"}]}
+    assert get_field_from_rule(policy, "rule1", "field1") == "value1"
+    assert get_field_from_rule(policy, "rule2", "field1") == "value2"
+    with pytest.raises(StopIteration):
+        get_field_from_rule(policy, "rule3", "field1")
diff --git a/src/tests/test_harbor_cleanup.py b/src/tests/test_harbor_cleanup.py
new file mode 100644
index 0000000..8e389ba
--- /dev/null
+++ b/src/tests/test_harbor_cleanup.py
@@ -0,0 +1,75 @@
+import pytest
+from unittest.mock import MagicMock
+from main import *
+
+
+@pytest.fixture()
+def mock_harbor_client():
+    harbor_client = HarborClient('https://harbor.example.com', 'project-name', 'username',
+                                 'password')
+    harbor_client.list_images = MagicMock(return_value=[{"name": "harbor.example.com/image1", "tag": "v1.0"},
+                                                        {"name": "harbor.example.com/image1", "tag": "v1.1"},
+                                                        {"name": "harbor.example.com/image1", "tag": "v1.2"},
+                                                        {"name": "harbor.example.com/image2", "tag": "v1.0"},
+                                                        {"name": "harbor.example.com/image2", "tag": "v1.1"}])
+    harbor_client.get_image_manifest = MagicMock(return_value={"config": {"digest": "sha256:config_digest"}})
+    harbor_client.get_image_labels = MagicMock(return_value={"key": "value"})
+    return harbor_client
+
+
+@pytest.fixture()
+def mock_kustomization_yaml():
+    kustomization_yaml = {"images": [{"name": "harbor.example.com/image1", "newTag": "v1.0"},
+                                     {"name": "harbor.example.com/image2", "newName": "harbor.example.com/image2-new",
+                                      "newTag": "v1.0"}]}
+    return kustomization_yaml
+
+
+def test_get_harbor_images(mock_kustomization_yaml):
+    harbor_images = get_harbor_images(mock_kustomization_yaml, "harbor.example.com")
+    assert len(harbor_images) == 2
+    assert harbor_images[0] == {"name": "harbor.example.com/image1", "tag": "v1.0"}
+    assert harbor_images[1] == {"name": "harbor.example.com/image2-new", "tag": "v1.0"}
+
+
+def test_get_tags_by_exclusion():
+    tags = ["v1.0", "v1.1", "v1.2", "v1.3"]
+    exclusion = ["v1.1", "v1.2"]
+    tags_after_exclusion = get_tags_by_exclusion(tags, exclusion)
+    assert len(tags_after_exclusion) == 2
+    assert "v1.0" in tags_after_exclusion
+    assert "v1.3" in tags_after_exclusion
+
+
+def test_sort_tag():
+    assert sort_tag("v1.0.1") == [1, 0, 1]
+    assert sort_tag("v2.1.0") == [2, 1, 0]
+    assert sort_tag("v0.0.1") == [0, 0, 1]
+
+
+def test_get_latest_n_tags():
+    list_tags = ["v1.0", "v1.1", "v1.2", "v1.3"]
+    limit = 2
+    assert get_latest_n_tags(list_tags, limit) == ["v1.2", "v1.3"]
+
+
+def test_get_harbor_images_by_domain():
+    kustomization_yaml = {"images": [{"name": "harbor.example.com/image1", "newTag": "v1.0"},
+                                     {"name": "harbor.example.com/image2", "newName": "docker.example.ru/image2-new",
+                                      "newTag": "v1.1"}]}
+
+    domain_name = "harbor.example.com"
+    harbor_images = get_harbor_images(kustomization_yaml, domain_name)
+
+    assert harbor_images == [{"name": "harbor.example.com/image1", "tag": "v1.0"}]
+
+
+@pytest.mark.parametrize("list_tags,exp,limit,expected_output", [
+    (["1.0.0", "2.0.1", "1.0.1", "2.0.0"], r"\d+\.\d+\.\d+", 2, ["2.0.0", "2.0.1"]),
+    (["v1.0.0", "v2.0.0", "v2.0.1", "v1.0.1"], r"v\d+\.\d+\.\d+", 3, ["v1.0.1", "v2.0.0", "v2.0.1"]),
+    (["v1.0.0-rc", "v2.0.0-rc", "v1.0.1-rc", "v2.0.1-rc"], r"^v\d+\.\d+\.\d+.+", 2, ["v2.0.0-rc", "v2.0.1-rc"]),
+])
+def test_get_latest_tags_by_regexp(list_tags, exp, limit, expected_output):
+    output = get_latest_tags_by_regexp(list_tags, exp, limit)
+    assert output == expected_output
+    # assert sorted(output, key=sort_tag, reverse=True) == output
diff --git a/src/tests/test_utils.py b/src/tests/test_utils.py
new file mode 100644
index 0000000..8430563
--- /dev/null
+++ b/src/tests/test_utils.py
@@ -0,0 +1,15 @@
+from utils import regexp_match, extract_semver
+
+
+def test_regexp_match():
+    assert regexp_match(r'^\d{3}$', '123') is True
+    assert regexp_match(r'^\d{3}$', '12a') is False
+    assert regexp_match(r'^[a-z]+$', 'hello') is True
+    assert regexp_match(r'^[a-z]+$', 'Hello') is False
+
+
+def test_extract_semver():
+    assert extract_semver('v1.2.3') == '1.2.3'
+    assert extract_semver('version-1.2.3') == '1.2.3'
+    assert extract_semver('2.0') == ''
+    assert extract_semver('1.2.3-beta.1') == '1.2.3'
diff --git a/src/utils.py b/src/utils.py
new file mode 100644
index 0000000..6cdb31c
--- /dev/null
+++ b/src/utils.py
@@ -0,0 +1,12 @@
+import re
+
+
+def regexp_match(exp: str, string: str) -> bool:
+    return bool(re.match(exp, string))
+
+
+def extract_semver(tag: str) -> str:
+    match = re.search(r'\d+\.\d+\.\d+', tag)
+    if match:
+        return match.group(0)
+    return ""