From 28097a9c4c70f647f62ee715e8d68a083b6c0346 Mon Sep 17 00:00:00 2001 From: Ran Lu Date: Sun, 10 Mar 2024 18:40:24 -0400 Subject: [PATCH] Reduce database accesses No need to check secret variables for every operator, making the mount_variables variable global to cache it --- dags/synaptor_ops.py | 42 +++++++++++++----------------------------- 1 file changed, 13 insertions(+), 29 deletions(-) diff --git a/dags/synaptor_ops.py b/dags/synaptor_ops.py index 6374e592..4c56513d 100644 --- a/dags/synaptor_ops.py +++ b/dags/synaptor_ops.py @@ -19,6 +19,16 @@ airflow_broker_url = conf.get("celery", "broker_url") +maybe_aws = Variable.get("aws-secret.json", None) +maybe_gcp = Variable.get("google-secret.json", None) + +mount_variables = ["synaptor_param.json"] + +if maybe_aws is not None: + mount_variables.append("aws-secret.json") +if maybe_gcp is not None: + mount_variables.append("google-secret.json") + # hard-coding these for now MOUNT_POINT = "/root/.cloudvolume/secrets/" TASK_QUEUE_NAME = "synaptor" @@ -204,11 +214,8 @@ def manager_op( config_path = os.path.join(MOUNT_POINT, "synaptor_param.json") command = f"{synaptor_task_name} {config_path}" - # these variables will be mounted in the containers - variables = ["synaptor_param.json"] - return worker_op( - variables=variables, + variables=mount_variables, mount_point=MOUNT_POINT, task_id=synaptor_task_name, command=command, @@ -243,12 +250,10 @@ def generate_op( command += f" --clusterkey {tag}" # these variables will be mounted in the containers - variables = add_secrets_if_defined(["synaptor_param.json"]) - task_id = f"generate_{taskname}" if tag is None else f"generate_{taskname}_{tag}" return worker_op( - variables=variables, + variables=mount_variables, mount_point=MOUNT_POINT, task_id=task_id, command=command, @@ -282,13 +287,10 @@ def synaptor_op( " --lease_seconds 300" ) - # these variables will be mounted in the containers - variables = add_secrets_if_defined(["synaptor_param.json"]) - task_id = f"worker_{i}" if tag is None else f"worker_{tag}_{i}" return worker_op( - variables=variables, + variables=mount_variables, mount_point=MOUNT_POINT, task_id=task_id, command=command, @@ -321,21 +323,3 @@ def wait_op(dag: DAG, taskname: str) -> PythonOperator: queue="manager", dag=dag, ) - - -# Helper functions -def add_secrets_if_defined(variables: list[str]) -> list[str]: - """Adds CloudVolume secret files to the mounted variables if defined. - - Synaptor still needs to store the google-secret.json file sometimes - bc it currently uses an old version of gsutil. - """ - maybe_aws = Variable.get("aws-secret.json", None) - maybe_gcp = Variable.get("google-secret.json", None) - - if maybe_aws is not None: - variables.append("aws-secret.json") - if maybe_gcp is not None: - variables.append("google-secret.json") - - return variables