-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Юсев Николай
committed
Jun 2, 2023
1 parent
cdee5c2
commit e05fc7e
Showing
10 changed files
with
636 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
policies: | ||
- name: Policies for cleanup docker images | ||
rules: | ||
- rule: SaveLastNProdTags | ||
regexp: '^v\d+\.\d+\.\d+$' | ||
limit: 2 | ||
- rule: SaveLastNStagingTags | ||
regexp: '^v\d+\.\d+\.\d+.+' | ||
limit: 2 | ||
- rule: DeleteOlderThan | ||
days: 3 | ||
- rule: SaveLastNFeatureTags | ||
limit: 3 | ||
- rule: IgnoreTags | ||
tags: | ||
- "feature-branch" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
PyYAML==6.0 | ||
docker-registry-client==0.5.2 | ||
requests==2.29.0 | ||
jsonpath-rw==1.4.0 | ||
pytest==7.3.1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
import yaml | ||
import os | ||
|
||
# Default policies | ||
DEFAULT_POLICIES = [{ | ||
'name': 'Default Policy', | ||
'rules': [ | ||
{'rule': 'DeleteOlderThan', 'days': 7}, | ||
{'rule': 'SaveLastNProdTags', 'regexp': r'^v\d+\.\d+\.\d+$', 'limit': 5}, | ||
{'rule': 'SaveLastNStagingTags', 'regexp': r'^v\d+\.\d+\.\d+.+', 'limit': 5}, | ||
{'rule': 'SaveLastNFeatureTags', 'limit': 10}, | ||
{'rule': 'IgnoreTags', 'tags': []} | ||
] | ||
}] | ||
|
||
|
||
def load_cleanup_policy(): | ||
""" | ||
Load Harbor cleanup policy from .harbor_cleanup_policy.yaml file or return default policies | ||
""" | ||
# Check if .harbor_cleanup_policy.yaml file exists | ||
if os.path.exists(".harbor_cleanup_policy.yaml"): | ||
with open(".harbor_cleanup_policy.yaml", "r") as f: | ||
cleanup_policy = yaml.safe_load(f) | ||
# Check if policies key exists in cleanup_policy dict | ||
if "policies" in cleanup_policy: | ||
return cleanup_policy["policies"] | ||
# Return default policies if file doesn't exist or policies key is missing | ||
return DEFAULT_POLICIES | ||
|
||
|
||
def validate_rule(rule): | ||
""" | ||
Validate that rule has the required fields | ||
""" | ||
if "rule" not in rule: | ||
raise ValueError("Missing 'rule' field in rule") | ||
if rule["rule"] not in ["SaveLastNTags", "DeleteOlderThan", "SaveLastNProdTags", "SaveLastNStagingTags", | ||
"SaveLastNFeatureTags", "IgnoreTags"]: | ||
raise ValueError(f"The rule {rule['rule']} is wrong") | ||
if rule["rule"] == "DeleteOlderThan" and "days" not in rule: | ||
raise ValueError("Missing 'days' field in rule") | ||
if rule["rule"] == "SaveLastNProdTags" and "regexp" not in rule: | ||
raise ValueError("Missing 'regexp' field in rule") | ||
if rule["rule"] == "SaveLastNProdTags" and "limit" not in rule: | ||
rule["limit"] = next( | ||
(d['limit'] for d in DEFAULT_POLICIES[0]["rules"] if 'SaveLastNProdTags' in d.get('rule', ''))) | ||
if rule["rule"] == "SaveLastNProdTags" and rule["limit"] < 1: | ||
raise ValueError("Missing 'limit' field in rule should be more then 1") | ||
if rule["rule"] == "SaveLastNStagingTags" and "regexp" not in rule: | ||
raise ValueError("Missing 'regexp' field in rule") | ||
if rule["rule"] == "SaveLastNStagingTags" and "limit" not in rule: | ||
rule["limit"] = next( | ||
(d['limit'] for d in DEFAULT_POLICIES[0]["rules"] if 'SaveLastNStagingTags' in d.get('rule', ''))) | ||
if rule["rule"] == "SaveLastNStagingTags" and rule["limit"] < 1: | ||
raise ValueError("Missing 'limit' field in rule should be more then 1") | ||
if rule["rule"] == "SaveLastNFeatureTags" and "limit" not in rule: | ||
raise ValueError("Missing 'limit' field in rule") | ||
if rule["rule"] == "IgnoreTags" and "tags" not in rule: | ||
raise ValueError("Missing 'tags' field in rule") | ||
if "limit" in rule and not isinstance(rule["limit"], int): | ||
raise ValueError("'limit' field in rule must be an integer") | ||
if "days" in rule and not isinstance(rule["days"], int): | ||
raise ValueError("'days' field in rule must be an integer") | ||
|
||
|
||
def validate_policy(policy): | ||
""" | ||
Validate that policy has the required fields | ||
""" | ||
if "name" not in policy: | ||
raise ValueError("Missing 'name' field in policy") | ||
if "rules" not in policy: | ||
raise ValueError("Missing 'rules' field in policy") | ||
for rule in policy["rules"]: | ||
validate_rule(rule) | ||
|
||
|
||
def merge_policies(policies): | ||
# Merge policies with default policies | ||
for policy in policies: | ||
for default_policy in DEFAULT_POLICIES[0]['rules']: | ||
if default_policy['rule'] not in [p['rule'] for p in policy['rules']]: | ||
policy['rules'].append(default_policy) | ||
|
||
return policies[0] | ||
|
||
|
||
def get_field_from_rule(policy, rule, field): | ||
return next( | ||
(d[f'{field}'] for d in policy["rules"] if rule in d.get('rule', ''))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
import logging | ||
|
||
import requests | ||
|
||
logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s') | ||
logger = logging.getLogger('logger') | ||
|
||
|
||
class HarborClient: | ||
HEADERS = {'Content-Type': 'application/json', 'accept': 'application/json'} | ||
|
||
def __init__(self, harbor_url, project_name, username, password, ssl_verify=False): | ||
self._harbor_url = harbor_url | ||
self._project_name = project_name | ||
self._username = username | ||
self._password = password | ||
self._verify = ssl_verify | ||
|
||
def _get_data_from_response(self, resp): | ||
if resp.status_code == 200: | ||
return resp.json() | ||
else: | ||
logger.error(f"ERROR: Not found. {resp.status_code}") | ||
exit(1) | ||
|
||
def _get_response(self, url): | ||
responces = [] | ||
first_page = requests.get(url, headers=HarborClient.HEADERS, auth=(self._username, self._password), | ||
verify=self._verify) | ||
responces += self._get_data_from_response(first_page) | ||
next_page = first_page | ||
while next_page.links.get('next', None) is not None: | ||
try: | ||
next_page_url = next_page.links['next']['url'] | ||
next_page = requests.get(f'{self._harbor_url}/{next_page_url}', headers=HarborClient.HEADERS, | ||
auth=(self._username, self._password), | ||
verify=self._verify) | ||
responces += self._get_data_from_response(next_page) | ||
except KeyError: | ||
logger.info("No data") | ||
exit(1) | ||
|
||
return responces | ||
|
||
def _delete_image(self, url): | ||
response = requests.delete(url, headers=HarborClient.HEADERS, | ||
auth=(self._username, self._password), verify=self._verify) | ||
if response.status_code != 200: | ||
logger.error(f"ERROR: Not found. {response.status_code}") | ||
exit(1) | ||
else: | ||
logging.info("Image deleted successfully") | ||
|
||
def _get_images(self, artifacts, repo_name): | ||
list_images = [] | ||
for artifact in artifacts: | ||
if artifact["tags"]: | ||
for tag in artifact["tags"]: | ||
list_images.append({"name": repo_name, "tag": tag['name'], "push_time": tag["push_time"], | ||
"pull_time": tag["pull_time"]}) | ||
|
||
return list_images | ||
|
||
def get_repositories(self): | ||
url = f'{self._harbor_url}/api/v2.0/projects/{self._project_name}/repositories' | ||
return self._get_response(url) | ||
|
||
def get_images(self, repository_name): | ||
rep_name_without_slash = repository_name.replace('/', '%2F') | ||
url = f'{self._harbor_url}/api/v2.0/projects/{self._project_name}/repositories/' \ | ||
f'{rep_name_without_slash}/artifacts?with_tag=true' | ||
return self._get_images(self._get_response(url), f"{self._project_name}/{repository_name}") | ||
|
||
def delete_image(self, image): | ||
image_name, tag = image.split(':') | ||
rep_name_without_slash = image_name.replace(f'{self._project_name}/', '').replace('/', '%2F') | ||
url = f'{self._harbor_url}/api/v2.0/projects/{self._project_name}/repositories/{rep_name_without_slash}' \ | ||
f'/artifacts/{tag}' | ||
self._delete_image(url) |
Oops, something went wrong.