Skip to content

Commit

Permalink
Add support for AWS Resource Explorer
Browse files Browse the repository at this point in the history
Signed-off-by: Andrej Podhradsky <[email protected]>
  • Loading branch information
apodhrad committed Oct 4, 2023
1 parent 5887e5f commit 7a9f647
Showing 1 changed file with 94 additions and 0 deletions.
94 changes: 94 additions & 0 deletions wrapanapi/systems/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import boto3
import os
import re
import typing

from botocore.config import Config
from botocore.exceptions import ClientError
Expand Down Expand Up @@ -448,6 +449,63 @@ def cleanup(self):
"""
return self.delete()

class Resource():
"""
This class represents a resource returned by Resource Explorer.
"""
def __init__(self, arn, region, resource_type, service, properties=[]):
self.arn = arn
self.region = region
self.resource_type = resource_type
self.service = service
self.properties = properties

def get_tag_value(self, key) -> str | None:
"""
Returns a tag value for a given tag key.
Tags are taken from the resource properties.
Args:
key: a tag key
"""
tags = self.get_tags(regex=f"^{key}$")
if len(tags) > 0:
return tags[0].get('Value')
return None

def get_tags(self, regex="") -> typing.List[dict]:
"""
Returns a list of tags (a dict with keys 'Key' and 'Value').
Tags are taken from the resource properties.
Args:
regex: a regular expressions for keys, default is ""
"""
list = []
if len(self.properties) > 0:
data = self.properties[0].get('Data')
for tag in data:
key = tag.get('Key')
if re.match(regex, key):
list.append(tag)
return list

def get_id(self) -> str:
"""
Returns the last part part of the arn.
This is part is used as id in aws cli.
"""
return self.arn.split(':')[-1]

def get_name(self) -> str:
"""
Returns a name for the resource derived from the associated tag with key 'Name'.
If there is no such tag then the name is the id from arn.
"""
name = self.get_tag_value('Name')
if not name:
name = self.get_id()
return name

class EC2System(System, VmMixin, TemplateMixin, StackMixin, NetworkMixin):
"""EC2 Management System, powered by boto
Expand Down Expand Up @@ -506,6 +564,7 @@ def __init__(self, **kwargs):
self.ssm_connection = boto3client('ssm', **connection_kwargs)
self.sns_connection = boto3client('sns', **connection_kwargs)
self.cw_events_connection = boto3client('events', **connection_kwargs)
self.resource_explorer_connection = boto3client('resource-explorer-2', **connection_kwargs)

self.kwargs = kwargs

Expand Down Expand Up @@ -1624,3 +1683,38 @@ def cleanup_resources(self):
self.remove_all_unused_nics()
self.remove_all_unused_volumes()
self.remove_all_unused_ips()

def list_resources(self, query="", view="") -> typing.List[Resource]:
"""
Lists resources using AWS Resource Explorer (resource-explorer-2).
Args:
query: keywords and filters for resources; default is "" (all)
view: arn of the view to use for the query; default is "" (default view)
Result:
a list of resources satisfying the query
Examples:
Use query 'tag.key:kubernetes.io/cluster/*' to list OCP resources
"""
args = {
'QueryString':query
}
if view:
args['ViewArn'] = view
list = []
paginator = self.resource_explorer_connection.get_paginator('search')
page_iterator = paginator.paginate(**args)
for page in page_iterator:
resources = page.get('Resources')
for r in resources:
resource = Resource(
arn=r.get('Arn'),
region=r.get('Region'),
service=r.get('Service'),
properties=r.get('Properties'),
resource_type=r.get('ResourceType')
)
list.append(resource)
return list

0 comments on commit 7a9f647

Please sign in to comment.