From 9e6b75581421ddcd3473d6886fd4acaf23698603 Mon Sep 17 00:00:00 2001 From: Devin Kenneth Gibson Date: Sat, 27 Jan 2024 21:06:10 -0500 Subject: [PATCH] Use Redis as Celery results backend. (#227) --- .../clp_package_utils/general.py | 27 +++++++ .../clp_package_utils/scripts/start_clp.py | 73 +++++++++++++++++-- .../clp_package_utils/scripts/stop_clp.py | 9 +++ .../clp-py-utils/clp_py_utils/clp_config.py | 26 +++++++ .../search_scheduler/search_scheduler.py | 11 --- components/job-orchestration/pyproject.toml | 1 + .../package-template/src/etc/clp-config.yml | 5 ++ .../src/etc/credentials.template.yml | 4 + .../package-template/src/etc/redis/redis.conf | 5 ++ 9 files changed, 143 insertions(+), 18 deletions(-) create mode 100644 components/package-template/src/etc/redis/redis.conf diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index cbb95b765..36ddfc68e 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -13,6 +13,7 @@ CLPConfig, CLP_DEFAULT_CREDENTIALS_FILE_PATH, DB_COMPONENT_NAME, + REDIS_COMPONENT_NAME, QUEUE_COMPONENT_NAME, RESULTS_CACHE_COMPONENT_NAME ) @@ -207,6 +208,9 @@ def generate_credentials_file(credentials_file_path: pathlib.Path): 'user': 'clp-user', 'password': secrets.token_urlsafe(8) }, + REDIS_COMPONENT_NAME: { + 'password': secrets.token_urlsafe(16) + } } with open(credentials_file_path, 'w') as f: @@ -235,6 +239,11 @@ def validate_and_load_queue_credentials_file(clp_config: CLPConfig, clp_home: pa validate_credentials_file_path(clp_config, clp_home, generate_default_file) clp_config.load_queue_credentials_from_file() +def validate_and_load_redis_credentials_file(clp_config: CLPConfig, clp_home: pathlib.Path, + generate_default_file: bool): + validate_credentials_file_path(clp_config, clp_home, generate_default_file) + clp_config.load_redis_credentials_from_file() + def validate_db_config(clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path): try: @@ -259,6 +268,24 @@ def validate_queue_config(clp_config: CLPConfig, logs_dir: pathlib.Path): validate_port(f"{QUEUE_COMPONENT_NAME}.port", clp_config.queue.host, clp_config.queue.port) +def validate_redis_config(clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path, + base_config: pathlib.Path): + try: + validate_path_could_be_dir(data_dir) + except ValueError as ex: + raise ValueError(f"{REDIS_COMPONENT_NAME} data directory is invalid {ex}") + + try: + validate_path_could_be_dir(logs_dir) + except ValueError as ex: + raise ValueError(f"{REDIS_COMPONENT_NAME} logs directory is invalid: {ex}") + + if not base_config.exists(): + raise ValueError(f"{REDIS_COMPONENT_NAME} base configuration at {str(base_config)} is missing.") + + validate_port(f"{REDIS_COMPONENT_NAME}.port", clp_config.redis.host, clp_config.redis.port) + + def validate_results_cache_config(clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path): try: validate_path_could_be_dir(data_dir) diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py index e2c41b06d..19af9a118 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py @@ -26,8 +26,10 @@ validate_and_load_config_file, validate_and_load_db_credentials_file, validate_and_load_queue_credentials_file, + validate_and_load_redis_credentials_file, validate_db_config, validate_queue_config, + validate_redis_config, validate_results_cache_config, validate_worker_config ) @@ -35,6 +37,7 @@ CLPConfig, DB_COMPONENT_NAME, QUEUE_COMPONENT_NAME, + REDIS_COMPONENT_NAME, RESULTS_CACHE_COMPONENT_NAME, SCHEDULER_COMPONENT_NAME, SEARCH_SCHEDULER_COMPONENT_NAME, @@ -245,6 +248,56 @@ def start_queue(instance_id: str, clp_config: CLPConfig): logger.info(f"Started {QUEUE_COMPONENT_NAME}.") +def start_redis(instance_id: str, clp_config: CLPConfig, conf_dir: pathlib.Path): + logger.info(f"Starting {REDIS_COMPONENT_NAME}...") + + container_name = f'clp-{REDIS_COMPONENT_NAME}-{instance_id}' + if container_exists(container_name): + logger.info(f"{REDIS_COMPONENT_NAME} already running.") + return + + redis_logs_dir = clp_config.logs_directory / REDIS_COMPONENT_NAME + redis_data_dir = clp_config.data_directory / REDIS_COMPONENT_NAME + + base_config_file_path = conf_dir / 'redis' / 'redis.conf' + validate_redis_config(clp_config, redis_data_dir, redis_logs_dir, base_config_file_path) + + config_filename = f'{container_name}.conf' + host_config_file_path = clp_config.logs_directory / config_filename + with open(base_config_file_path, 'r') as base, open(host_config_file_path, 'w') as full: + for line in base.readlines(): + full.write(line) + full.write(f'requirepass {clp_config.redis.password}\n') + + redis_data_dir.mkdir(exist_ok=True, parents=True) + redis_logs_dir.mkdir(exist_ok=True, parents=True) + + # Start container + config_file_path = pathlib.Path('/') / 'usr' / 'local' / 'etc' / 'redis' / 'redis.conf' + mounts = [ + DockerMount(DockerMountType.BIND, host_config_file_path, config_file_path, True), + DockerMount(DockerMountType.BIND, redis_logs_dir, pathlib.Path('/') / 'var' / 'log' / 'redis'), + DockerMount(DockerMountType.BIND, redis_data_dir, pathlib.Path('/') / 'data'), + ] + cmd = [ + 'docker', 'run', + '-d', + '--rm', + '--name', container_name, + '-u', f'{os.getuid()}:{os.getgid()}', + ] + for mount in mounts: + cmd.append('--mount') + cmd.append(str(mount)) + append_docker_port_settings_for_host_ips(clp_config.redis.host, clp_config.redis.port, 6379, cmd) + cmd.append('redis:7.2.4') + cmd.append('redis-server') + cmd.append(str(config_file_path)) + subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) + + logger.info(f"Started {REDIS_COMPONENT_NAME}.") + + def start_results_cache(instance_id: str, clp_config: CLPConfig, conf_dir: pathlib.Path): logger.info(f"Starting {RESULTS_CACHE_COMPONENT_NAME}...") @@ -365,9 +418,9 @@ def start_search_scheduler(instance_id: str, clp_config: CLPConfig, container_cl '-e', f'BROKER_URL=amqp://' f'{container_clp_config.queue.username}:{container_clp_config.queue.password}@' f'{container_clp_config.queue.host}:{container_clp_config.queue.port}', - '-e', f'RESULT_BACKEND=rpc://' - f'{container_clp_config.queue.username}:{container_clp_config.queue.password}@' - f'{container_clp_config.queue.host}:{container_clp_config.queue.port}', + '-e', f'RESULT_BACKEND=redis://default:{container_clp_config.redis.password}@' + f'{container_clp_config.redis.host}:{container_clp_config.redis.port}/' + f'{container_clp_config.redis.search_backend_database}', '-e', f'CLP_LOGS_DIR={container_logs_dir}', '-e', f'CLP_LOGGING_LEVEL={clp_config.search_scheduler.logging_level}', '-u', f'{os.getuid()}:{os.getgid()}', @@ -405,6 +458,7 @@ def start_search_worker(instance_id: str, clp_config: CLPConfig, container_clp_c container_clp_config, celery_method, celery_route, + clp_config.redis.search_backend_database, num_cpus, mounts ) @@ -412,7 +466,7 @@ def start_search_worker(instance_id: str, clp_config: CLPConfig, container_clp_c def generic_start_worker(component_name: str, instance_id: str, clp_config: CLPConfig, worker_config: BaseModel, container_clp_config: CLPConfig, celery_method: str, celery_route: str, - num_cpus: int, mounts: CLPDockerMounts): + redis_database: int, num_cpus: int, mounts: CLPDockerMounts): logger.info(f"Starting {component_name}...") container_name = f'clp-{component_name}-{instance_id}' @@ -441,9 +495,8 @@ def generic_start_worker(component_name: str, instance_id: str, clp_config: CLPC '-e', f'BROKER_URL=amqp://' f'{container_clp_config.queue.username}:{container_clp_config.queue.password}@' f'{container_clp_config.queue.host}:{container_clp_config.queue.port}', - '-e', f'RESULT_BACKEND=rpc://' - f'{container_clp_config.queue.username}:{container_clp_config.queue.password}@' - f'{container_clp_config.queue.host}:{container_clp_config.queue.port}', + '-e', f'RESULT_BACKEND=redis://default:{container_clp_config.redis.password}@' + f'{container_clp_config.redis.host}:{container_clp_config.redis.port}/{redis_database}', '-e', f'CLP_HOME={CONTAINER_CLP_HOME}', '-e', f'CLP_DATA_DIR={container_clp_config.data_directory}', '-e', f'CLP_ARCHIVE_OUTPUT_DIR={container_clp_config.archive_output.directory}', @@ -555,6 +608,7 @@ def main(argv): component_args_parser = args_parser.add_subparsers(dest='component_name') component_args_parser.add_parser(DB_COMPONENT_NAME) component_args_parser.add_parser(QUEUE_COMPONENT_NAME) + component_args_parser.add_parser(REDIS_COMPONENT_NAME) component_args_parser.add_parser(RESULTS_CACHE_COMPONENT_NAME) component_args_parser.add_parser(SCHEDULER_COMPONENT_NAME) worker_args_parser = component_args_parser.add_parser(WORKER_COMPONENT_NAME) @@ -587,6 +641,9 @@ def main(argv): WORKER_COMPONENT_NAME, SEARCH_SCHEDULER_COMPONENT_NAME, SEARCH_WORKER_COMPONENT_NAME]: validate_and_load_queue_credentials_file(clp_config, clp_home, True) + if component_name in ['', REDIS_COMPONENT_NAME, SEARCH_SCHEDULER_COMPONENT_NAME, + WORKER_COMPONENT_NAME]: + validate_and_load_redis_credentials_file(clp_config, clp_home, True) clp_config.validate_data_dir() clp_config.validate_logs_dir() @@ -625,6 +682,8 @@ def main(argv): create_db_tables(instance_id, clp_config, container_clp_config, mounts) if '' == component_name or QUEUE_COMPONENT_NAME == component_name: start_queue(instance_id, clp_config) + if '' == component_name or REDIS_COMPONENT_NAME == component_name: + start_redis(instance_id, clp_config, conf_dir) if '' == component_name or RESULTS_CACHE_COMPONENT_NAME == component_name: start_results_cache(instance_id, clp_config, conf_dir) if '' == component_name or SCHEDULER_COMPONENT_NAME == component_name: diff --git a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py index 44186bc0e..5a23ac53d 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py @@ -15,6 +15,7 @@ from clp_py_utils.clp_config import ( DB_COMPONENT_NAME, QUEUE_COMPONENT_NAME, + REDIS_COMPONENT_NAME, RESULTS_CACHE_COMPONENT_NAME, SEARCH_SCHEDULER_COMPONENT_NAME, SEARCH_WORKER_COMPONENT_NAME, @@ -54,6 +55,7 @@ def main(argv): component_args_parser = args_parser.add_subparsers(dest='component_name') component_args_parser.add_parser(DB_COMPONENT_NAME) component_args_parser.add_parser(QUEUE_COMPONENT_NAME) + component_args_parser.add_parser(REDIS_COMPONENT_NAME) component_args_parser.add_parser(RESULTS_CACHE_COMPONENT_NAME) component_args_parser.add_parser(SCHEDULER_COMPONENT_NAME) component_args_parser.add_parser(WORKER_COMPONENT_NAME) @@ -107,6 +109,13 @@ def main(argv): container_config_file_path = logs_dir / f'{container_name}.yml' if container_config_file_path.exists(): container_config_file_path.unlink() + if '' == component_name or REDIS_COMPONENT_NAME == component_name: + container_name = f'clp-{REDIS_COMPONENT_NAME}-{instance_id}' + stop_container(container_name) + + redis_config_file_path = logs_dir / f'{container_name}.conf' + if redis_config_file_path.exists(): + redis_config_file_path.unlink() if '' == component_name or RESULTS_CACHE_COMPONENT_NAME == component_name: container_name = f'clp-{RESULTS_CACHE_COMPONENT_NAME}-{instance_id}' stop_container(container_name) diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py index 7ccb56da7..70b0b8c3c 100644 --- a/components/clp-py-utils/clp_py_utils/clp_config.py +++ b/components/clp-py-utils/clp_py_utils/clp_config.py @@ -10,6 +10,7 @@ # Component names DB_COMPONENT_NAME = 'database' QUEUE_COMPONENT_NAME = 'queue' +REDIS_COMPONENT_NAME = 'redis' RESULTS_CACHE_COMPONENT_NAME = 'results_cache' SCHEDULER_COMPONENT_NAME = 'scheduler' SEARCH_SCHEDULER_COMPONENT_NAME = 'search_scheduler' @@ -131,6 +132,20 @@ def validate_logging_level(cls, field): return field +class Redis(BaseModel): + host: str = 'localhost' + port: int = 6379 + search_backend_database: int = 0 + # redis can perform authentication without a username + password: typing.Optional[str] + + @validator('host') + def validate_host(cls, field): + if '' == field: + raise ValueError(f'{REDIS_COMPONENT_NAME}.host cannot be empty.') + return field + + class ResultsCache(BaseModel): host: str = 'localhost' port: int = 27017 @@ -202,6 +217,7 @@ class CLPConfig(BaseModel): database: Database = Database() queue: Queue = Queue() + redis: Redis = Redis() results_cache: ResultsCache = ResultsCache() scheduler: Scheduler = Scheduler() search_scheduler: SearchScheduler = SearchScheduler() @@ -265,6 +281,16 @@ def load_queue_credentials_from_file(self): self.queue.password = get_config_value(config, f"{QUEUE_COMPONENT_NAME}.password") except KeyError as ex: raise ValueError(f"Credentials file '{self.credentials_file_path}' does not contain key '{ex}'.") + + def load_redis_credentials_from_file(self): + config = read_yaml_config_file(self.credentials_file_path) + if config is None: + raise ValueError(f"Credentials file '{self.credentials_file_path}' is empty.") + try: + self.redis.password = get_config_value(config, f"{REDIS_COMPONENT_NAME}.password") + except KeyError as ex: + raise ValueError(f"Credentials file '{self.credentials_file_path}' does not contain key '{ex}'.") + def dump_to_primitive_dict(self): d = self.dict() diff --git a/components/job-orchestration/job_orchestration/search_scheduler/search_scheduler.py b/components/job-orchestration/job_orchestration/search_scheduler/search_scheduler.py index 8217b20b8..54790b8dc 100644 --- a/components/job-orchestration/job_orchestration/search_scheduler/search_scheduler.py +++ b/components/job-orchestration/job_orchestration/search_scheduler/search_scheduler.py @@ -176,21 +176,10 @@ def handle_pending_search_jobs(db_conn, results_cache_uri: str) -> None: def try_getting_task_result(async_task_result): - """ - Ideally, we'd use this code: - if not async_task_result.ready(): return None return async_task_result.get() - But because of https://github.com/celery/celery/issues/4084, wew have to use the following - timeout based approach until we switch to the Redis result backend. - """ - try: - return async_task_result.get(timeout=0.1) - except celery.exceptions.TimeoutError: - return None - def check_job_status_and_update_db(db_conn): global active_jobs diff --git a/components/job-orchestration/pyproject.toml b/components/job-orchestration/pyproject.toml index b99bb0e22..c660593a0 100644 --- a/components/job-orchestration/pyproject.toml +++ b/components/job-orchestration/pyproject.toml @@ -16,6 +16,7 @@ mysql-connector-python = "^8.2.0" pika = "^1.3.2" pydantic = "^1.10.13" PyYAML = "^6.0.1" +redis = "^5.0.1" zstandard = "~0.22" [build-system] diff --git a/components/package-template/src/etc/clp-config.yml b/components/package-template/src/etc/clp-config.yml index 291752f53..9eacaf818 100644 --- a/components/package-template/src/etc/clp-config.yml +++ b/components/package-template/src/etc/clp-config.yml @@ -20,6 +20,11 @@ # host: "localhost" # port: 5672 # +#redis: +# host: "localhost" +# port: 6379 +# search_backend_database: 0 +# #results_cache: # host: "localhost" # port: 27017 diff --git a/components/package-template/src/etc/credentials.template.yml b/components/package-template/src/etc/credentials.template.yml index 54bd78321..9a0dd73fa 100644 --- a/components/package-template/src/etc/credentials.template.yml +++ b/components/package-template/src/etc/credentials.template.yml @@ -7,3 +7,7 @@ #queue: # user: "user" # password: "pass" +# +## Redis credentials +#redis: +# password: "pass" diff --git a/components/package-template/src/etc/redis/redis.conf b/components/package-template/src/etc/redis/redis.conf new file mode 100644 index 000000000..7603ab3f4 --- /dev/null +++ b/components/package-template/src/etc/redis/redis.conf @@ -0,0 +1,5 @@ +# For full reference of configuration options see +# https://raw.githubusercontent.com/redis/redis/7.2/redis.conf + +databases 1 +logfile "/var/log/redis/redis.log"