Skip to content

feat(docs): Update README with new EC2 scripts #70

feat(docs): Update README with new EC2 scripts

feat(docs): Update README with new EC2 scripts #70

Triggered via push October 2, 2024 18:33
Status Success
Total duration 20s
Artifacts

lint.yml

on: push
Run linters
10s
Run linters
Fit to window
Zoom out
Zoom in

Annotations

49 errors and 2 warnings
cloudwatch/cw_fetch_log_groups_with_creation_date.py#L40
Local variable 'age_human_readable' is assigned to but never used (F841)
ec2/ec2_delete_ssh_access_security_groups.py#L91
Comparison to False should be 'if cond is False:' or 'if not cond:' (E712)
general/delete_unused_security_groups.py#L112
Undefined name 'logger' (F821)
iam/iam_identity_center_create_users.py#L57
Local variable 'instance_arn' is assigned to but never used (F841)
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_fetch_log_groups_with_creation_date.py#L35
creation_time_millis = log_group.get("creationTime", 0) creation_date = datetime.fromtimestamp(creation_time_millis / 1000) # Calculate the age of the log group age_delta = datetime.now() - creation_date - age_human_readable = f"{age_delta.days} days" if age_delta.days > 0 else "less than a day" + age_human_readable = ( + f"{age_delta.days} days" if age_delta.days > 0 else "less than a day" + ) # Append the extracted information to the list log_groups_info.append((log_group_name, creation_date, age_delta.days)) # Sort by age in descending order (most days to least days)
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_fetch_log_groups_with_creation_date.py#L54
""" log_groups_info = fetch_log_groups_with_creation_dates() # Prepare data for tabulate table_data = [ - (log_group_name, creation_date, f"{age_days} days" if age_days > 0 else "less than a day") + ( + log_group_name, + creation_date, + f"{age_days} days" if age_days > 0 else "less than a day", + ) for log_group_name, creation_date, age_days in log_groups_info ] # Print table headers = ["Log Group", "Created On", "Age"]
/home/runner/work/aws-toolbox/aws-toolbox/appstream/appstream_delete_image.py#L15
next_token = None try: while True: if next_token: - response = appstream_client.describe_image_permissions(Name=image_name, NextToken=next_token) + response = appstream_client.describe_image_permissions( + Name=image_name, NextToken=next_token + ) else: response = appstream_client.describe_image_permissions(Name=image_name) for permission in response.get("SharedImagePermissionsList", []): shared_account_ids.append(permission["sharedAccountId"])
/home/runner/work/aws-toolbox/aws-toolbox/appstream/appstream_delete_image.py#L38
def unshare_image(appstream_client, image_name, account_ids): for account_id in account_ids: try: - appstream_client.delete_image_permissions(Name=image_name, SharedAccountId=account_id) + appstream_client.delete_image_permissions( + Name=image_name, SharedAccountId=account_id + ) print(f"Unshared image from account: {account_id}") except Exception as e: print(f"Failed to unshare image from account {account_id}: {str(e)}")
/home/runner/work/aws-toolbox/aws-toolbox/appstream/appstream_delete_image.py#L55
if shared_account_ids is None: return if shared_account_ids: - print(f"Image '{image_name}' is shared with {len(shared_account_ids)} account(s):") + print( + f"Image '{image_name}' is shared with {len(shared_account_ids)} account(s):" + ) for account_id in shared_account_ids: print(f" - {account_id}") confirm = input("Do you want to unshare and then delete the image? (y/n): ") if confirm.lower() != "y": print("Operation cancelled.")
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_delete_log_groups.py#L63
to_delete_groups.append((group["logGroupName"], age)) # Print kept groups print("Log groups to keep:") for name, age in kept_groups: - print(f"{'[DRY RUN] ' if dry_run else ''}Keeping log group: {name} (Age: {age})") + print( + f"{'[DRY RUN] ' if dry_run else ''}Keeping log group: {name} (Age: {age})" + ) # Print groups to delete print("\nLog groups to delete:") for name, age in to_delete_groups: - print(f"{'[DRY RUN] Would delete' if dry_run else 'Deleting'} log group: {name} (Age: {age})") + print( + f"{'[DRY RUN] Would delete' if dry_run else 'Deleting'} log group: {name} (Age: {age})" + ) print("\nSummary:") print(f"Total log groups: {total_groups}") print(f"Log groups kept: {len(kept_groups)}") print(f"Log groups to be deleted: {len(to_delete_groups)}")
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_delete_log_groups.py#L86
print(f"Access denied when trying to delete log group: {name}") failed_deletions.append(name) else: raise # Re-raise the exception if it's not an AccessDeniedException - print(f"Log groups actually deleted: {len(to_delete_groups) - len(failed_deletions)}") + print( + f"Log groups actually deleted: {len(to_delete_groups) - len(failed_deletions)}" + ) if failed_deletions: - print(f"Failed to delete {len(failed_deletions)} log groups due to access denial:") + print( + f"Failed to delete {len(failed_deletions)} log groups due to access denial:" + ) for name in failed_deletions: print(f" - {name}") def main(): - parser = argparse.ArgumentParser(description="Delete CloudWatch log groups based on retention.") + parser = argparse.ArgumentParser( + description="Delete CloudWatch log groups based on retention." + ) parser.add_argument( "--keep", type=parse_time_period, help="Keep log groups newer than this period (e.g., '5 days', '2 weeks', '1 months')", ) - parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting log groups") + parser.add_argument( + "--dry-run", + action="store_true", + help="Perform a dry run without actually deleting log groups", + ) args = parser.parse_args() client = boto3.client("logs") process_log_groups(client, args.keep, args.dry_run)
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_set_retention_policy.py#L42
def update_log_group_retention(group, retention): try: if "retentionInDays" not in group or group["retentionInDays"] != retention: - cloudwatch.put_retention_policy(logGroupName=group["logGroupName"], retentionInDays=retention) + cloudwatch.put_retention_policy( + logGroupName=group["logGroupName"], retentionInDays=retention + ) # Verify the update - updated_group = cloudwatch.describe_log_groups(logGroupNamePrefix=group["logGroupName"])["logGroups"][0] + updated_group = cloudwatch.describe_log_groups( + logGroupNamePrefix=group["logGroupName"] + )["logGroups"][0] if updated_group.get("retentionInDays") == retention: return f"Successfully updated retention for: {group['logGroupName']}" else: return f"Failed to update retention for: {group['logGroupName']}. Current retention: {updated_group.get('retentionInDays')}" else: - return ( - f"CloudWatch Loggroup: {group['logGroupName']} already has the specified retention of {retention} days." - ) + return f"CloudWatch Loggroup: {group['logGroupName']} already has the specified retention of {retention} days." except botocore.exceptions.ClientError as e: return f"Error updating {group['logGroupName']}: {e}" def count_retention_periods(cloudwatch_log_groups):
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_set_retention_policy.py#L96
for group in cloudwatch_log_groups if "retentionInDays" not in group or group["retentionInDays"] != retention ] if not groups_to_update: - print(f"All log groups already have the specified retention of {retention} days.") + print( + f"All log groups already have the specified retention of {retention} days." + ) return print(f"Log groups that need to be updated to {retention} days retention:") for group in groups_to_update: current_retention = group.get("retentionInDays", "Not set")
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_set_retention_policy.py#L113
updated_count = 0 failed_count = 0 with ThreadPoolExecutor(max_workers=10) as executor: future_to_group = { - executor.submit(update_log_group_retention, group, retention): group for group in groups_to_update + executor.submit(update_log_group_retention, group, retention): group + for group in groups_to_update } for future in as_completed(future_to_group): result = future.result() print(result) if "Successfully updated" in result:
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_set_retention_policy.py#L160
3653, ], help="Enter the retention in days for the CloudWatch Logs.", ) parser.add_argument( - "--print-retention-counts", action="store_true", help="Print the number of log groups for each retention period" + "--print-retention-counts", + action="store_true", + help="Print the number of log groups for each retention period", ) if len(sys.argv) == 1: parser.print_help(sys.stderr) sys.exit(1) args = parser.parse_args() if args.print_retention_counts and args.retention is not None: - parser.error("--print-retention-counts cannot be used with --retention argument") + parser.error( + "--print-retention-counts cannot be used with --retention argument" + ) cloudwatch_set_retention(args)
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_tagged_security_groups.py#L23
def revoke_permissions(ec2_client, group_id, permissions): for sg in permissions: if sg.get("IpPermissions", []): for rule in sg.get("IpPermissions", []): - ec2_client.revoke_security_group_ingress(GroupId=group_id, IpPermissions=[rule]) - print("Revoked ingress IP permissions for Security Group ID: {}".format(group_id)) + ec2_client.revoke_security_group_ingress( + GroupId=group_id, IpPermissions=[rule] + ) + print( + "Revoked ingress IP permissions for Security Group ID: {}".format( + group_id + ) + ) if sg.get("IpPermissionsEgress", []): for rule in sg.get("IpPermissionsEgress", []): - ec2_client.revoke_security_group_egress(GroupId=group_id, IpPermissions=[rule]) - print("Revoked egress IP permissions for Security Group ID: {}".format(group_id)) + ec2_client.revoke_security_group_egress( + GroupId=group_id, IpPermissions=[rule] + ) + print( + "Revoked egress IP permissions for Security Group ID: {}".format( + group_id + ) + ) def delete_security_group(ec2_client, group_id): ec2_client.delete_security_group(GroupId=group_id) print("Deleted Security Group ID: {}".format(group_id))
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_tagged_security_groups.py#L48
# Modify the tag key and value to your own liking tag_key = "ManagedByAmazonSageMakerResource" tag_value_contains = f"arn:aws:sagemaker:{aws_region}:{account_id}:domain" # Find security groups - tagged_security_groups = find_security_groups(ec2_client, tag_key, tag_value_contains) + tagged_security_groups = find_security_groups( + ec2_client, tag_key, tag_value_contains + ) # Iterate through security groups, revoke permissions, and delete for sg in tagged_security_groups: group_id = sg["GroupId"] # Fetch the current ingress and egress IP permissions - sg = ec2_client.describe_security_groups(Filters=[{"Name": "group-id", "Values": [group_id]}]).get( - "SecurityGroups", [] - ) + sg = ec2_client.describe_security_groups( + Filters=[{"Name": "group-id", "Values": [group_id]}] + ).get("SecurityGroups", []) # Revoke permissions revoke_permissions(ec2_client, group_id, sg) # Delete the security group
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_ssh_access_security_groups.py#L23
import boto3 from botocore.exceptions import ClientError def setup_logging(): - logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" + ) return logging.getLogger(__name__) def get_ec2_client(): try:
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_ssh_access_security_groups.py#L50
return [] def has_ssh_rule(security_group): for rule in security_group.get("IpPermissions", []): - if rule.get("FromPort") == 22 and rule.get("ToPort") == 22 and rule.get("IpProtocol") == "tcp": + if ( + rule.get("FromPort") == 22 + and rule.get("ToPort") == 22 + and rule.get("IpProtocol") == "tcp" + ): return True return False def remove_ssh_rule(ec2_client, security_group, dry_run=False): group_id = security_group["GroupId"] group_name = security_group["GroupName"] ssh_rules = [ rule for rule in security_group.get("IpPermissions", []) - if rule.get("FromPort") == 22 and rule.get("ToPort") == 22 and rule.get("IpProtocol") == "tcp" + if rule.get("FromPort") == 22 + and rule.get("ToPort") == 22 + and rule.get("IpProtocol") == "tcp" ] if not ssh_rules: logger.info(f"No SSH rules found in security group: {group_id} ({group_name})") return False - logger.info(f"{'Would remove' if dry_run else 'Removing'} SSH rules from security group: {group_id} ({group_name})") + logger.info( + f"{'Would remove' if dry_run else 'Removing'} SSH rules from security group: {group_id} ({group_name})" + ) # Fetch the security group rules with their IDs try: - response = ec2_client.describe_security_group_rules(Filters=[{"Name": "group-id", "Values": [group_id]}]) - sg_rules = {rule["SecurityGroupRuleId"]: rule for rule in response["SecurityGroupRules"]} + response = ec2_client.describe_security_group_rules( + Filters=[{"Name": "group-id", "Values": [group_id]}] + ) + sg_rules = { + rule["SecurityGroupRuleId"]: rule for rule in response["SecurityGroupRules"] + } except ClientError as e: logger.error(f"Failed to fetch security group rules for {group_id}: {e}") return False for rule in ssh_rules:
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_ssh_access_security_groups.py#L93
for matching_rule in matching_rules: rule_id = matching_rule["SecurityGroupRuleId"] cidr_range = matching_rule.get("CidrIpv4", "N/A") logger.info(f" Rule ID: {rule_id}") - logger.info(f" Port Range: {matching_rule['FromPort']}-{matching_rule['ToPort']}") + logger.info( + f" Port Range: {matching_rule['FromPort']}-{matching_rule['ToPort']}" + ) logger.info(f" Protocol: {matching_rule['IpProtocol']}") logger.info(f" CIDR Range: {cidr_range}") if not dry_run: try: - ec2_client.revoke_security_group_ingress(GroupId=group_id, IpPermissions=ssh_rules) - logger.info(f"Successfully removed SSH rules from security group: {group_id} ({group_name})") + ec2_client.revoke_security_group_ingress( + GroupId=group_id, IpPermissions=ssh_rules + ) + logger.info( + f"Successfully removed SSH rules from security group: {group_id} ({group_name})" + ) return True except ClientError as e: - logger.error(f"Failed to remove SSH rules from security group {group_id} ({group_name}): {e}") + logger.error( + f"Failed to remove SSH rules from security group {group_id} ({group_name}): {e}" + ) return False return True def main(dry_run=False):
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_ssh_access_security_groups.py#L121
affected_groups += 1 # Summary logger.info("Summary:") logger.info(f" Total Security Groups: {len(security_groups)}") - logger.info(f" Security Groups with SSH rules {'that would be' if dry_run else ''} modified: {affected_groups}") + logger.info( + f" Security Groups with SSH rules {'that would be' if dry_run else ''} modified: {affected_groups}" + ) if __name__ == "__main__": logger = setup_logging() - parser = argparse.ArgumentParser(description="Remove SSH (port 22) inbound rules from EC2 Security Groups") + parser = argparse.ArgumentParser( + description="Remove SSH (port 22) inbound rules from EC2 Security Groups" + ) parser.add_argument( - "--dry-run", action="store_true", help="Perform a dry run without actually modifying security groups" + "--dry-run", + action="store_true", + help="Perform a dry run without actually modifying security groups", ) args = parser.parse_args() main(dry_run=args.dry_run)
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_orphaned_snapshots.py#L27
import boto3 from botocore.exceptions import ClientError def setup_logging(): - logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" + ) return logging.getLogger(__name__) def get_ec2_client(): try:
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_orphaned_snapshots.py#L116
if "VolumeId" in snapshot and not is_volume_exists(ec2_client, snapshot["VolumeId"]) and snapshot["SnapshotId"] not in snapshots_used_by_amis ] logger.info(f"Orphaned snapshots: {len(orphaned_snapshots)}") - logger.info(f"Orphaned snapshot IDs: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}") + logger.info( + f"Orphaned snapshot IDs: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}" + ) if retention_days is not None: # Filter snapshots based on retention period - cutoff_date = datetime.now(orphaned_snapshots[0]["StartTime"].tzinfo) - timedelta(days=retention_days) - orphaned_snapshots = [snapshot for snapshot in orphaned_snapshots if snapshot["StartTime"] < cutoff_date] - logger.info(f"Orphaned snapshots older than {retention_days} days: {len(orphaned_snapshots)}") + cutoff_date = datetime.now( + orphaned_snapshots[0]["StartTime"].tzinfo + ) - timedelta(days=retention_days) + orphaned_snapshots = [ + snapshot + for snapshot in orphaned_snapshots + if snapshot["StartTime"] < cutoff_date + ] + logger.info( + f"Orphaned snapshots older than {retention_days} days: {len(orphaned_snapshots)}" + ) logger.info( f"Orphaned snapshot IDs to be deleted: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}" ) if not orphaned_snapshots: logger.info("No orphaned snapshots found to delete.") return if dry_run: - logger.info(f"Dry run: Would delete {len(orphaned_snapshots)} orphaned snapshot(s).") + logger.info( + f"Dry run: Would delete {len(orphaned_snapshots)} orphaned snapshot(s)." + ) logger.info( f"Snapshot IDs that would be deleted: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}" ) else: - deleted_count = delete_orphaned_snapshots(ec2_client, orphaned_snapshots, dry_run) + deleted_count = delete_orphaned_snapshots( + ec2_client, orphaned_snapshots, dry_run + ) logger.info(f"Deleted {deleted_count} orphaned snapshot(s).") # Summary logger.info("Summary:") logger.info(f" Total owned snapshots: {len(owned_snapshots)}")
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_orphaned_snapshots.py#L151
if __name__ == "__main__": logger = setup_logging() parser = argparse.ArgumentParser(description="Delete orphaned EC2 snapshots") - parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting snapshots") - parser.add_argument("--retention-days", type=int, help="Number of days to retain snapshots before deletion") + parser.add_argument( + "--dry-run", + action="store_true", + help="Perform a dry run without actually deleting snapshots", + ) + parser.add_argument( + "--retention-days", + type=int, + help="Number of days to retain snapshots before deletion", + ) parser.add_argument("--profile", help="AWS CLI profile name") args = parser.parse_args() if args.profile: boto3.setup_default_session(profile_name=args.profile)
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_keypairs_single_region.py#L24
import boto3 from botocore.exceptions import ClientError def setup_logging(): - logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" + ) return logging.getLogger(__name__) def get_ec2_client_and_resource(): try:
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_keypairs_single_region.py#L50
return [] def get_used_key_pairs(ec2_resource): try: - used_keys = set(instance.key_name for instance in ec2_resource.instances.all() if instance.key_name) + used_keys = set( + instance.key_name + for instance in ec2_resource.instances.all() + if instance.key_name + ) logger.info(f"Used Keys: {len(used_keys)} : {used_keys}") return used_keys except ClientError as e: logger.error(f"Failed to retrieve used key pairs: {e}") return set()
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_keypairs_single_region.py#L79
ec2_client, ec2_resource = get_ec2_client_and_resource() all_key_pairs = get_all_key_pairs(ec2_resource) used_keys = get_used_key_pairs(ec2_resource) - unused_keys = [key_pair.name for key_pair in all_key_pairs if key_pair.name not in used_keys] + unused_keys = [ + key_pair.name for key_pair in all_key_pairs if key_pair.name not in used_keys + ] logger.info(f"Unused Keys: {len(unused_keys)} : {unused_keys}") if not unused_keys: logger.info("No unused key pairs found.") return
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_keypairs_single_region.py#L96
if __name__ == "__main__": logger = setup_logging() parser = argparse.ArgumentParser(description="Delete unused EC2 key pairs") - parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting key pairs") + parser.add_argument( + "--dry-run", + action="store_true", + help="Perform a dry run without actually deleting key pairs", + ) args = parser.parse_args() main(dry_run=args.dry_run)
/home/runner/work/aws-toolbox/aws-toolbox/efs/efs_delete_tagged_filesystems.py#L38
# Delete the mount targets for the EFS filesystem delete_mount_targets(efs_client, filesystem_id) # Wait with exponential backoff delay = (2**current_retry) + random.uniform(0, 1) - print(f"Waiting for {delay} seconds before attempting to delete the EFS filesystem.") + print( + f"Waiting for {delay} seconds before attempting to delete the EFS filesystem." + ) time.sleep(delay) # Delete the specified EFS filesystem efs_client.delete_file_system(FileSystemId=filesystem_id) print("Deleted EFS Filesystem: {}".format(filesystem_id))
/home/runner/work/aws-toolbox/aws-toolbox/ecs/ecs_delete_inactive_task_definitions.py#L27
client.delete_task_definitions(taskDefinitions=[arn]) print(f"Deleted task definition {arn}") break # Break the loop if deletion was successful except client.exceptions.ClientException as e: if "Throttling" in str(e): # Check for throttling in the error message - print(f"Throttling exception when deleting {arn}: {e}, retrying in {backoff} seconds...") + print( + f"Throttling exception when deleting {arn}: {e}, retrying in {backoff} seconds..." + ) time.sleep(backoff) backoff *= 2 # Exponential backoff else: print(f"Client exception when deleting task definition {arn}: {e}") break # Break the loop for other client exceptions except client.exceptions.ServerException as e: if "Throttling" in str(e): # Check for throttling in the error message - print(f"Throttling exception when deleting {arn}: {e}, retrying in {backoff} seconds...") + print( + f"Throttling exception when deleting {arn}: {e}, retrying in {backoff} seconds..." + ) time.sleep(backoff) backoff *= 2 # Exponential backoff else: print(f"Server exception when deleting task definition {arn}: {e}") break # Break the loop for other server exceptions
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_amis.py#L27
import boto3 from botocore.exceptions import ClientError def setup_logging(): - logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" + ) return logging.getLogger(__name__) def get_ec2_client(): try:
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_amis.py#L45
try: owned_amis = [] paginator = ec2_client.get_paginator("describe_images") for page in paginator.paginate(Owners=["self"]): owned_amis.extend(page["Images"]) - logger.info(f"Owned AMIs: {len(owned_amis)} : {[ami['ImageId'] for ami in owned_amis]}") + logger.info( + f"Owned AMIs: {len(owned_amis)} : {[ami['ImageId'] for ami in owned_amis]}" + ) return owned_amis except ClientError as e: logger.error(f"Failed to retrieve owned AMIs: {e}") return []
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_amis.py#L113
owned_amis = get_owned_amis(ec2_client) used_amis = get_used_amis(ec2_client) # Find unused AMIs unused_amis = [ami for ami in owned_amis if ami["ImageId"] not in used_amis] - logger.info(f"Unused AMIs: {len(unused_amis)} : {[ami['ImageId'] for ami in unused_amis]}") + logger.info( + f"Unused AMIs: {len(unused_amis)} : {[ami['ImageId'] for ami in unused_amis]}" + ) if retention_days is not None: # Filter AMIs based on retention period cutoff_date = datetime.now() - timedelta(days=retention_days) unused_amis = [ - ami for ami in unused_amis if datetime.strptime(ami["CreationDate"], "%Y-%m-%dT%H:%M:%S.%fZ") < cutoff_date + ami + for ami in unused_amis + if datetime.strptime(ami["CreationDate"], "%Y-%m-%dT%H:%M:%S.%fZ") + < cutoff_date ] logger.info( f"Unused AMIs older than {retention_days} days: {len(unused_amis)} : {[ami['ImageId'] for ami in unused_amis]}" ) if not unused_amis: logger.info("No unused AMIs found to delete.") return if dry_run: - logger.info(f"Dry run: Would delete {len(unused_amis)} unused AMI(s) and their associated snapshots.") + logger.info( + f"Dry run: Would delete {len(unused_amis)} unused AMI(s) and their associated snapshots." + ) else: deleted_count = delete_unused_amis(ec2_client, unused_amis, dry_run) - logger.info(f"Deleted {deleted_count} unused AMI(s) and their associated snapshots.") + logger.info( + f"Deleted {deleted_count} unused AMI(s) and their associated snapshots." + ) # Summary logger.info("Summary:") logger.info(f" Total owned AMIs: {len(owned_amis)}") logger.info(f" Used AMIs: {len(used_amis)}")
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_amis.py#L146
if __name__ == "__main__": logger = setup_logging() parser = argparse.ArgumentParser(description="Delete unused EC2 AMIs") - parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting AMIs") - parser.add_argument("--retention-days", type=int, help="Number of days to retain AMIs before deletion") + parser.add_argument( + "--dry-run", + action="store_true", + help="Perform a dry run without actually deleting AMIs", + ) + parser.add_argument( + "--retention-days", + type=int, + help="Number of days to retain AMIs before deletion", + ) args = parser.parse_args() main(dry_run=args.dry_run, retention_days=args.retention_days)
/home/runner/work/aws-toolbox/aws-toolbox/iam/iam_identity_center_create_users.py#L64
group_id = None if group_name: try: group_response = identitystore.list_groups( IdentityStoreId=identity_store_id, - Filters=[{"AttributePath": "DisplayName", "AttributeValue": group_name}], + Filters=[ + {"AttributePath": "DisplayName", "AttributeValue": group_name} + ], ) if group_response["Groups"]: group_id = group_response["Groups"][0]["GroupId"] else: - print(f"Group '{group_name}' not found. Users will be created without group assignment.") + print( + f"Group '{group_name}' not found. Users will be created without group assignment." + ) except ClientError as e: print(f"Error checking group: {e}") return [], emails for email in emails:
/home/runner/work/aws-toolbox/aws-toolbox/iam/iam_identity_center_create_users.py#L91
) # If group_id is available, add user to the group if group_id: identitystore.create_group_membership( - IdentityStoreId=identity_store_id, GroupId=group_id, MemberId={"UserId": user_response["UserId"]} + IdentityStoreId=identity_store_id, + GroupId=group_id, + MemberId={"UserId": user_response["UserId"]}, ) successful.append(email) print(f"Successfully created user: {email} ({first_name} {last_name})") except ClientError as e:
/home/runner/work/aws-toolbox/aws-toolbox/iam/iam_identity_center_create_users.py#L104
return successful, failed def main(): - parser = argparse.ArgumentParser(description="Create SSO users from a list of email addresses.") - parser.add_argument("--emails", nargs="+", required=True, help="List of email addresses") + parser = argparse.ArgumentParser( + description="Create SSO users from a list of email addresses." + ) + parser.add_argument( + "--emails", nargs="+", required=True, help="List of email addresses" + ) parser.add_argument("--group", help="Optional group name to assign users to") args = parser.parse_args() successful, failed = create_sso_users(args.emails, args.group)
/home/runner/work/aws-toolbox/aws-toolbox/general/delete_unused_security_groups.py#L40
from botocore.exceptions import ClientError def setup_logging(): """Configure logging for the script.""" - logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" + ) return logging.getLogger(__name__) def get_used_security_groups(ec2, elb, elbv2, rds, logger, sg_type): """Collect all security groups in use."""
/home/runner/work/aws-toolbox/aws-toolbox/general/delete_unused_security_groups.py#L76
# Application and Network Load Balancers for lb in elbv2.describe_load_balancers()["LoadBalancers"]: if "SecurityGroups" in lb: used_sg.update(lb["SecurityGroups"]) else: - logger.debug(f"ALB/NLB without SecurityGroups: {lb.get('LoadBalancerName', 'Unknown')}") + logger.debug( + f"ALB/NLB without SecurityGroups: {lb.get('LoadBalancerName', 'Unknown')}" + ) except ClientError as e: - logger.error(f"Error describing Application/Network Load Balancers: {str(e)}") + logger.error( + f"Error describing Application/Network Load Balancers: {str(e)}" + ) if sg_type in ["all", "rds"]: try: # RDS Instances for instance in rds.describe_db_instances()["DBInstances"]: - used_sg.update(sg["VpcSecurityGroupId"] for sg in instance["VpcSecurityGroups"]) + used_sg.update( + sg["VpcSecurityGroupId"] for sg in instance["VpcSecurityGroups"] + ) except ClientError as e: logger.error(f"Error describing RDS instances: {str(e)}") return used_sg
/home/runner/work/aws-toolbox/aws-toolbox/general/delete_unused_security_groups.py#L100
response = ec2.describe_security_groups() for sg in response["SecurityGroups"]: group_name = sg["GroupName"].lower() if sg_type == "all": all_sg.add(sg["GroupId"]) - elif sg_type == "ec2" and not (group_name.startswith("rds-") or group_name.startswith("elb-")): + elif sg_type == "ec2" and not ( + group_name.startswith("rds-") or group_name.startswith("elb-") + ): all_sg.add(sg["GroupId"]) elif sg_type == "rds" and group_name.startswith("rds-"): all_sg.add(sg["GroupId"]) elif sg_type == "elb" and group_name.startswith("elb-"): all_sg.add(sg["GroupId"])
/home/runner/work/aws-toolbox/aws-toolbox/general/delete_unused_security_groups.py#L115
def delete_unused_security_groups(ec2, unused_sg, dry_run, logger): """Delete unused security groups, skipping those with 'default' in the name.""" for sg_id in unused_sg: try: - sg_info = ec2.describe_security_groups(GroupIds=[sg_id])["SecurityGroups"][0] + sg_info = ec2.describe_security_groups(GroupIds=[sg_id])["SecurityGroups"][ + 0 + ] sg_name = sg_info["GroupName"] if "default" in sg_name.lower(): logger.info( f"Skipping deletion of security group '{sg_name}' (ID: {sg_id}) because it contains 'default'" ) continue if dry_run: - logger.info(f"[DRY RUN] Would delete security group '{sg_name}' (ID: {sg_id})") + logger.info( + f"[DRY RUN] Would delete security group '{sg_name}' (ID: {sg_id})" + ) else: logger.info(f"Deleting security group '{sg_name}' (ID: {sg_id})") ec2.delete_security_group(GroupId=sg_id) except ClientError as e: if e.response["Error"]["Code"] == "DependencyViolation": logger.warning( f"Skipping deletion of security group '{sg_name}' (ID: {sg_id}) because it has a dependent object." ) else: - logger.error(f"Error deleting security group '{sg_name}' (ID: {sg_id}): {str(e)}") + logger.error( + f"Error deleting security group '{sg_name}' (ID: {sg_id}): {str(e)}" + ) def main(dry_run, sg_type): logger = setup_logging()
/home/runner/work/aws-toolbox/aws-toolbox/general/delete_unused_security_groups.py#L164
delete_unused_security_groups(ec2, unused_sg, dry_run, logger) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Delete unused AWS security groups") - parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without deleting security groups") + parser.add_argument( + "--dry-run", + action="store_true", + help="Perform a dry run without deleting security groups", + ) parser.add_argument( "--type", choices=["all", "ec2", "rds", "elb"], default="all", help="Specify the type of security groups to consider (default: all)",
/home/runner/work/aws-toolbox/aws-toolbox/s3/s3_delete_empty_buckets.py#L14
from botocore.exceptions import ClientError def parse_arguments(): """Parse command line arguments.""" - parser = argparse.ArgumentParser(description="Delete empty S3 buckets without versioning.") - parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without deleting buckets") + parser = argparse.ArgumentParser( + description="Delete empty S3 buckets without versioning." + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Perform a dry run without deleting buckets", + ) return parser.parse_args() def is_bucket_empty_and_unversioned(s3_client, bucket_name): """Check if a bucket is empty and has versioning disabled."""
/home/runner/work/aws-toolbox/aws-toolbox/s3/s3_delete_empty_buckets.py#L44
buckets = response["Buckets"] except ClientError as e: print(f"Error listing buckets: {e}", file=sys.stderr) return [] - return [bucket["Name"] for bucket in buckets if is_bucket_empty_and_unversioned(s3_client, bucket["Name"])] + return [ + bucket["Name"] + for bucket in buckets + if is_bucket_empty_and_unversioned(s3_client, bucket["Name"]) + ] def delete_buckets(s3_resource, bucket_names, dry_run=False): """Delete the specified buckets.""" for bucket_name in bucket_names:
/home/runner/work/aws-toolbox/aws-toolbox/s3/s3_delete_empty_buckets.py#L80
print(f"- {bucket}") if args.dry_run: print("\nDry run mode. No buckets will be deleted.") else: - confirmation = input("\nDo you want to delete these buckets? (yes/no): ").lower() + confirmation = input( + "\nDo you want to delete these buckets? (yes/no): " + ).lower() if confirmation != "yes": print("Operation cancelled.") return delete_buckets(s3_resource, empty_buckets, args.dry_run)
/home/runner/work/aws-toolbox/aws-toolbox/s3/s3_search_bucket_and_delete.py#L25
from botocore.config import Config from botocore.exceptions import ClientError def setup_logging(): - logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" + ) return logging.getLogger(__name__) def get_s3_client(): try:
/home/runner/work/aws-toolbox/aws-toolbox/s3/s3_search_bucket_and_delete.py#L85
count += 1 # Process in batches of 1000 (S3 delete_objects limit) if len(object_versions) >= 1000: if dry_run: - logger.info(f"Would delete {len(object_versions)} {'versions' if is_versioned else 'objects'}") + logger.info( + f"Would delete {len(object_versions)} {'versions' if is_versioned else 'objects'}" + ) else: delete_objects(object_versions) object_versions = [] # Log progress every 10000 objects if count % 10000 == 0: - logger.info(f"Processed {count} {'versions' if is_versioned else 'objects'}") + logger.info( + f"Processed {count} {'versions' if is_versioned else 'objects'}" + ) # Delete any remaining objects if object_versions: if dry_run: - logger.info(f"Would delete {len(object_versions)} {'versions' if is_versioned else 'objects'}") + logger.info( + f"Would delete {len(object_versions)} {'versions' if is_versioned else 'objects'}" + ) else: delete_objects(object_versions) logger.info( f"{'Would delete' if dry_run else 'Deleted'} a total of {count} {'versions' if is_versioned else 'objects'} from {bucket_name}"
/home/runner/work/aws-toolbox/aws-toolbox/s3/s3_search_bucket_and_delete.py#L146
size_bytes = get_bucket_size(s3_client, bucket_name) size_gb = size_bytes / (1024**3) # Convert bytes to gigabytes logger.info(f"Bucket size: {size_gb:.2f} GB") if dry_run: - logger.info(f"Dry run: Would delete all contents and the bucket itself: {bucket_name}") + logger.info( + f"Dry run: Would delete all contents and the bucket itself: {bucket_name}" + ) else: delete_bucket_contents(s3_client, bucket_name, dry_run) delete_bucket(s3_client, bucket_name, dry_run) logger.info("Operation completed.")
/home/runner/work/aws-toolbox/aws-toolbox/s3/s3_search_bucket_and_delete.py#L158
if __name__ == "__main__": logger = setup_logging() parser = argparse.ArgumentParser(description="Delete S3 bucket and its contents") - parser.add_argument("bucket_name", help="Name of the bucket to search for and delete") - parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting anything") + parser.add_argument( + "bucket_name", help="Name of the bucket to search for and delete" + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Perform a dry run without actually deleting anything", + ) args = parser.parse_args() main(args.bucket_name, args.dry_run)
Run linters
The following actions use node12, which is deprecated, and will be forced to run on node16: actions/checkout@v2, actions/setup-python@v1, wearerequired/lint-action@v1. For more info: https://github.blog/changelog/2023-06-13-github-actions-all-actions-will-run-on-node16-instead-of-node12-by-default/
Run linters
The following actions use a deprecated Node.js version and will be forced to run on node20: actions/checkout@v2, actions/setup-python@v1, wearerequired/lint-action@v1. For more info: https://github.blog/changelog/2024-03-07-github-actions-all-actions-will-run-on-node20-instead-of-node16-by-default/