feat(docs): Update README with new EC2 scripts #70
Annotations
49 errors and 2 warnings
cloudwatch/cw_fetch_log_groups_with_creation_date.py#L40
Local variable 'age_human_readable' is assigned to but never used (F841)
|
ec2/ec2_delete_ssh_access_security_groups.py#L91
Comparison to False should be 'if cond is False:' or 'if not cond:' (E712)
|
general/delete_unused_security_groups.py#L112
Undefined name 'logger' (F821)
|
iam/iam_identity_center_create_users.py#L57
Local variable 'instance_arn' is assigned to but never used (F841)
|
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_fetch_log_groups_with_creation_date.py#L35
creation_time_millis = log_group.get("creationTime", 0)
creation_date = datetime.fromtimestamp(creation_time_millis / 1000)
# Calculate the age of the log group
age_delta = datetime.now() - creation_date
- age_human_readable = f"{age_delta.days} days" if age_delta.days > 0 else "less than a day"
+ age_human_readable = (
+ f"{age_delta.days} days" if age_delta.days > 0 else "less than a day"
+ )
# Append the extracted information to the list
log_groups_info.append((log_group_name, creation_date, age_delta.days))
# Sort by age in descending order (most days to least days)
|
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_fetch_log_groups_with_creation_date.py#L54
"""
log_groups_info = fetch_log_groups_with_creation_dates()
# Prepare data for tabulate
table_data = [
- (log_group_name, creation_date, f"{age_days} days" if age_days > 0 else "less than a day")
+ (
+ log_group_name,
+ creation_date,
+ f"{age_days} days" if age_days > 0 else "less than a day",
+ )
for log_group_name, creation_date, age_days in log_groups_info
]
# Print table
headers = ["Log Group", "Created On", "Age"]
|
/home/runner/work/aws-toolbox/aws-toolbox/appstream/appstream_delete_image.py#L15
next_token = None
try:
while True:
if next_token:
- response = appstream_client.describe_image_permissions(Name=image_name, NextToken=next_token)
+ response = appstream_client.describe_image_permissions(
+ Name=image_name, NextToken=next_token
+ )
else:
response = appstream_client.describe_image_permissions(Name=image_name)
for permission in response.get("SharedImagePermissionsList", []):
shared_account_ids.append(permission["sharedAccountId"])
|
/home/runner/work/aws-toolbox/aws-toolbox/appstream/appstream_delete_image.py#L38
def unshare_image(appstream_client, image_name, account_ids):
for account_id in account_ids:
try:
- appstream_client.delete_image_permissions(Name=image_name, SharedAccountId=account_id)
+ appstream_client.delete_image_permissions(
+ Name=image_name, SharedAccountId=account_id
+ )
print(f"Unshared image from account: {account_id}")
except Exception as e:
print(f"Failed to unshare image from account {account_id}: {str(e)}")
|
/home/runner/work/aws-toolbox/aws-toolbox/appstream/appstream_delete_image.py#L55
if shared_account_ids is None:
return
if shared_account_ids:
- print(f"Image '{image_name}' is shared with {len(shared_account_ids)} account(s):")
+ print(
+ f"Image '{image_name}' is shared with {len(shared_account_ids)} account(s):"
+ )
for account_id in shared_account_ids:
print(f" - {account_id}")
confirm = input("Do you want to unshare and then delete the image? (y/n): ")
if confirm.lower() != "y":
print("Operation cancelled.")
|
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_delete_log_groups.py#L63
to_delete_groups.append((group["logGroupName"], age))
# Print kept groups
print("Log groups to keep:")
for name, age in kept_groups:
- print(f"{'[DRY RUN] ' if dry_run else ''}Keeping log group: {name} (Age: {age})")
+ print(
+ f"{'[DRY RUN] ' if dry_run else ''}Keeping log group: {name} (Age: {age})"
+ )
# Print groups to delete
print("\nLog groups to delete:")
for name, age in to_delete_groups:
- print(f"{'[DRY RUN] Would delete' if dry_run else 'Deleting'} log group: {name} (Age: {age})")
+ print(
+ f"{'[DRY RUN] Would delete' if dry_run else 'Deleting'} log group: {name} (Age: {age})"
+ )
print("\nSummary:")
print(f"Total log groups: {total_groups}")
print(f"Log groups kept: {len(kept_groups)}")
print(f"Log groups to be deleted: {len(to_delete_groups)}")
|
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_delete_log_groups.py#L86
print(f"Access denied when trying to delete log group: {name}")
failed_deletions.append(name)
else:
raise # Re-raise the exception if it's not an AccessDeniedException
- print(f"Log groups actually deleted: {len(to_delete_groups) - len(failed_deletions)}")
+ print(
+ f"Log groups actually deleted: {len(to_delete_groups) - len(failed_deletions)}"
+ )
if failed_deletions:
- print(f"Failed to delete {len(failed_deletions)} log groups due to access denial:")
+ print(
+ f"Failed to delete {len(failed_deletions)} log groups due to access denial:"
+ )
for name in failed_deletions:
print(f" - {name}")
def main():
- parser = argparse.ArgumentParser(description="Delete CloudWatch log groups based on retention.")
+ parser = argparse.ArgumentParser(
+ description="Delete CloudWatch log groups based on retention."
+ )
parser.add_argument(
"--keep",
type=parse_time_period,
help="Keep log groups newer than this period (e.g., '5 days', '2 weeks', '1 months')",
)
- parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting log groups")
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+ help="Perform a dry run without actually deleting log groups",
+ )
args = parser.parse_args()
client = boto3.client("logs")
process_log_groups(client, args.keep, args.dry_run)
|
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_set_retention_policy.py#L42
def update_log_group_retention(group, retention):
try:
if "retentionInDays" not in group or group["retentionInDays"] != retention:
- cloudwatch.put_retention_policy(logGroupName=group["logGroupName"], retentionInDays=retention)
+ cloudwatch.put_retention_policy(
+ logGroupName=group["logGroupName"], retentionInDays=retention
+ )
# Verify the update
- updated_group = cloudwatch.describe_log_groups(logGroupNamePrefix=group["logGroupName"])["logGroups"][0]
+ updated_group = cloudwatch.describe_log_groups(
+ logGroupNamePrefix=group["logGroupName"]
+ )["logGroups"][0]
if updated_group.get("retentionInDays") == retention:
return f"Successfully updated retention for: {group['logGroupName']}"
else:
return f"Failed to update retention for: {group['logGroupName']}. Current retention: {updated_group.get('retentionInDays')}"
else:
- return (
- f"CloudWatch Loggroup: {group['logGroupName']} already has the specified retention of {retention} days."
- )
+ return f"CloudWatch Loggroup: {group['logGroupName']} already has the specified retention of {retention} days."
except botocore.exceptions.ClientError as e:
return f"Error updating {group['logGroupName']}: {e}"
def count_retention_periods(cloudwatch_log_groups):
|
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_set_retention_policy.py#L96
for group in cloudwatch_log_groups
if "retentionInDays" not in group or group["retentionInDays"] != retention
]
if not groups_to_update:
- print(f"All log groups already have the specified retention of {retention} days.")
+ print(
+ f"All log groups already have the specified retention of {retention} days."
+ )
return
print(f"Log groups that need to be updated to {retention} days retention:")
for group in groups_to_update:
current_retention = group.get("retentionInDays", "Not set")
|
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_set_retention_policy.py#L113
updated_count = 0
failed_count = 0
with ThreadPoolExecutor(max_workers=10) as executor:
future_to_group = {
- executor.submit(update_log_group_retention, group, retention): group for group in groups_to_update
+ executor.submit(update_log_group_retention, group, retention): group
+ for group in groups_to_update
}
for future in as_completed(future_to_group):
result = future.result()
print(result)
if "Successfully updated" in result:
|
/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_set_retention_policy.py#L160
3653,
],
help="Enter the retention in days for the CloudWatch Logs.",
)
parser.add_argument(
- "--print-retention-counts", action="store_true", help="Print the number of log groups for each retention period"
+ "--print-retention-counts",
+ action="store_true",
+ help="Print the number of log groups for each retention period",
)
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
sys.exit(1)
args = parser.parse_args()
if args.print_retention_counts and args.retention is not None:
- parser.error("--print-retention-counts cannot be used with --retention argument")
+ parser.error(
+ "--print-retention-counts cannot be used with --retention argument"
+ )
cloudwatch_set_retention(args)
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_tagged_security_groups.py#L23
def revoke_permissions(ec2_client, group_id, permissions):
for sg in permissions:
if sg.get("IpPermissions", []):
for rule in sg.get("IpPermissions", []):
- ec2_client.revoke_security_group_ingress(GroupId=group_id, IpPermissions=[rule])
- print("Revoked ingress IP permissions for Security Group ID: {}".format(group_id))
+ ec2_client.revoke_security_group_ingress(
+ GroupId=group_id, IpPermissions=[rule]
+ )
+ print(
+ "Revoked ingress IP permissions for Security Group ID: {}".format(
+ group_id
+ )
+ )
if sg.get("IpPermissionsEgress", []):
for rule in sg.get("IpPermissionsEgress", []):
- ec2_client.revoke_security_group_egress(GroupId=group_id, IpPermissions=[rule])
- print("Revoked egress IP permissions for Security Group ID: {}".format(group_id))
+ ec2_client.revoke_security_group_egress(
+ GroupId=group_id, IpPermissions=[rule]
+ )
+ print(
+ "Revoked egress IP permissions for Security Group ID: {}".format(
+ group_id
+ )
+ )
def delete_security_group(ec2_client, group_id):
ec2_client.delete_security_group(GroupId=group_id)
print("Deleted Security Group ID: {}".format(group_id))
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_tagged_security_groups.py#L48
# Modify the tag key and value to your own liking
tag_key = "ManagedByAmazonSageMakerResource"
tag_value_contains = f"arn:aws:sagemaker:{aws_region}:{account_id}:domain"
# Find security groups
- tagged_security_groups = find_security_groups(ec2_client, tag_key, tag_value_contains)
+ tagged_security_groups = find_security_groups(
+ ec2_client, tag_key, tag_value_contains
+ )
# Iterate through security groups, revoke permissions, and delete
for sg in tagged_security_groups:
group_id = sg["GroupId"]
# Fetch the current ingress and egress IP permissions
- sg = ec2_client.describe_security_groups(Filters=[{"Name": "group-id", "Values": [group_id]}]).get(
- "SecurityGroups", []
- )
+ sg = ec2_client.describe_security_groups(
+ Filters=[{"Name": "group-id", "Values": [group_id]}]
+ ).get("SecurityGroups", [])
# Revoke permissions
revoke_permissions(ec2_client, group_id, sg)
# Delete the security group
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_ssh_access_security_groups.py#L23
import boto3
from botocore.exceptions import ClientError
def setup_logging():
- logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+ logging.basicConfig(
+ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+ )
return logging.getLogger(__name__)
def get_ec2_client():
try:
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_ssh_access_security_groups.py#L50
return []
def has_ssh_rule(security_group):
for rule in security_group.get("IpPermissions", []):
- if rule.get("FromPort") == 22 and rule.get("ToPort") == 22 and rule.get("IpProtocol") == "tcp":
+ if (
+ rule.get("FromPort") == 22
+ and rule.get("ToPort") == 22
+ and rule.get("IpProtocol") == "tcp"
+ ):
return True
return False
def remove_ssh_rule(ec2_client, security_group, dry_run=False):
group_id = security_group["GroupId"]
group_name = security_group["GroupName"]
ssh_rules = [
rule
for rule in security_group.get("IpPermissions", [])
- if rule.get("FromPort") == 22 and rule.get("ToPort") == 22 and rule.get("IpProtocol") == "tcp"
+ if rule.get("FromPort") == 22
+ and rule.get("ToPort") == 22
+ and rule.get("IpProtocol") == "tcp"
]
if not ssh_rules:
logger.info(f"No SSH rules found in security group: {group_id} ({group_name})")
return False
- logger.info(f"{'Would remove' if dry_run else 'Removing'} SSH rules from security group: {group_id} ({group_name})")
+ logger.info(
+ f"{'Would remove' if dry_run else 'Removing'} SSH rules from security group: {group_id} ({group_name})"
+ )
# Fetch the security group rules with their IDs
try:
- response = ec2_client.describe_security_group_rules(Filters=[{"Name": "group-id", "Values": [group_id]}])
- sg_rules = {rule["SecurityGroupRuleId"]: rule for rule in response["SecurityGroupRules"]}
+ response = ec2_client.describe_security_group_rules(
+ Filters=[{"Name": "group-id", "Values": [group_id]}]
+ )
+ sg_rules = {
+ rule["SecurityGroupRuleId"]: rule for rule in response["SecurityGroupRules"]
+ }
except ClientError as e:
logger.error(f"Failed to fetch security group rules for {group_id}: {e}")
return False
for rule in ssh_rules:
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_ssh_access_security_groups.py#L93
for matching_rule in matching_rules:
rule_id = matching_rule["SecurityGroupRuleId"]
cidr_range = matching_rule.get("CidrIpv4", "N/A")
logger.info(f" Rule ID: {rule_id}")
- logger.info(f" Port Range: {matching_rule['FromPort']}-{matching_rule['ToPort']}")
+ logger.info(
+ f" Port Range: {matching_rule['FromPort']}-{matching_rule['ToPort']}"
+ )
logger.info(f" Protocol: {matching_rule['IpProtocol']}")
logger.info(f" CIDR Range: {cidr_range}")
if not dry_run:
try:
- ec2_client.revoke_security_group_ingress(GroupId=group_id, IpPermissions=ssh_rules)
- logger.info(f"Successfully removed SSH rules from security group: {group_id} ({group_name})")
+ ec2_client.revoke_security_group_ingress(
+ GroupId=group_id, IpPermissions=ssh_rules
+ )
+ logger.info(
+ f"Successfully removed SSH rules from security group: {group_id} ({group_name})"
+ )
return True
except ClientError as e:
- logger.error(f"Failed to remove SSH rules from security group {group_id} ({group_name}): {e}")
+ logger.error(
+ f"Failed to remove SSH rules from security group {group_id} ({group_name}): {e}"
+ )
return False
return True
def main(dry_run=False):
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_ssh_access_security_groups.py#L121
affected_groups += 1
# Summary
logger.info("Summary:")
logger.info(f" Total Security Groups: {len(security_groups)}")
- logger.info(f" Security Groups with SSH rules {'that would be' if dry_run else ''} modified: {affected_groups}")
+ logger.info(
+ f" Security Groups with SSH rules {'that would be' if dry_run else ''} modified: {affected_groups}"
+ )
if __name__ == "__main__":
logger = setup_logging()
- parser = argparse.ArgumentParser(description="Remove SSH (port 22) inbound rules from EC2 Security Groups")
+ parser = argparse.ArgumentParser(
+ description="Remove SSH (port 22) inbound rules from EC2 Security Groups"
+ )
parser.add_argument(
- "--dry-run", action="store_true", help="Perform a dry run without actually modifying security groups"
+ "--dry-run",
+ action="store_true",
+ help="Perform a dry run without actually modifying security groups",
)
args = parser.parse_args()
main(dry_run=args.dry_run)
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_orphaned_snapshots.py#L27
import boto3
from botocore.exceptions import ClientError
def setup_logging():
- logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+ logging.basicConfig(
+ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+ )
return logging.getLogger(__name__)
def get_ec2_client():
try:
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_orphaned_snapshots.py#L116
if "VolumeId" in snapshot
and not is_volume_exists(ec2_client, snapshot["VolumeId"])
and snapshot["SnapshotId"] not in snapshots_used_by_amis
]
logger.info(f"Orphaned snapshots: {len(orphaned_snapshots)}")
- logger.info(f"Orphaned snapshot IDs: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}")
+ logger.info(
+ f"Orphaned snapshot IDs: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}"
+ )
if retention_days is not None:
# Filter snapshots based on retention period
- cutoff_date = datetime.now(orphaned_snapshots[0]["StartTime"].tzinfo) - timedelta(days=retention_days)
- orphaned_snapshots = [snapshot for snapshot in orphaned_snapshots if snapshot["StartTime"] < cutoff_date]
- logger.info(f"Orphaned snapshots older than {retention_days} days: {len(orphaned_snapshots)}")
+ cutoff_date = datetime.now(
+ orphaned_snapshots[0]["StartTime"].tzinfo
+ ) - timedelta(days=retention_days)
+ orphaned_snapshots = [
+ snapshot
+ for snapshot in orphaned_snapshots
+ if snapshot["StartTime"] < cutoff_date
+ ]
+ logger.info(
+ f"Orphaned snapshots older than {retention_days} days: {len(orphaned_snapshots)}"
+ )
logger.info(
f"Orphaned snapshot IDs to be deleted: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}"
)
if not orphaned_snapshots:
logger.info("No orphaned snapshots found to delete.")
return
if dry_run:
- logger.info(f"Dry run: Would delete {len(orphaned_snapshots)} orphaned snapshot(s).")
+ logger.info(
+ f"Dry run: Would delete {len(orphaned_snapshots)} orphaned snapshot(s)."
+ )
logger.info(
f"Snapshot IDs that would be deleted: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}"
)
else:
- deleted_count = delete_orphaned_snapshots(ec2_client, orphaned_snapshots, dry_run)
+ deleted_count = delete_orphaned_snapshots(
+ ec2_client, orphaned_snapshots, dry_run
+ )
logger.info(f"Deleted {deleted_count} orphaned snapshot(s).")
# Summary
logger.info("Summary:")
logger.info(f" Total owned snapshots: {len(owned_snapshots)}")
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_orphaned_snapshots.py#L151
if __name__ == "__main__":
logger = setup_logging()
parser = argparse.ArgumentParser(description="Delete orphaned EC2 snapshots")
- parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting snapshots")
- parser.add_argument("--retention-days", type=int, help="Number of days to retain snapshots before deletion")
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+ help="Perform a dry run without actually deleting snapshots",
+ )
+ parser.add_argument(
+ "--retention-days",
+ type=int,
+ help="Number of days to retain snapshots before deletion",
+ )
parser.add_argument("--profile", help="AWS CLI profile name")
args = parser.parse_args()
if args.profile:
boto3.setup_default_session(profile_name=args.profile)
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_keypairs_single_region.py#L24
import boto3
from botocore.exceptions import ClientError
def setup_logging():
- logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+ logging.basicConfig(
+ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+ )
return logging.getLogger(__name__)
def get_ec2_client_and_resource():
try:
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_keypairs_single_region.py#L50
return []
def get_used_key_pairs(ec2_resource):
try:
- used_keys = set(instance.key_name for instance in ec2_resource.instances.all() if instance.key_name)
+ used_keys = set(
+ instance.key_name
+ for instance in ec2_resource.instances.all()
+ if instance.key_name
+ )
logger.info(f"Used Keys: {len(used_keys)} : {used_keys}")
return used_keys
except ClientError as e:
logger.error(f"Failed to retrieve used key pairs: {e}")
return set()
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_keypairs_single_region.py#L79
ec2_client, ec2_resource = get_ec2_client_and_resource()
all_key_pairs = get_all_key_pairs(ec2_resource)
used_keys = get_used_key_pairs(ec2_resource)
- unused_keys = [key_pair.name for key_pair in all_key_pairs if key_pair.name not in used_keys]
+ unused_keys = [
+ key_pair.name for key_pair in all_key_pairs if key_pair.name not in used_keys
+ ]
logger.info(f"Unused Keys: {len(unused_keys)} : {unused_keys}")
if not unused_keys:
logger.info("No unused key pairs found.")
return
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_keypairs_single_region.py#L96
if __name__ == "__main__":
logger = setup_logging()
parser = argparse.ArgumentParser(description="Delete unused EC2 key pairs")
- parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting key pairs")
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+ help="Perform a dry run without actually deleting key pairs",
+ )
args = parser.parse_args()
main(dry_run=args.dry_run)
|
/home/runner/work/aws-toolbox/aws-toolbox/efs/efs_delete_tagged_filesystems.py#L38
# Delete the mount targets for the EFS filesystem
delete_mount_targets(efs_client, filesystem_id)
# Wait with exponential backoff
delay = (2**current_retry) + random.uniform(0, 1)
- print(f"Waiting for {delay} seconds before attempting to delete the EFS filesystem.")
+ print(
+ f"Waiting for {delay} seconds before attempting to delete the EFS filesystem."
+ )
time.sleep(delay)
# Delete the specified EFS filesystem
efs_client.delete_file_system(FileSystemId=filesystem_id)
print("Deleted EFS Filesystem: {}".format(filesystem_id))
|
/home/runner/work/aws-toolbox/aws-toolbox/ecs/ecs_delete_inactive_task_definitions.py#L27
client.delete_task_definitions(taskDefinitions=[arn])
print(f"Deleted task definition {arn}")
break # Break the loop if deletion was successful
except client.exceptions.ClientException as e:
if "Throttling" in str(e): # Check for throttling in the error message
- print(f"Throttling exception when deleting {arn}: {e}, retrying in {backoff} seconds...")
+ print(
+ f"Throttling exception when deleting {arn}: {e}, retrying in {backoff} seconds..."
+ )
time.sleep(backoff)
backoff *= 2 # Exponential backoff
else:
print(f"Client exception when deleting task definition {arn}: {e}")
break # Break the loop for other client exceptions
except client.exceptions.ServerException as e:
if "Throttling" in str(e): # Check for throttling in the error message
- print(f"Throttling exception when deleting {arn}: {e}, retrying in {backoff} seconds...")
+ print(
+ f"Throttling exception when deleting {arn}: {e}, retrying in {backoff} seconds..."
+ )
time.sleep(backoff)
backoff *= 2 # Exponential backoff
else:
print(f"Server exception when deleting task definition {arn}: {e}")
break # Break the loop for other server exceptions
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_amis.py#L27
import boto3
from botocore.exceptions import ClientError
def setup_logging():
- logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+ logging.basicConfig(
+ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+ )
return logging.getLogger(__name__)
def get_ec2_client():
try:
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_amis.py#L45
try:
owned_amis = []
paginator = ec2_client.get_paginator("describe_images")
for page in paginator.paginate(Owners=["self"]):
owned_amis.extend(page["Images"])
- logger.info(f"Owned AMIs: {len(owned_amis)} : {[ami['ImageId'] for ami in owned_amis]}")
+ logger.info(
+ f"Owned AMIs: {len(owned_amis)} : {[ami['ImageId'] for ami in owned_amis]}"
+ )
return owned_amis
except ClientError as e:
logger.error(f"Failed to retrieve owned AMIs: {e}")
return []
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_amis.py#L113
owned_amis = get_owned_amis(ec2_client)
used_amis = get_used_amis(ec2_client)
# Find unused AMIs
unused_amis = [ami for ami in owned_amis if ami["ImageId"] not in used_amis]
- logger.info(f"Unused AMIs: {len(unused_amis)} : {[ami['ImageId'] for ami in unused_amis]}")
+ logger.info(
+ f"Unused AMIs: {len(unused_amis)} : {[ami['ImageId'] for ami in unused_amis]}"
+ )
if retention_days is not None:
# Filter AMIs based on retention period
cutoff_date = datetime.now() - timedelta(days=retention_days)
unused_amis = [
- ami for ami in unused_amis if datetime.strptime(ami["CreationDate"], "%Y-%m-%dT%H:%M:%S.%fZ") < cutoff_date
+ ami
+ for ami in unused_amis
+ if datetime.strptime(ami["CreationDate"], "%Y-%m-%dT%H:%M:%S.%fZ")
+ < cutoff_date
]
logger.info(
f"Unused AMIs older than {retention_days} days: {len(unused_amis)} : {[ami['ImageId'] for ami in unused_amis]}"
)
if not unused_amis:
logger.info("No unused AMIs found to delete.")
return
if dry_run:
- logger.info(f"Dry run: Would delete {len(unused_amis)} unused AMI(s) and their associated snapshots.")
+ logger.info(
+ f"Dry run: Would delete {len(unused_amis)} unused AMI(s) and their associated snapshots."
+ )
else:
deleted_count = delete_unused_amis(ec2_client, unused_amis, dry_run)
- logger.info(f"Deleted {deleted_count} unused AMI(s) and their associated snapshots.")
+ logger.info(
+ f"Deleted {deleted_count} unused AMI(s) and their associated snapshots."
+ )
# Summary
logger.info("Summary:")
logger.info(f" Total owned AMIs: {len(owned_amis)}")
logger.info(f" Used AMIs: {len(used_amis)}")
|
/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_amis.py#L146
if __name__ == "__main__":
logger = setup_logging()
parser = argparse.ArgumentParser(description="Delete unused EC2 AMIs")
- parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting AMIs")
- parser.add_argument("--retention-days", type=int, help="Number of days to retain AMIs before deletion")
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+ help="Perform a dry run without actually deleting AMIs",
+ )
+ parser.add_argument(
+ "--retention-days",
+ type=int,
+ help="Number of days to retain AMIs before deletion",
+ )
args = parser.parse_args()
main(dry_run=args.dry_run, retention_days=args.retention_days)
|
/home/runner/work/aws-toolbox/aws-toolbox/iam/iam_identity_center_create_users.py#L64
group_id = None
if group_name:
try:
group_response = identitystore.list_groups(
IdentityStoreId=identity_store_id,
- Filters=[{"AttributePath": "DisplayName", "AttributeValue": group_name}],
+ Filters=[
+ {"AttributePath": "DisplayName", "AttributeValue": group_name}
+ ],
)
if group_response["Groups"]:
group_id = group_response["Groups"][0]["GroupId"]
else:
- print(f"Group '{group_name}' not found. Users will be created without group assignment.")
+ print(
+ f"Group '{group_name}' not found. Users will be created without group assignment."
+ )
except ClientError as e:
print(f"Error checking group: {e}")
return [], emails
for email in emails:
|
/home/runner/work/aws-toolbox/aws-toolbox/iam/iam_identity_center_create_users.py#L91
)
# If group_id is available, add user to the group
if group_id:
identitystore.create_group_membership(
- IdentityStoreId=identity_store_id, GroupId=group_id, MemberId={"UserId": user_response["UserId"]}
+ IdentityStoreId=identity_store_id,
+ GroupId=group_id,
+ MemberId={"UserId": user_response["UserId"]},
)
successful.append(email)
print(f"Successfully created user: {email} ({first_name} {last_name})")
except ClientError as e:
|
/home/runner/work/aws-toolbox/aws-toolbox/iam/iam_identity_center_create_users.py#L104
return successful, failed
def main():
- parser = argparse.ArgumentParser(description="Create SSO users from a list of email addresses.")
- parser.add_argument("--emails", nargs="+", required=True, help="List of email addresses")
+ parser = argparse.ArgumentParser(
+ description="Create SSO users from a list of email addresses."
+ )
+ parser.add_argument(
+ "--emails", nargs="+", required=True, help="List of email addresses"
+ )
parser.add_argument("--group", help="Optional group name to assign users to")
args = parser.parse_args()
successful, failed = create_sso_users(args.emails, args.group)
|
/home/runner/work/aws-toolbox/aws-toolbox/general/delete_unused_security_groups.py#L40
from botocore.exceptions import ClientError
def setup_logging():
"""Configure logging for the script."""
- logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+ logging.basicConfig(
+ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+ )
return logging.getLogger(__name__)
def get_used_security_groups(ec2, elb, elbv2, rds, logger, sg_type):
"""Collect all security groups in use."""
|
/home/runner/work/aws-toolbox/aws-toolbox/general/delete_unused_security_groups.py#L76
# Application and Network Load Balancers
for lb in elbv2.describe_load_balancers()["LoadBalancers"]:
if "SecurityGroups" in lb:
used_sg.update(lb["SecurityGroups"])
else:
- logger.debug(f"ALB/NLB without SecurityGroups: {lb.get('LoadBalancerName', 'Unknown')}")
+ logger.debug(
+ f"ALB/NLB without SecurityGroups: {lb.get('LoadBalancerName', 'Unknown')}"
+ )
except ClientError as e:
- logger.error(f"Error describing Application/Network Load Balancers: {str(e)}")
+ logger.error(
+ f"Error describing Application/Network Load Balancers: {str(e)}"
+ )
if sg_type in ["all", "rds"]:
try:
# RDS Instances
for instance in rds.describe_db_instances()["DBInstances"]:
- used_sg.update(sg["VpcSecurityGroupId"] for sg in instance["VpcSecurityGroups"])
+ used_sg.update(
+ sg["VpcSecurityGroupId"] for sg in instance["VpcSecurityGroups"]
+ )
except ClientError as e:
logger.error(f"Error describing RDS instances: {str(e)}")
return used_sg
|
/home/runner/work/aws-toolbox/aws-toolbox/general/delete_unused_security_groups.py#L100
response = ec2.describe_security_groups()
for sg in response["SecurityGroups"]:
group_name = sg["GroupName"].lower()
if sg_type == "all":
all_sg.add(sg["GroupId"])
- elif sg_type == "ec2" and not (group_name.startswith("rds-") or group_name.startswith("elb-")):
+ elif sg_type == "ec2" and not (
+ group_name.startswith("rds-") or group_name.startswith("elb-")
+ ):
all_sg.add(sg["GroupId"])
elif sg_type == "rds" and group_name.startswith("rds-"):
all_sg.add(sg["GroupId"])
elif sg_type == "elb" and group_name.startswith("elb-"):
all_sg.add(sg["GroupId"])
|
/home/runner/work/aws-toolbox/aws-toolbox/general/delete_unused_security_groups.py#L115
def delete_unused_security_groups(ec2, unused_sg, dry_run, logger):
"""Delete unused security groups, skipping those with 'default' in the name."""
for sg_id in unused_sg:
try:
- sg_info = ec2.describe_security_groups(GroupIds=[sg_id])["SecurityGroups"][0]
+ sg_info = ec2.describe_security_groups(GroupIds=[sg_id])["SecurityGroups"][
+ 0
+ ]
sg_name = sg_info["GroupName"]
if "default" in sg_name.lower():
logger.info(
f"Skipping deletion of security group '{sg_name}' (ID: {sg_id}) because it contains 'default'"
)
continue
if dry_run:
- logger.info(f"[DRY RUN] Would delete security group '{sg_name}' (ID: {sg_id})")
+ logger.info(
+ f"[DRY RUN] Would delete security group '{sg_name}' (ID: {sg_id})"
+ )
else:
logger.info(f"Deleting security group '{sg_name}' (ID: {sg_id})")
ec2.delete_security_group(GroupId=sg_id)
except ClientError as e:
if e.response["Error"]["Code"] == "DependencyViolation":
logger.warning(
f"Skipping deletion of security group '{sg_name}' (ID: {sg_id}) because it has a dependent object."
)
else:
- logger.error(f"Error deleting security group '{sg_name}' (ID: {sg_id}): {str(e)}")
+ logger.error(
+ f"Error deleting security group '{sg_name}' (ID: {sg_id}): {str(e)}"
+ )
def main(dry_run, sg_type):
logger = setup_logging()
|
/home/runner/work/aws-toolbox/aws-toolbox/general/delete_unused_security_groups.py#L164
delete_unused_security_groups(ec2, unused_sg, dry_run, logger)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Delete unused AWS security groups")
- parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without deleting security groups")
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+ help="Perform a dry run without deleting security groups",
+ )
parser.add_argument(
"--type",
choices=["all", "ec2", "rds", "elb"],
default="all",
help="Specify the type of security groups to consider (default: all)",
|
/home/runner/work/aws-toolbox/aws-toolbox/s3/s3_delete_empty_buckets.py#L14
from botocore.exceptions import ClientError
def parse_arguments():
"""Parse command line arguments."""
- parser = argparse.ArgumentParser(description="Delete empty S3 buckets without versioning.")
- parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without deleting buckets")
+ parser = argparse.ArgumentParser(
+ description="Delete empty S3 buckets without versioning."
+ )
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+ help="Perform a dry run without deleting buckets",
+ )
return parser.parse_args()
def is_bucket_empty_and_unversioned(s3_client, bucket_name):
"""Check if a bucket is empty and has versioning disabled."""
|
/home/runner/work/aws-toolbox/aws-toolbox/s3/s3_delete_empty_buckets.py#L44
buckets = response["Buckets"]
except ClientError as e:
print(f"Error listing buckets: {e}", file=sys.stderr)
return []
- return [bucket["Name"] for bucket in buckets if is_bucket_empty_and_unversioned(s3_client, bucket["Name"])]
+ return [
+ bucket["Name"]
+ for bucket in buckets
+ if is_bucket_empty_and_unversioned(s3_client, bucket["Name"])
+ ]
def delete_buckets(s3_resource, bucket_names, dry_run=False):
"""Delete the specified buckets."""
for bucket_name in bucket_names:
|
/home/runner/work/aws-toolbox/aws-toolbox/s3/s3_delete_empty_buckets.py#L80
print(f"- {bucket}")
if args.dry_run:
print("\nDry run mode. No buckets will be deleted.")
else:
- confirmation = input("\nDo you want to delete these buckets? (yes/no): ").lower()
+ confirmation = input(
+ "\nDo you want to delete these buckets? (yes/no): "
+ ).lower()
if confirmation != "yes":
print("Operation cancelled.")
return
delete_buckets(s3_resource, empty_buckets, args.dry_run)
|
/home/runner/work/aws-toolbox/aws-toolbox/s3/s3_search_bucket_and_delete.py#L25
from botocore.config import Config
from botocore.exceptions import ClientError
def setup_logging():
- logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+ logging.basicConfig(
+ level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+ )
return logging.getLogger(__name__)
def get_s3_client():
try:
|
/home/runner/work/aws-toolbox/aws-toolbox/s3/s3_search_bucket_and_delete.py#L85
count += 1
# Process in batches of 1000 (S3 delete_objects limit)
if len(object_versions) >= 1000:
if dry_run:
- logger.info(f"Would delete {len(object_versions)} {'versions' if is_versioned else 'objects'}")
+ logger.info(
+ f"Would delete {len(object_versions)} {'versions' if is_versioned else 'objects'}"
+ )
else:
delete_objects(object_versions)
object_versions = []
# Log progress every 10000 objects
if count % 10000 == 0:
- logger.info(f"Processed {count} {'versions' if is_versioned else 'objects'}")
+ logger.info(
+ f"Processed {count} {'versions' if is_versioned else 'objects'}"
+ )
# Delete any remaining objects
if object_versions:
if dry_run:
- logger.info(f"Would delete {len(object_versions)} {'versions' if is_versioned else 'objects'}")
+ logger.info(
+ f"Would delete {len(object_versions)} {'versions' if is_versioned else 'objects'}"
+ )
else:
delete_objects(object_versions)
logger.info(
f"{'Would delete' if dry_run else 'Deleted'} a total of {count} {'versions' if is_versioned else 'objects'} from {bucket_name}"
|
/home/runner/work/aws-toolbox/aws-toolbox/s3/s3_search_bucket_and_delete.py#L146
size_bytes = get_bucket_size(s3_client, bucket_name)
size_gb = size_bytes / (1024**3) # Convert bytes to gigabytes
logger.info(f"Bucket size: {size_gb:.2f} GB")
if dry_run:
- logger.info(f"Dry run: Would delete all contents and the bucket itself: {bucket_name}")
+ logger.info(
+ f"Dry run: Would delete all contents and the bucket itself: {bucket_name}"
+ )
else:
delete_bucket_contents(s3_client, bucket_name, dry_run)
delete_bucket(s3_client, bucket_name, dry_run)
logger.info("Operation completed.")
|
/home/runner/work/aws-toolbox/aws-toolbox/s3/s3_search_bucket_and_delete.py#L158
if __name__ == "__main__":
logger = setup_logging()
parser = argparse.ArgumentParser(description="Delete S3 bucket and its contents")
- parser.add_argument("bucket_name", help="Name of the bucket to search for and delete")
- parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting anything")
+ parser.add_argument(
+ "bucket_name", help="Name of the bucket to search for and delete"
+ )
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+ help="Perform a dry run without actually deleting anything",
+ )
args = parser.parse_args()
main(args.bucket_name, args.dry_run)
|
Run linters
The following actions uses node12 which is deprecated and will be forced to run on node16: actions/checkout@v2, actions/setup-python@v1, wearerequired/lint-action@v1. For more info: https://github.blog/changelog/2023-06-13-github-actions-all-actions-will-run-on-node16-instead-of-node12-by-default/
|
Run linters
The following actions use a deprecated Node.js version and will be forced to run on node20: actions/checkout@v2, actions/setup-python@v1, wearerequired/lint-action@v1. For more info: https://github.blog/changelog/2024-03-07-github-actions-all-actions-will-run-on-node20-instead-of-node16-by-default/
|