Skip to content

feat(docs): Update README with new EC2 scripts #70

feat(docs): Update README with new EC2 scripts

feat(docs): Update README with new EC2 scripts #70

GitHub Actions / Black failed Oct 2, 2024 in 0s

45 formatting errors

Black found 45 formatting errors (files that would be reformatted)

Annotations

Check failure on line 46 in /home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_fetch_log_groups_with_creation_date.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_fetch_log_groups_with_creation_date.py#L35-L46

             creation_time_millis = log_group.get("creationTime", 0)
             creation_date = datetime.fromtimestamp(creation_time_millis / 1000)
 
             # Calculate the age of the log group
             age_delta = datetime.now() - creation_date
-            age_human_readable = f"{age_delta.days} days" if age_delta.days > 0 else "less than a day"
+            age_human_readable = (
+                f"{age_delta.days} days" if age_delta.days > 0 else "less than a day"
+            )
 
             # Append the extracted information to the list
             log_groups_info.append((log_group_name, creation_date, age_delta.days))
 
     # Sort by age in descending order (most days to least days)

Check failure on line 65 in /home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_fetch_log_groups_with_creation_date.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_fetch_log_groups_with_creation_date.py#L54-L65

     """
     log_groups_info = fetch_log_groups_with_creation_dates()
 
     # Prepare data for tabulate
     table_data = [
-        (log_group_name, creation_date, f"{age_days} days" if age_days > 0 else "less than a day")
+        (
+            log_group_name,
+            creation_date,
+            f"{age_days} days" if age_days > 0 else "less than a day",
+        )
         for log_group_name, creation_date, age_days in log_groups_info
     ]
 
     # Print table
     headers = ["Log Group", "Created On", "Age"]

Check failure on line 26 in /home/runner/work/aws-toolbox/aws-toolbox/appstream/appstream_delete_image.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/appstream/appstream_delete_image.py#L15-L26

     next_token = None
 
     try:
         while True:
             if next_token:
-                response = appstream_client.describe_image_permissions(Name=image_name, NextToken=next_token)
+                response = appstream_client.describe_image_permissions(
+                    Name=image_name, NextToken=next_token
+                )
             else:
                 response = appstream_client.describe_image_permissions(Name=image_name)
 
             for permission in response.get("SharedImagePermissionsList", []):
                 shared_account_ids.append(permission["sharedAccountId"])

Check failure on line 49 in /home/runner/work/aws-toolbox/aws-toolbox/appstream/appstream_delete_image.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/appstream/appstream_delete_image.py#L38-L49

 
 
 def unshare_image(appstream_client, image_name, account_ids):
     for account_id in account_ids:
         try:
-            appstream_client.delete_image_permissions(Name=image_name, SharedAccountId=account_id)
+            appstream_client.delete_image_permissions(
+                Name=image_name, SharedAccountId=account_id
+            )
             print(f"Unshared image from account: {account_id}")
         except Exception as e:
             print(f"Failed to unshare image from account {account_id}: {str(e)}")
 
 

Check failure on line 66 in /home/runner/work/aws-toolbox/aws-toolbox/appstream/appstream_delete_image.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/appstream/appstream_delete_image.py#L55-L66

 
         if shared_account_ids is None:
             return
 
         if shared_account_ids:
-            print(f"Image '{image_name}' is shared with {len(shared_account_ids)} account(s):")
+            print(
+                f"Image '{image_name}' is shared with {len(shared_account_ids)} account(s):"
+            )
             for account_id in shared_account_ids:
                 print(f"  - {account_id}")
             confirm = input("Do you want to unshare and then delete the image? (y/n): ")
             if confirm.lower() != "y":
                 print("Operation cancelled.")

Check failure on line 79 in /home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_delete_log_groups.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_delete_log_groups.py#L63-L79

             to_delete_groups.append((group["logGroupName"], age))
 
     # Print kept groups
     print("Log groups to keep:")
     for name, age in kept_groups:
-        print(f"{'[DRY RUN] ' if dry_run else ''}Keeping log group: {name} (Age: {age})")
+        print(
+            f"{'[DRY RUN] ' if dry_run else ''}Keeping log group: {name} (Age: {age})"
+        )
 
     # Print groups to delete
     print("\nLog groups to delete:")
     for name, age in to_delete_groups:
-        print(f"{'[DRY RUN] Would delete' if dry_run else 'Deleting'} log group: {name} (Age: {age})")
+        print(
+            f"{'[DRY RUN] Would delete' if dry_run else 'Deleting'} log group: {name} (Age: {age})"
+        )
 
     print("\nSummary:")
     print(f"Total log groups: {total_groups}")
     print(f"Log groups kept: {len(kept_groups)}")
     print(f"Log groups to be deleted: {len(to_delete_groups)}")

Check failure on line 111 in /home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_delete_log_groups.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_delete_log_groups.py#L86-L111

                     print(f"Access denied when trying to delete log group: {name}")
                     failed_deletions.append(name)
                 else:
                     raise  # Re-raise the exception if it's not an AccessDeniedException
 
-        print(f"Log groups actually deleted: {len(to_delete_groups) - len(failed_deletions)}")
+        print(
+            f"Log groups actually deleted: {len(to_delete_groups) - len(failed_deletions)}"
+        )
         if failed_deletions:
-            print(f"Failed to delete {len(failed_deletions)} log groups due to access denial:")
+            print(
+                f"Failed to delete {len(failed_deletions)} log groups due to access denial:"
+            )
             for name in failed_deletions:
                 print(f"  - {name}")
 
 
 def main():
-    parser = argparse.ArgumentParser(description="Delete CloudWatch log groups based on retention.")
+    parser = argparse.ArgumentParser(
+        description="Delete CloudWatch log groups based on retention."
+    )
     parser.add_argument(
         "--keep",
         type=parse_time_period,
         help="Keep log groups newer than this period (e.g., '5 days', '2 weeks', '1 months')",
     )
-    parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting log groups")
+    parser.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="Perform a dry run without actually deleting log groups",
+    )
     args = parser.parse_args()
 
     client = boto3.client("logs")
     process_log_groups(client, args.keep, args.dry_run)
 

Check failure on line 64 in /home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_set_retention_policy.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_set_retention_policy.py#L42-L64

 
 
 def update_log_group_retention(group, retention):
     try:
         if "retentionInDays" not in group or group["retentionInDays"] != retention:
-            cloudwatch.put_retention_policy(logGroupName=group["logGroupName"], retentionInDays=retention)
+            cloudwatch.put_retention_policy(
+                logGroupName=group["logGroupName"], retentionInDays=retention
+            )
 
             # Verify the update
-            updated_group = cloudwatch.describe_log_groups(logGroupNamePrefix=group["logGroupName"])["logGroups"][0]
+            updated_group = cloudwatch.describe_log_groups(
+                logGroupNamePrefix=group["logGroupName"]
+            )["logGroups"][0]
             if updated_group.get("retentionInDays") == retention:
                 return f"Successfully updated retention for: {group['logGroupName']}"
             else:
                 return f"Failed to update retention for: {group['logGroupName']}. Current retention: {updated_group.get('retentionInDays')}"
         else:
-            return (
-                f"CloudWatch Loggroup: {group['logGroupName']} already has the specified retention of {retention} days."
-            )
+            return f"CloudWatch Loggroup: {group['logGroupName']} already has the specified retention of {retention} days."
     except botocore.exceptions.ClientError as e:
         return f"Error updating {group['logGroupName']}: {e}"
 
 
 def count_retention_periods(cloudwatch_log_groups):

Check failure on line 107 in /home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_set_retention_policy.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_set_retention_policy.py#L96-L107

         for group in cloudwatch_log_groups
         if "retentionInDays" not in group or group["retentionInDays"] != retention
     ]
 
     if not groups_to_update:
-        print(f"All log groups already have the specified retention of {retention} days.")
+        print(
+            f"All log groups already have the specified retention of {retention} days."
+        )
         return
 
     print(f"Log groups that need to be updated to {retention} days retention:")
     for group in groups_to_update:
         current_retention = group.get("retentionInDays", "Not set")

Check failure on line 124 in /home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_set_retention_policy.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_set_retention_policy.py#L113-L124

     updated_count = 0
     failed_count = 0
 
     with ThreadPoolExecutor(max_workers=10) as executor:
         future_to_group = {
-            executor.submit(update_log_group_retention, group, retention): group for group in groups_to_update
+            executor.submit(update_log_group_retention, group, retention): group
+            for group in groups_to_update
         }
         for future in as_completed(future_to_group):
             result = future.result()
             print(result)
             if "Successfully updated" in result:

Check failure on line 178 in /home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_set_retention_policy.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/cloudwatch/cw_set_retention_policy.py#L160-L178

             3653,
         ],
         help="Enter the retention in days for the CloudWatch Logs.",
     )
     parser.add_argument(
-        "--print-retention-counts", action="store_true", help="Print the number of log groups for each retention period"
+        "--print-retention-counts",
+        action="store_true",
+        help="Print the number of log groups for each retention period",
     )
 
     if len(sys.argv) == 1:
         parser.print_help(sys.stderr)
         sys.exit(1)
 
     args = parser.parse_args()
 
     if args.print_retention_counts and args.retention is not None:
-        parser.error("--print-retention-counts cannot be used with --retention argument")
+        parser.error(
+            "--print-retention-counts cannot be used with --retention argument"
+        )
 
     cloudwatch_set_retention(args)

Check failure on line 39 in /home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_tagged_security_groups.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_tagged_security_groups.py#L23-L39

 
 def revoke_permissions(ec2_client, group_id, permissions):
     for sg in permissions:
         if sg.get("IpPermissions", []):
             for rule in sg.get("IpPermissions", []):
-                ec2_client.revoke_security_group_ingress(GroupId=group_id, IpPermissions=[rule])
-                print("Revoked ingress IP permissions for Security Group ID: {}".format(group_id))
+                ec2_client.revoke_security_group_ingress(
+                    GroupId=group_id, IpPermissions=[rule]
+                )
+                print(
+                    "Revoked ingress IP permissions for Security Group ID: {}".format(
+                        group_id
+                    )
+                )
         if sg.get("IpPermissionsEgress", []):
             for rule in sg.get("IpPermissionsEgress", []):
-                ec2_client.revoke_security_group_egress(GroupId=group_id, IpPermissions=[rule])
-                print("Revoked egress IP permissions for Security Group ID: {}".format(group_id))
+                ec2_client.revoke_security_group_egress(
+                    GroupId=group_id, IpPermissions=[rule]
+                )
+                print(
+                    "Revoked egress IP permissions for Security Group ID: {}".format(
+                        group_id
+                    )
+                )
 
 
 def delete_security_group(ec2_client, group_id):
     ec2_client.delete_security_group(GroupId=group_id)
     print("Deleted Security Group ID: {}".format(group_id))

Check failure on line 68 in /home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_tagged_security_groups.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_tagged_security_groups.py#L48-L68

     # Modify the tag key and value to your own liking
     tag_key = "ManagedByAmazonSageMakerResource"
     tag_value_contains = f"arn:aws:sagemaker:{aws_region}:{account_id}:domain"
 
     # Find security groups
-    tagged_security_groups = find_security_groups(ec2_client, tag_key, tag_value_contains)
+    tagged_security_groups = find_security_groups(
+        ec2_client, tag_key, tag_value_contains
+    )
 
     # Iterate through security groups, revoke permissions, and delete
     for sg in tagged_security_groups:
         group_id = sg["GroupId"]
 
         # Fetch the current ingress and egress IP permissions
-        sg = ec2_client.describe_security_groups(Filters=[{"Name": "group-id", "Values": [group_id]}]).get(
-            "SecurityGroups", []
-        )
+        sg = ec2_client.describe_security_groups(
+            Filters=[{"Name": "group-id", "Values": [group_id]}]
+        ).get("SecurityGroups", [])
 
         # Revoke permissions
         revoke_permissions(ec2_client, group_id, sg)
 
         # Delete the security group

Check failure on line 34 in /home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_ssh_access_security_groups.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_ssh_access_security_groups.py#L23-L34

 import boto3
 from botocore.exceptions import ClientError
 
 
 def setup_logging():
-    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+    logging.basicConfig(
+        level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+    )
     return logging.getLogger(__name__)
 
 
 def get_ec2_client():
     try:

Check failure on line 84 in /home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_ssh_access_security_groups.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_ssh_access_security_groups.py#L50-L84

         return []
 
 
 def has_ssh_rule(security_group):
     for rule in security_group.get("IpPermissions", []):
-        if rule.get("FromPort") == 22 and rule.get("ToPort") == 22 and rule.get("IpProtocol") == "tcp":
+        if (
+            rule.get("FromPort") == 22
+            and rule.get("ToPort") == 22
+            and rule.get("IpProtocol") == "tcp"
+        ):
             return True
     return False
 
 
 def remove_ssh_rule(ec2_client, security_group, dry_run=False):
     group_id = security_group["GroupId"]
     group_name = security_group["GroupName"]
     ssh_rules = [
         rule
         for rule in security_group.get("IpPermissions", [])
-        if rule.get("FromPort") == 22 and rule.get("ToPort") == 22 and rule.get("IpProtocol") == "tcp"
+        if rule.get("FromPort") == 22
+        and rule.get("ToPort") == 22
+        and rule.get("IpProtocol") == "tcp"
     ]
 
     if not ssh_rules:
         logger.info(f"No SSH rules found in security group: {group_id} ({group_name})")
         return False
 
-    logger.info(f"{'Would remove' if dry_run else 'Removing'} SSH rules from security group: {group_id} ({group_name})")
+    logger.info(
+        f"{'Would remove' if dry_run else 'Removing'} SSH rules from security group: {group_id} ({group_name})"
+    )
 
     # Fetch the security group rules with their IDs
     try:
-        response = ec2_client.describe_security_group_rules(Filters=[{"Name": "group-id", "Values": [group_id]}])
-        sg_rules = {rule["SecurityGroupRuleId"]: rule for rule in response["SecurityGroupRules"]}
+        response = ec2_client.describe_security_group_rules(
+            Filters=[{"Name": "group-id", "Values": [group_id]}]
+        )
+        sg_rules = {
+            rule["SecurityGroupRuleId"]: rule for rule in response["SecurityGroupRules"]
+        }
     except ClientError as e:
         logger.error(f"Failed to fetch security group rules for {group_id}: {e}")
         return False
 
     for rule in ssh_rules:

Check failure on line 114 in /home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_ssh_access_security_groups.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_ssh_access_security_groups.py#L93-L114

 
         for matching_rule in matching_rules:
             rule_id = matching_rule["SecurityGroupRuleId"]
             cidr_range = matching_rule.get("CidrIpv4", "N/A")
             logger.info(f"  Rule ID: {rule_id}")
-            logger.info(f"    Port Range: {matching_rule['FromPort']}-{matching_rule['ToPort']}")
+            logger.info(
+                f"    Port Range: {matching_rule['FromPort']}-{matching_rule['ToPort']}"
+            )
             logger.info(f"    Protocol: {matching_rule['IpProtocol']}")
             logger.info(f"    CIDR Range: {cidr_range}")
 
     if not dry_run:
         try:
-            ec2_client.revoke_security_group_ingress(GroupId=group_id, IpPermissions=ssh_rules)
-            logger.info(f"Successfully removed SSH rules from security group: {group_id} ({group_name})")
+            ec2_client.revoke_security_group_ingress(
+                GroupId=group_id, IpPermissions=ssh_rules
+            )
+            logger.info(
+                f"Successfully removed SSH rules from security group: {group_id} ({group_name})"
+            )
             return True
         except ClientError as e:
-            logger.error(f"Failed to remove SSH rules from security group {group_id} ({group_name}): {e}")
+            logger.error(
+                f"Failed to remove SSH rules from security group {group_id} ({group_name}): {e}"
+            )
             return False
     return True
 
 
 def main(dry_run=False):

Check failure on line 139 in /home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_ssh_access_security_groups.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_ssh_access_security_groups.py#L121-L139

                 affected_groups += 1
 
     # Summary
     logger.info("Summary:")
     logger.info(f"  Total Security Groups: {len(security_groups)}")
-    logger.info(f"  Security Groups with SSH rules {'that would be' if dry_run else ''} modified: {affected_groups}")
+    logger.info(
+        f"  Security Groups with SSH rules {'that would be' if dry_run else ''} modified: {affected_groups}"
+    )
 
 
 if __name__ == "__main__":
     logger = setup_logging()
 
-    parser = argparse.ArgumentParser(description="Remove SSH (port 22) inbound rules from EC2 Security Groups")
+    parser = argparse.ArgumentParser(
+        description="Remove SSH (port 22) inbound rules from EC2 Security Groups"
+    )
     parser.add_argument(
-        "--dry-run", action="store_true", help="Perform a dry run without actually modifying security groups"
+        "--dry-run",
+        action="store_true",
+        help="Perform a dry run without actually modifying security groups",
     )
     args = parser.parse_args()
 
     main(dry_run=args.dry_run)

Check failure on line 38 in /home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_orphaned_snapshots.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_orphaned_snapshots.py#L27-L38

 import boto3
 from botocore.exceptions import ClientError
 
 
 def setup_logging():
-    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+    logging.basicConfig(
+        level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+    )
     return logging.getLogger(__name__)
 
 
 def get_ec2_client():
     try:

Check failure on line 148 in /home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_orphaned_snapshots.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_orphaned_snapshots.py#L116-L148

         if "VolumeId" in snapshot
         and not is_volume_exists(ec2_client, snapshot["VolumeId"])
         and snapshot["SnapshotId"] not in snapshots_used_by_amis
     ]
     logger.info(f"Orphaned snapshots: {len(orphaned_snapshots)}")
-    logger.info(f"Orphaned snapshot IDs: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}")
+    logger.info(
+        f"Orphaned snapshot IDs: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}"
+    )
 
     if retention_days is not None:
         # Filter snapshots based on retention period
-        cutoff_date = datetime.now(orphaned_snapshots[0]["StartTime"].tzinfo) - timedelta(days=retention_days)
-        orphaned_snapshots = [snapshot for snapshot in orphaned_snapshots if snapshot["StartTime"] < cutoff_date]
-        logger.info(f"Orphaned snapshots older than {retention_days} days: {len(orphaned_snapshots)}")
+        cutoff_date = datetime.now(
+            orphaned_snapshots[0]["StartTime"].tzinfo
+        ) - timedelta(days=retention_days)
+        orphaned_snapshots = [
+            snapshot
+            for snapshot in orphaned_snapshots
+            if snapshot["StartTime"] < cutoff_date
+        ]
+        logger.info(
+            f"Orphaned snapshots older than {retention_days} days: {len(orphaned_snapshots)}"
+        )
         logger.info(
             f"Orphaned snapshot IDs to be deleted: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}"
         )
 
     if not orphaned_snapshots:
         logger.info("No orphaned snapshots found to delete.")
         return
 
     if dry_run:
-        logger.info(f"Dry run: Would delete {len(orphaned_snapshots)} orphaned snapshot(s).")
+        logger.info(
+            f"Dry run: Would delete {len(orphaned_snapshots)} orphaned snapshot(s)."
+        )
         logger.info(
             f"Snapshot IDs that would be deleted: {[snapshot['SnapshotId'] for snapshot in orphaned_snapshots]}"
         )
     else:
-        deleted_count = delete_orphaned_snapshots(ec2_client, orphaned_snapshots, dry_run)
+        deleted_count = delete_orphaned_snapshots(
+            ec2_client, orphaned_snapshots, dry_run
+        )
         logger.info(f"Deleted {deleted_count} orphaned snapshot(s).")
 
     # Summary
     logger.info("Summary:")
     logger.info(f"  Total owned snapshots: {len(owned_snapshots)}")

Check failure on line 163 in /home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_orphaned_snapshots.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_orphaned_snapshots.py#L151-L163

 
 if __name__ == "__main__":
     logger = setup_logging()
 
     parser = argparse.ArgumentParser(description="Delete orphaned EC2 snapshots")
-    parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting snapshots")
-    parser.add_argument("--retention-days", type=int, help="Number of days to retain snapshots before deletion")
+    parser.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="Perform a dry run without actually deleting snapshots",
+    )
+    parser.add_argument(
+        "--retention-days",
+        type=int,
+        help="Number of days to retain snapshots before deletion",
+    )
     parser.add_argument("--profile", help="AWS CLI profile name")
     args = parser.parse_args()
 
     if args.profile:
         boto3.setup_default_session(profile_name=args.profile)

Check failure on line 35 in /home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_keypairs_single_region.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_keypairs_single_region.py#L24-L35

 import boto3
 from botocore.exceptions import ClientError
 
 
 def setup_logging():
-    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+    logging.basicConfig(
+        level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+    )
     return logging.getLogger(__name__)
 
 
 def get_ec2_client_and_resource():
     try:

Check failure on line 61 in /home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_keypairs_single_region.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_keypairs_single_region.py#L50-L61

         return []
 
 
 def get_used_key_pairs(ec2_resource):
     try:
-        used_keys = set(instance.key_name for instance in ec2_resource.instances.all() if instance.key_name)
+        used_keys = set(
+            instance.key_name
+            for instance in ec2_resource.instances.all()
+            if instance.key_name
+        )
         logger.info(f"Used Keys: {len(used_keys)} : {used_keys}")
         return used_keys
     except ClientError as e:
         logger.error(f"Failed to retrieve used key pairs: {e}")
         return set()

Check failure on line 90 in /home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_keypairs_single_region.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_keypairs_single_region.py#L79-L90

     ec2_client, ec2_resource = get_ec2_client_and_resource()
 
     all_key_pairs = get_all_key_pairs(ec2_resource)
     used_keys = get_used_key_pairs(ec2_resource)
 
-    unused_keys = [key_pair.name for key_pair in all_key_pairs if key_pair.name not in used_keys]
+    unused_keys = [
+        key_pair.name for key_pair in all_key_pairs if key_pair.name not in used_keys
+    ]
     logger.info(f"Unused Keys: {len(unused_keys)} : {unused_keys}")
 
     if not unused_keys:
         logger.info("No unused key pairs found.")
         return

Check failure on line 105 in /home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_keypairs_single_region.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/ec2/ec2_delete_unused_keypairs_single_region.py#L96-L105

 
 if __name__ == "__main__":
     logger = setup_logging()
 
     parser = argparse.ArgumentParser(description="Delete unused EC2 key pairs")
-    parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without actually deleting key pairs")
+    parser.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="Perform a dry run without actually deleting key pairs",
+    )
     args = parser.parse_args()
 
     main(dry_run=args.dry_run)

Check failure on line 49 in /home/runner/work/aws-toolbox/aws-toolbox/efs/efs_delete_tagged_filesystems.py

See this annotation in the file changed.

@github-actions github-actions / Black

/home/runner/work/aws-toolbox/aws-toolbox/efs/efs_delete_tagged_filesystems.py#L38-L49

             # Delete the mount targets for the EFS filesystem
             delete_mount_targets(efs_client, filesystem_id)
 
             # Wait with exponential backoff
             delay = (2**current_retry) + random.uniform(0, 1)
-            print(f"Waiting for {delay} seconds before attempting to delete the EFS filesystem.")
+            print(
+                f"Waiting for {delay} seconds before attempting to delete the EFS filesystem."
+            )
             time.sleep(delay)
 
             # Delete the specified EFS filesystem
             efs_client.delete_file_system(FileSystemId=filesystem_id)
             print("Deleted EFS Filesystem: {}".format(filesystem_id))