Skip to content

Commit

Permalink
refactor module_utils.acm and add unit tests (#1273)
Browse files Browse the repository at this point in the history
refactor module_utils.acm and add unit tests

SUMMARY

ISSUE TYPE


Feature Pull Request
  • Loading branch information
abikouo authored Nov 29, 2022
1 parent bb03237 commit 19ce17f
Show file tree
Hide file tree
Showing 3 changed files with 496 additions and 120 deletions.
3 changes: 3 additions & 0 deletions changelogs/fragments/module_utils_acm-unit-testing.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
---
minor_changes:
- module_utils.acm - Refactor ACMServiceManager class and add unit tests (https://github.com/ansible-collections/amazon.aws/pull/1273).
262 changes: 142 additions & 120 deletions plugins/module_utils/acm.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,178 +42,200 @@
from .ec2 import boto3_tag_list_to_ansible_dict


def acm_catch_boto_exception(func):

def runner(*args, **kwargs):
module = kwargs.pop('module', None)
error = kwargs.pop('error', None)
ignore_error_codes = kwargs.pop('ignore_error_codes', [])

try:
return func(*args, **kwargs)
except is_boto3_error_code(ignore_error_codes):
return None
except (BotoCoreError, ClientError) as e:
if not module:
raise
module.fail_json_aws(e, msg=error)
return runner


class ACMServiceManager:
"""Handles ACM Facts Services"""

def __init__(self, module):
self.module = module
self.client = module.client('acm')

@acm_catch_boto_exception
@AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=['RequestInProgressException'])
def delete_certificate_with_backoff(self, client, arn):
client.delete_certificate(CertificateArn=arn)

def delete_certificate(self, client, module, arn):
module.debug("Attempting to delete certificate %s" % arn)
try:
self.delete_certificate_with_backoff(client, arn)
except (BotoCoreError, ClientError) as e:
module.fail_json_aws(e, msg="Couldn't delete certificate %s" % arn)
module.debug("Successfully deleted certificate %s" % arn)
def delete_certificate_with_backoff(self, arn):
self.client.delete_certificate(CertificateArn=arn)

@acm_catch_boto_exception
@AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=['RequestInProgressException'])
def list_certificates_with_backoff(self, client, statuses=None):
paginator = client.get_paginator('list_certificates')
def list_certificates_with_backoff(self, statuses=None):
paginator = self.client.get_paginator('list_certificates')
kwargs = dict()
if statuses:
kwargs['CertificateStatuses'] = statuses
return paginator.paginate(**kwargs).build_full_result()['CertificateSummaryList']

@acm_catch_boto_exception
@AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=['RequestInProgressException', 'ResourceNotFoundException'])
def get_certificate_with_backoff(self, client, certificate_arn):
response = client.get_certificate(CertificateArn=certificate_arn)
def get_certificate_with_backoff(self, certificate_arn):
response = self.client.get_certificate(CertificateArn=certificate_arn)
# strip out response metadata
return {'Certificate': response['Certificate'],
'CertificateChain': response['CertificateChain']}
return {'Certificate': response['Certificate'], 'CertificateChain': response['CertificateChain']}

@acm_catch_boto_exception
@AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=['RequestInProgressException', 'ResourceNotFoundException'])
def describe_certificate_with_backoff(self, client, certificate_arn):
return client.describe_certificate(CertificateArn=certificate_arn)['Certificate']
def describe_certificate_with_backoff(self, certificate_arn):
return self.client.describe_certificate(CertificateArn=certificate_arn)['Certificate']

@acm_catch_boto_exception
@AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=['RequestInProgressException', 'ResourceNotFoundException'])
def list_certificate_tags_with_backoff(self, client, certificate_arn):
return client.list_tags_for_certificate(CertificateArn=certificate_arn)['Tags']

# Returns a list of certificates
# if domain_name is specified, returns only certificates with that domain
# if an ARN is specified, returns only that certificate
# only_tags is a dict, e.g. {'key':'value'}. If specified this function will return
# only certificates which contain all those tags (key exists, value matches).
def get_certificates(self, client, module, domain_name=None, statuses=None, arn=None, only_tags=None):
try:
all_certificates = self.list_certificates_with_backoff(client=client, statuses=statuses)
except (BotoCoreError, ClientError) as e:
module.fail_json_aws(e, msg="Couldn't obtain certificates")
if domain_name:
certificates = [cert for cert in all_certificates
if cert['DomainName'] == domain_name]
else:
certificates = all_certificates
def list_certificate_tags_with_backoff(self, certificate_arn):
return self.client.list_tags_for_certificate(CertificateArn=certificate_arn)['Tags']

@acm_catch_boto_exception
@AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=['RequestInProgressException'])
def import_certificate_with_backoff(self, certificate, private_key, certificate_chain, arn):
params = {'Certificate': to_bytes(certificate), 'PrivateKey': to_bytes(private_key)}
if arn:
# still return a list, not just one item
certificates = [c for c in certificates if c['CertificateArn'] == arn]
params['CertificateArn'] = arn
if certificate_chain:
params['CertificateChain'] = certificate_chain

return self.client.import_certificate(**params)['CertificateArn']

# Tags are a normal Ansible style dict
# {'Key':'Value'}
@AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=['RequestInProgressException', 'ResourceNotFoundException'])
def tag_certificate_with_backoff(self, arn, tags):
aws_tags = ansible_dict_to_boto3_tag_list(tags)
self.client.add_tags_to_certificate(CertificateArn=arn, Tags=aws_tags)

def _match_tags(self, ref_tags, cert_tags):
if ref_tags is None:
return True
try:
return all(k in cert_tags for k in ref_tags) and all(cert_tags.get(k) == ref_tags[k] for k in ref_tags)
except (TypeError, AttributeError) as e:
self.module.fail_json_aws(e, msg="ACM tag filtering err")

def delete_certificate(self, *args, arn=None):
# hacking for backward compatibility
if arn is None:
if len(args) < 3:
self.module.fail_json(msg="Missing required certificate arn to delete.")
arn = args[2]
error = "Couldn't delete certificate %s" % arn
self.delete_certificate_with_backoff(arn, module=self.module, error=error)

def get_certificates(self, *args, domain_name=None, statuses=None, arn=None, only_tags=None, **kwargs):
"""
Returns a list of certificates
if domain_name is specified, returns only certificates with that domain
if an ARN is specified, returns only that certificate
only_tags is a dict, e.g. {'key':'value'}. If specified this function will return
only certificates which contain all those tags (key exists, value matches).
"""
all_certificates = self.list_certificates_with_backoff(
statuses=statuses,
module=self.module,
error="Couldn't obtain certificates"
)

def _filter_certificate(cert):
if domain_name and cert['DomainName'] != domain_name:
return False
if arn and cert['CertificateArn'] != arn:
return False
return True

certificates = list(filter(_filter_certificate, all_certificates))

results = []
for certificate in certificates:
try:
cert_data = self.describe_certificate_with_backoff(client, certificate['CertificateArn'])
except is_boto3_error_code('ResourceNotFoundException'):
# The certificate was deleted after the call to list_certificates_with_backoff.
cert_data = self.describe_certificate_with_backoff(
certificate['CertificateArn'],
module=self.module,
error="Couldn't obtain certificate metadata for domain %s" % certificate['DomainName'],
ignore_error_codes=['ResourceNotFoundException']
)
if cert_data is None:
continue
except (BotoCoreError, ClientError) as e: # pylint: disable=duplicate-except
module.fail_json_aws(e, msg="Couldn't obtain certificate metadata for domain %s" % certificate['DomainName'])

# in some states, ACM resources do not have a corresponding cert
if cert_data['Status'] not in ['PENDING_VALIDATION', 'VALIDATION_TIMED_OUT', 'FAILED']:
try:
cert_data.update(self.get_certificate_with_backoff(client, certificate['CertificateArn']))
except is_boto3_error_code('ResourceNotFoundException'):
# The certificate was deleted after the call to list_certificates_with_backoff.
if cert_data['Status'] not in ('PENDING_VALIDATION', 'VALIDATION_TIMED_OUT', 'FAILED'):
cert_info = self.get_certificate_with_backoff(
certificate['CertificateArn'],
module=self.module,
error="Couldn't obtain certificate data for domain %s" % certificate['DomainName'],
ignore_error_codes=['ResourceNotFoundException']
)
if cert_info is None:
continue
except (BotoCoreError, ClientError, KeyError) as e: # pylint: disable=duplicate-except
module.fail_json_aws(e, msg="Couldn't obtain certificate data for domain %s" % certificate['DomainName'])
cert_data.update(cert_info)

cert_data = camel_dict_to_snake_dict(cert_data)
try:
tags = self.list_certificate_tags_with_backoff(client, certificate['CertificateArn'])
except is_boto3_error_code('ResourceNotFoundException'):
# The certificate was deleted after the call to list_certificates_with_backoff.
tags = self.list_certificate_tags_with_backoff(
certificate['CertificateArn'],
module=self.module,
error="Couldn't obtain tags for domain %s" % certificate['DomainName'],
ignore_error_codes=['ResourceNotFoundException']
)
if tags is None:
continue
except (BotoCoreError, ClientError) as e: # pylint: disable=duplicate-except
module.fail_json_aws(e, msg="Couldn't obtain tags for domain %s" % certificate['DomainName'])

cert_data['tags'] = boto3_tag_list_to_ansible_dict(tags)
tags = boto3_tag_list_to_ansible_dict(tags)
if not self._match_tags(only_tags, tags):
continue
cert_data['tags'] = tags
results.append(cert_data)

if only_tags:
for tag_key in only_tags:
try:
results = [c for c in results if ('tags' in c) and (tag_key in c['tags']) and (c['tags'][tag_key] == only_tags[tag_key])]
except (TypeError, AttributeError) as e:
for c in results:
if 'tags' not in c:
module.debug("cert is %s" % str(c))
module.fail_json(msg="ACM tag filtering err", exception=e)

return results

# returns the domain name of a certificate (encoded in the public cert)
# for a given ARN
# A cert with that ARN must already exist
def get_domain_of_cert(self, client, module, arn):
def get_domain_of_cert(self, arn, **kwargs):
"""
returns the domain name of a certificate (encoded in the public cert)
for a given ARN A cert with that ARN must already exist
"""
if arn is None:
module.fail(msg="Internal error with ACM domain fetching, no certificate ARN specified")
try:
cert_data = self.describe_certificate_with_backoff(client=client, certificate_arn=arn)
except (BotoCoreError, ClientError) as e:
module.fail_json_aws(e, msg="Couldn't obtain certificate data for arn %s" % arn)
self.module.fail_json(msg="Internal error with ACM domain fetching, no certificate ARN specified")
error = "Couldn't obtain certificate data for arn %s" % arn
cert_data = self.describe_certificate_with_backoff(certificate_arn=arn, module=self.module, error=error)
return cert_data['DomainName']

@AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=['RequestInProgressException'])
def import_certificate_with_backoff(self, client, certificate, private_key, certificate_chain, arn):
if certificate_chain:
if arn:
ret = client.import_certificate(Certificate=to_bytes(certificate),
PrivateKey=to_bytes(private_key),
CertificateChain=to_bytes(certificate_chain),
CertificateArn=arn)
else:
ret = client.import_certificate(Certificate=to_bytes(certificate),
PrivateKey=to_bytes(private_key),
CertificateChain=to_bytes(certificate_chain))
else:
if arn:
ret = client.import_certificate(Certificate=to_bytes(certificate),
PrivateKey=to_bytes(private_key),
CertificateArn=arn)
else:
ret = client.import_certificate(Certificate=to_bytes(certificate),
PrivateKey=to_bytes(private_key))
return ret['CertificateArn']

# Tags are a normal Ansible style dict
# {'Key':'Value'}
@AWSRetry.jittered_backoff(delay=5, catch_extra_error_codes=['RequestInProgressException', 'ResourceNotFoundException'])
def tag_certificate_with_backoff(self, client, arn, tags):
aws_tags = ansible_dict_to_boto3_tag_list(tags)
client.add_tags_to_certificate(CertificateArn=arn, Tags=aws_tags)

def import_certificate(self, client, module, certificate, private_key, arn=None, certificate_chain=None, tags=None):
def import_certificate(self, *args, certificate, private_key, arn=None, certificate_chain=None, tags=None):

original_arn = arn

# upload cert
try:
arn = self.import_certificate_with_backoff(client, certificate, private_key, certificate_chain, arn)
except (BotoCoreError, ClientError) as e:
module.fail_json_aws(e, msg="Couldn't upload new certificate")

params = {
'certificate': certificate,
'private_key': private_key,
'certificate_chain': certificate_chain,
'arn': arn,
'module': self.module,
'error': "Couldn't upload new certificate"
}
arn = self.import_certificate_with_backoff(**params)
if original_arn and (arn != original_arn):
# I'm not sure whether the API guarentees that the ARN will not change
# I'm failing just in case.
# If I'm wrong, I'll catch it in the integration tests.
module.fail_json(msg="ARN changed with ACM update, from %s to %s" % (original_arn, arn))
self.module.fail_json(msg="ARN changed with ACM update, from %s to %s" % (original_arn, arn))

# tag that cert
try:
self.tag_certificate_with_backoff(client, arn, tags)
self.tag_certificate_with_backoff(arn, tags)
except (BotoCoreError, ClientError) as e:
module.debug("Attempting to delete the cert we just created, arn=%s" % arn)
try:
self.delete_certificate_with_backoff(client, arn)
self.delete_certificate_with_backoff(arn)
except (BotoCoreError, ClientError):
module.warn("Certificate %s exists, and is not tagged. So Ansible will not see it on the next run.")
module.fail_json_aws(e, msg="Couldn't tag certificate %s, couldn't delete it either" % arn)
module.fail_json_aws(e, msg="Couldn't tag certificate %s" % arn)
self.module.warn("Certificate %s exists, and is not tagged. So Ansible will not see it on the next run." % arn)
self.module.fail_json_aws(e, msg="Couldn't tag certificate %s, couldn't delete it either" % arn)
self.module.fail_json_aws(e, msg="Couldn't tag certificate %s" % arn)

return arn
Loading

0 comments on commit 19ce17f

Please sign in to comment.