Skip to content

Commit

Permalink
Add type annotation to variables
Browse files Browse the repository at this point in the history
  • Loading branch information
Eden-D-Zhang committed Jan 27, 2025
1 parent a1781da commit 4591b1b
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
CLPConfig,
CLPDockerMounts,
dump_container_config,
generate_container_config,
generate_container_name,
Expand All @@ -28,10 +30,10 @@
FIND_COMMAND,
)

logger = logging.getLogger(__file__)
logger: logging.Logger = logging.getLogger(__file__)


def _validate_timestamps(begin_ts, end_ts):
def _validate_timestamps(begin_ts: int, end_ts: int):
if begin_ts < 0:
logger.error("begin-ts must be non-negative.")
return False
Expand All @@ -45,11 +47,11 @@ def _validate_timestamps(begin_ts, end_ts):


def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH
clp_home: Path = get_clp_home()
default_config_file_path: Path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

# Top-level parser and options
args_parser = argparse.ArgumentParser(
args_parser: argparse.ArgumentParser = argparse.ArgumentParser(
description="Views the list of archive IDs or deletes compressed archives."
)
args_parser.add_argument(
Expand All @@ -60,15 +62,15 @@ def main(argv):
)

# Top-level commands
subparsers = args_parser.add_subparsers(
subparsers: argparse._SubParsersAction[argparse.ArgumentParser] = args_parser.add_subparsers(
dest="subcommand",
required=True,
)
find_parser = subparsers.add_parser(
find_parser: argparse.ArgumentParser = subparsers.add_parser(
FIND_COMMAND,
help="Lists IDs of compressed archives.",
)
del_parser = subparsers.add_parser(
del_parser: argparse.ArgumentParser = subparsers.add_parser(
DEL_COMMAND,
help="Deletes compressed archives from the database and file system.",
)
Expand Down Expand Up @@ -97,13 +99,13 @@ def main(argv):
)

# Subcommands for delete subcommand
del_subparsers = del_parser.add_subparsers(
del_subparsers: argparse._SubParsersAction[argparse.ArgumentParser] = del_parser.add_subparsers(
dest="del_subcommand",
required=True,
)

# Delete by ID subcommand
del_id_parser = del_subparsers.add_parser(
del_id_parser: argparse.ArgumentParser = del_subparsers.add_parser(
DEL_BY_IDS_SUBCOMMAND,
help="Deletes archives by ID.",
)
Expand All @@ -116,7 +118,7 @@ def main(argv):
)

# Delete by filter subcommand
del_filter_parser = del_subparsers.add_parser(
del_filter_parser: argparse.ArgumentParser = del_subparsers.add_parser(
DEL_BY_FILTER_SUBCOMMAND,
help="Deletes compressed archives that fall within the specified time range.",
)
Expand All @@ -135,16 +137,18 @@ def main(argv):
help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.",
)

parsed_args = args_parser.parse_args(argv[1:])
parsed_args: argparse.Namespace = args_parser.parse_args(argv[1:])

begin_timestamp: int
end_timestamp: int
subcommand = parsed_args.subcommand
subcommand: str = parsed_args.subcommand

# Validate and load config file
try:
config_file_path = Path(parsed_args.config)
clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
config_file_path: Path = Path(parsed_args.config)
clp_config: CLPConfig = load_config_file(
config_file_path, default_config_file_path, clp_home
)
clp_config.validate_logs_dir()

# Validate and load necessary credentials
Expand All @@ -153,7 +157,7 @@ def main(argv):
logger.exception("Failed to load config.")
return -1

storage_type = clp_config.archive_output.storage.type
storage_type: StorageType = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(f"Archive deletion is not supported for storage type: {storage_type}.")
return -1
Expand All @@ -162,34 +166,38 @@ def main(argv):
if (DEL_COMMAND == subcommand and DEL_BY_FILTER_SUBCOMMAND == parsed_args.del_subcommand) or (
FIND_COMMAND == subcommand
):
begin_timestamp = parsed_args.begin_ts
end_timestamp = parsed_args.end_ts
begin_timestamp: int = parsed_args.begin_ts
end_timestamp: int = parsed_args.end_ts

# Validate the input timestamp
if not _validate_timestamps(begin_timestamp, end_timestamp):
return -1

container_name = generate_container_name("archive-manager")
container_name: str = generate_container_name("archive-manager")

container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
container_clp_config, clp_config, container_name
)

necessary_mounts = [mounts.clp_home, mounts.logs_dir, mounts.archives_output_dir]
container_start_cmd = generate_container_start_cmd(
necessary_mounts: list[CLPDockerMounts] = [
mounts.clp_home,
mounts.logs_dir,
mounts.archives_output_dir,
]
container_start_cmd: list[str] = generate_container_start_cmd(
container_name, necessary_mounts, clp_config.execution_container
)

# fmt: off
archive_manager_cmd = [
archive_manager_cmd: list[str] = [
"python3",
"-m", "clp_package_utils.scripts.native.archive_manager",
"--config", str(generated_config_path_on_container),
str(subcommand),
]
# fmt : on

# Add subcommand-specific arguments
if DEL_COMMAND == subcommand:
if parsed_args.dry_run:
Expand All @@ -212,7 +220,7 @@ def main(argv):
else:
logger.error(f"Unsupported subcommand: `{subcommand}`.")

cmd = container_start_cmd + archive_manager_cmd
cmd: list[str] = container_start_cmd + archive_manager_cmd

subprocess.run(cmd, check=True)

Expand Down
Loading

0 comments on commit 4591b1b

Please sign in to comment.