Skip to content

Commit

Permalink
Reduce database accesses
Browse files Browse the repository at this point in the history
There is no need to check the secret variables for every operator, so make the
mount_variables variable a module-level global to cache the lookup.
  • Loading branch information
ranlu committed Mar 12, 2024
1 parent 01ca6a0 commit 28097a9
Showing 1 changed file with 13 additions and 29 deletions.
42 changes: 13 additions & 29 deletions dags/synaptor_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@

airflow_broker_url = conf.get("celery", "broker_url")

# Optional cloud credentials, looked up once when this module is loaded so
# that the operator factories below do not each repeat the lookup.
maybe_aws = Variable.get("aws-secret.json", None)
maybe_gcp = Variable.get("google-secret.json", None)

# Names of the variables passed to worker_op as `variables`.
# "synaptor_param.json" is always present; a secret file is listed only when
# its variable is defined (i.e. the lookup above did not return None).
# NOTE(review): this single list object is shared by every caller -- confirm
# that worker_op never mutates the list it receives.
mount_variables = ["synaptor_param.json"] + [
    secret_name
    for secret_name, secret_value in (
        ("aws-secret.json", maybe_aws),
        ("google-secret.json", maybe_gcp),
    )
    if secret_value is not None
]

# hard-coding these for now
MOUNT_POINT = "/root/.cloudvolume/secrets/"
TASK_QUEUE_NAME = "synaptor"
Expand Down Expand Up @@ -204,11 +214,8 @@ def manager_op(
config_path = os.path.join(MOUNT_POINT, "synaptor_param.json")
command = f"{synaptor_task_name} {config_path}"

# these variables will be mounted in the containers
variables = ["synaptor_param.json"]

return worker_op(
variables=variables,
variables=mount_variables,
mount_point=MOUNT_POINT,
task_id=synaptor_task_name,
command=command,
Expand Down Expand Up @@ -243,12 +250,10 @@ def generate_op(
command += f" --clusterkey {tag}"

# these variables will be mounted in the containers
variables = add_secrets_if_defined(["synaptor_param.json"])

task_id = f"generate_{taskname}" if tag is None else f"generate_{taskname}_{tag}"

return worker_op(
variables=variables,
variables=mount_variables,
mount_point=MOUNT_POINT,
task_id=task_id,
command=command,
Expand Down Expand Up @@ -282,13 +287,10 @@ def synaptor_op(
" --lease_seconds 300"
)

# these variables will be mounted in the containers
variables = add_secrets_if_defined(["synaptor_param.json"])

task_id = f"worker_{i}" if tag is None else f"worker_{tag}_{i}"

return worker_op(
variables=variables,
variables=mount_variables,
mount_point=MOUNT_POINT,
task_id=task_id,
command=command,
Expand Down Expand Up @@ -321,21 +323,3 @@ def wait_op(dag: DAG, taskname: str) -> PythonOperator:
queue="manager",
dag=dag,
)


# Helper functions
def add_secrets_if_defined(variables: list[str]) -> list[str]:
    """Append the names of any defined CloudVolume secret files to *variables*.

    The AWS and Google secret variables are each looked up with a default of
    None; a name is appended only when its lookup returns something else.
    Synaptor still needs the google-secret.json file in some cases because
    it currently uses an old version of gsutil.

    Args:
        variables: Names of the variables to mount. Modified in place.

    Returns:
        The same list object that was passed in, after the appends.
    """
    secret_names = ("aws-secret.json", "google-secret.json")

    # Finish every lookup before touching the caller's list, so a failing
    # lookup leaves the list unchanged.
    defined_secrets = [
        name for name in secret_names if Variable.get(name, None) is not None
    ]

    variables.extend(defined_secrets)
    return variables

0 comments on commit 28097a9

Please sign in to comment.