Skip to content

Commit

Permalink
Use Redis as Celery results backend. (#227)
Browse files Browse the repository at this point in the history
  • Loading branch information
gibber9809 authored Jan 28, 2024
1 parent d524ead commit 9e6b755
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 18 deletions.
27 changes: 27 additions & 0 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
CLPConfig,
CLP_DEFAULT_CREDENTIALS_FILE_PATH,
DB_COMPONENT_NAME,
REDIS_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME
)
Expand Down Expand Up @@ -207,6 +208,9 @@ def generate_credentials_file(credentials_file_path: pathlib.Path):
'user': 'clp-user',
'password': secrets.token_urlsafe(8)
},
REDIS_COMPONENT_NAME: {
'password': secrets.token_urlsafe(16)
}
}

with open(credentials_file_path, 'w') as f:
Expand Down Expand Up @@ -235,6 +239,11 @@ def validate_and_load_queue_credentials_file(clp_config: CLPConfig, clp_home: pa
validate_credentials_file_path(clp_config, clp_home, generate_default_file)
clp_config.load_queue_credentials_from_file()

def validate_and_load_redis_credentials_file(clp_config: CLPConfig, clp_home: pathlib.Path,
generate_default_file: bool):
validate_credentials_file_path(clp_config, clp_home, generate_default_file)
clp_config.load_redis_credentials_from_file()


def validate_db_config(clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path):
try:
Expand All @@ -259,6 +268,24 @@ def validate_queue_config(clp_config: CLPConfig, logs_dir: pathlib.Path):
validate_port(f"{QUEUE_COMPONENT_NAME}.port", clp_config.queue.host, clp_config.queue.port)


def validate_redis_config(clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path,
base_config: pathlib.Path):
try:
validate_path_could_be_dir(data_dir)
except ValueError as ex:
raise ValueError(f"{REDIS_COMPONENT_NAME} data directory is invalid {ex}")

try:
validate_path_could_be_dir(logs_dir)
except ValueError as ex:
raise ValueError(f"{REDIS_COMPONENT_NAME} logs directory is invalid: {ex}")

if not base_config.exists():
raise ValueError(f"{REDIS_COMPONENT_NAME} base configuration at {str(base_config)} is missing.")

validate_port(f"{REDIS_COMPONENT_NAME}.port", clp_config.redis.host, clp_config.redis.port)


def validate_results_cache_config(clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path):
try:
validate_path_could_be_dir(data_dir)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,18 @@
validate_and_load_config_file,
validate_and_load_db_credentials_file,
validate_and_load_queue_credentials_file,
validate_and_load_redis_credentials_file,
validate_db_config,
validate_queue_config,
validate_redis_config,
validate_results_cache_config,
validate_worker_config
)
from clp_py_utils.clp_config import (
CLPConfig,
DB_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
REDIS_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
SCHEDULER_COMPONENT_NAME,
SEARCH_SCHEDULER_COMPONENT_NAME,
Expand Down Expand Up @@ -245,6 +248,56 @@ def start_queue(instance_id: str, clp_config: CLPConfig):
logger.info(f"Started {QUEUE_COMPONENT_NAME}.")


def start_redis(instance_id: str, clp_config: CLPConfig, conf_dir: pathlib.Path):
logger.info(f"Starting {REDIS_COMPONENT_NAME}...")

container_name = f'clp-{REDIS_COMPONENT_NAME}-{instance_id}'
if container_exists(container_name):
logger.info(f"{REDIS_COMPONENT_NAME} already running.")
return

redis_logs_dir = clp_config.logs_directory / REDIS_COMPONENT_NAME
redis_data_dir = clp_config.data_directory / REDIS_COMPONENT_NAME

base_config_file_path = conf_dir / 'redis' / 'redis.conf'
validate_redis_config(clp_config, redis_data_dir, redis_logs_dir, base_config_file_path)

config_filename = f'{container_name}.conf'
host_config_file_path = clp_config.logs_directory / config_filename
with open(base_config_file_path, 'r') as base, open(host_config_file_path, 'w') as full:
for line in base.readlines():
full.write(line)
full.write(f'requirepass {clp_config.redis.password}\n')

redis_data_dir.mkdir(exist_ok=True, parents=True)
redis_logs_dir.mkdir(exist_ok=True, parents=True)

# Start container
config_file_path = pathlib.Path('/') / 'usr' / 'local' / 'etc' / 'redis' / 'redis.conf'
mounts = [
DockerMount(DockerMountType.BIND, host_config_file_path, config_file_path, True),
DockerMount(DockerMountType.BIND, redis_logs_dir, pathlib.Path('/') / 'var' / 'log' / 'redis'),
DockerMount(DockerMountType.BIND, redis_data_dir, pathlib.Path('/') / 'data'),
]
cmd = [
'docker', 'run',
'-d',
'--rm',
'--name', container_name,
'-u', f'{os.getuid()}:{os.getgid()}',
]
for mount in mounts:
cmd.append('--mount')
cmd.append(str(mount))
append_docker_port_settings_for_host_ips(clp_config.redis.host, clp_config.redis.port, 6379, cmd)
cmd.append('redis:7.2.4')
cmd.append('redis-server')
cmd.append(str(config_file_path))
subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)

logger.info(f"Started {REDIS_COMPONENT_NAME}.")


def start_results_cache(instance_id: str, clp_config: CLPConfig, conf_dir: pathlib.Path):
logger.info(f"Starting {RESULTS_CACHE_COMPONENT_NAME}...")

Expand Down Expand Up @@ -365,9 +418,9 @@ def start_search_scheduler(instance_id: str, clp_config: CLPConfig, container_cl
'-e', f'BROKER_URL=amqp://'
f'{container_clp_config.queue.username}:{container_clp_config.queue.password}@'
f'{container_clp_config.queue.host}:{container_clp_config.queue.port}',
'-e', f'RESULT_BACKEND=rpc://'
f'{container_clp_config.queue.username}:{container_clp_config.queue.password}@'
f'{container_clp_config.queue.host}:{container_clp_config.queue.port}',
'-e', f'RESULT_BACKEND=redis://default:{container_clp_config.redis.password}@'
f'{container_clp_config.redis.host}:{container_clp_config.redis.port}/'
f'{container_clp_config.redis.search_backend_database}',
'-e', f'CLP_LOGS_DIR={container_logs_dir}',
'-e', f'CLP_LOGGING_LEVEL={clp_config.search_scheduler.logging_level}',
'-u', f'{os.getuid()}:{os.getgid()}',
Expand Down Expand Up @@ -405,14 +458,15 @@ def start_search_worker(instance_id: str, clp_config: CLPConfig, container_clp_c
container_clp_config,
celery_method,
celery_route,
clp_config.redis.search_backend_database,
num_cpus,
mounts
)


def generic_start_worker(component_name: str, instance_id: str, clp_config: CLPConfig, worker_config: BaseModel,
container_clp_config: CLPConfig, celery_method: str, celery_route: str,
num_cpus: int, mounts: CLPDockerMounts):
redis_database: int, num_cpus: int, mounts: CLPDockerMounts):
logger.info(f"Starting {component_name}...")

container_name = f'clp-{component_name}-{instance_id}'
Expand Down Expand Up @@ -441,9 +495,8 @@ def generic_start_worker(component_name: str, instance_id: str, clp_config: CLPC
'-e', f'BROKER_URL=amqp://'
f'{container_clp_config.queue.username}:{container_clp_config.queue.password}@'
f'{container_clp_config.queue.host}:{container_clp_config.queue.port}',
'-e', f'RESULT_BACKEND=rpc://'
f'{container_clp_config.queue.username}:{container_clp_config.queue.password}@'
f'{container_clp_config.queue.host}:{container_clp_config.queue.port}',
'-e', f'RESULT_BACKEND=redis://default:{container_clp_config.redis.password}@'
f'{container_clp_config.redis.host}:{container_clp_config.redis.port}/{redis_database}',
'-e', f'CLP_HOME={CONTAINER_CLP_HOME}',
'-e', f'CLP_DATA_DIR={container_clp_config.data_directory}',
'-e', f'CLP_ARCHIVE_OUTPUT_DIR={container_clp_config.archive_output.directory}',
Expand Down Expand Up @@ -555,6 +608,7 @@ def main(argv):
component_args_parser = args_parser.add_subparsers(dest='component_name')
component_args_parser.add_parser(DB_COMPONENT_NAME)
component_args_parser.add_parser(QUEUE_COMPONENT_NAME)
component_args_parser.add_parser(REDIS_COMPONENT_NAME)
component_args_parser.add_parser(RESULTS_CACHE_COMPONENT_NAME)
component_args_parser.add_parser(SCHEDULER_COMPONENT_NAME)
worker_args_parser = component_args_parser.add_parser(WORKER_COMPONENT_NAME)
Expand Down Expand Up @@ -587,6 +641,9 @@ def main(argv):
WORKER_COMPONENT_NAME, SEARCH_SCHEDULER_COMPONENT_NAME,
SEARCH_WORKER_COMPONENT_NAME]:
validate_and_load_queue_credentials_file(clp_config, clp_home, True)
if component_name in ['', REDIS_COMPONENT_NAME, SEARCH_SCHEDULER_COMPONENT_NAME,
WORKER_COMPONENT_NAME]:
validate_and_load_redis_credentials_file(clp_config, clp_home, True)

clp_config.validate_data_dir()
clp_config.validate_logs_dir()
Expand Down Expand Up @@ -625,6 +682,8 @@ def main(argv):
create_db_tables(instance_id, clp_config, container_clp_config, mounts)
if '' == component_name or QUEUE_COMPONENT_NAME == component_name:
start_queue(instance_id, clp_config)
if '' == component_name or REDIS_COMPONENT_NAME == component_name:
start_redis(instance_id, clp_config, conf_dir)
if '' == component_name or RESULTS_CACHE_COMPONENT_NAME == component_name:
start_results_cache(instance_id, clp_config, conf_dir)
if '' == component_name or SCHEDULER_COMPONENT_NAME == component_name:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from clp_py_utils.clp_config import (
DB_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
REDIS_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
SEARCH_SCHEDULER_COMPONENT_NAME,
SEARCH_WORKER_COMPONENT_NAME,
Expand Down Expand Up @@ -54,6 +55,7 @@ def main(argv):
component_args_parser = args_parser.add_subparsers(dest='component_name')
component_args_parser.add_parser(DB_COMPONENT_NAME)
component_args_parser.add_parser(QUEUE_COMPONENT_NAME)
component_args_parser.add_parser(REDIS_COMPONENT_NAME)
component_args_parser.add_parser(RESULTS_CACHE_COMPONENT_NAME)
component_args_parser.add_parser(SCHEDULER_COMPONENT_NAME)
component_args_parser.add_parser(WORKER_COMPONENT_NAME)
Expand Down Expand Up @@ -107,6 +109,13 @@ def main(argv):
container_config_file_path = logs_dir / f'{container_name}.yml'
if container_config_file_path.exists():
container_config_file_path.unlink()
if '' == component_name or REDIS_COMPONENT_NAME == component_name:
container_name = f'clp-{REDIS_COMPONENT_NAME}-{instance_id}'
stop_container(container_name)

redis_config_file_path = logs_dir / f'{container_name}.conf'
if redis_config_file_path.exists():
redis_config_file_path.unlink()
if '' == component_name or RESULTS_CACHE_COMPONENT_NAME == component_name:
container_name = f'clp-{RESULTS_CACHE_COMPONENT_NAME}-{instance_id}'
stop_container(container_name)
Expand Down
26 changes: 26 additions & 0 deletions components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# Component names
DB_COMPONENT_NAME = 'database'
QUEUE_COMPONENT_NAME = 'queue'
REDIS_COMPONENT_NAME = 'redis'
RESULTS_CACHE_COMPONENT_NAME = 'results_cache'
SCHEDULER_COMPONENT_NAME = 'scheduler'
SEARCH_SCHEDULER_COMPONENT_NAME = 'search_scheduler'
Expand Down Expand Up @@ -131,6 +132,20 @@ def validate_logging_level(cls, field):
return field


class Redis(BaseModel):
host: str = 'localhost'
port: int = 6379
search_backend_database: int = 0
# redis can perform authentication without a username
password: typing.Optional[str]

@validator('host')
def validate_host(cls, field):
if '' == field:
raise ValueError(f'{REDIS_COMPONENT_NAME}.host cannot be empty.')
return field


class ResultsCache(BaseModel):
host: str = 'localhost'
port: int = 27017
Expand Down Expand Up @@ -202,6 +217,7 @@ class CLPConfig(BaseModel):

database: Database = Database()
queue: Queue = Queue()
redis: Redis = Redis()
results_cache: ResultsCache = ResultsCache()
scheduler: Scheduler = Scheduler()
search_scheduler: SearchScheduler = SearchScheduler()
Expand Down Expand Up @@ -265,6 +281,16 @@ def load_queue_credentials_from_file(self):
self.queue.password = get_config_value(config, f"{QUEUE_COMPONENT_NAME}.password")
except KeyError as ex:
raise ValueError(f"Credentials file '{self.credentials_file_path}' does not contain key '{ex}'.")

def load_redis_credentials_from_file(self):
config = read_yaml_config_file(self.credentials_file_path)
if config is None:
raise ValueError(f"Credentials file '{self.credentials_file_path}' is empty.")
try:
self.redis.password = get_config_value(config, f"{REDIS_COMPONENT_NAME}.password")
except KeyError as ex:
raise ValueError(f"Credentials file '{self.credentials_file_path}' does not contain key '{ex}'.")


def dump_to_primitive_dict(self):
d = self.dict()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,21 +176,10 @@ def handle_pending_search_jobs(db_conn, results_cache_uri: str) -> None:


def try_getting_task_result(async_task_result):
"""
Ideally, we'd use this code:
if not async_task_result.ready():
return None
return async_task_result.get()

But because of https://github.com/celery/celery/issues/4084, wew have to use the following
timeout based approach until we switch to the Redis result backend.
"""
try:
return async_task_result.get(timeout=0.1)
except celery.exceptions.TimeoutError:
return None


def check_job_status_and_update_db(db_conn):
global active_jobs
Expand Down
1 change: 1 addition & 0 deletions components/job-orchestration/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mysql-connector-python = "^8.2.0"
pika = "^1.3.2"
pydantic = "^1.10.13"
PyYAML = "^6.0.1"
redis = "^5.0.1"
zstandard = "~0.22"

[build-system]
Expand Down
5 changes: 5 additions & 0 deletions components/package-template/src/etc/clp-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
# host: "localhost"
# port: 5672
#
#redis:
# host: "localhost"
# port: 6379
# search_backend_database: 0
#
#results_cache:
# host: "localhost"
# port: 27017
Expand Down
4 changes: 4 additions & 0 deletions components/package-template/src/etc/credentials.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@
#queue:
# user: "user"
# password: "pass"
#
## Redis credentials
#redis:
# password: "pass"
5 changes: 5 additions & 0 deletions components/package-template/src/etc/redis/redis.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# For full reference of configuration options see
# https://raw.githubusercontent.com/redis/redis/7.2/redis.conf

databases 1
logfile "/var/log/redis/redis.log"

0 comments on commit 9e6b755

Please sign in to comment.