From 79e3d81cebc09f12347662925305cee01f8366d0 Mon Sep 17 00:00:00 2001 From: Ran Lu Date: Tue, 26 Mar 2024 18:13:18 -0400 Subject: [PATCH] Specify the workercount for self_destruct task directly --- dags/synaptor_dags.py | 10 +++++++++- dags/synaptor_ops.py | 7 +++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/dags/synaptor_dags.py b/dags/synaptor_dags.py index 747db720..3c09bad0 100644 --- a/dags/synaptor_dags.py +++ b/dags/synaptor_dags.py @@ -215,7 +215,15 @@ def add_task( dag: DAG, task: Task, prev_operator: BaseOperator, tag: Optional[str] = None ) -> BaseOperator: """Adds a processing step to a DAG.""" - generate = generate_op(dag, task.name, image=SYNAPTOR_IMAGE, tag=tag) + + if task.name == "self_destruct": + cluster_size = 1 if tag.startswith("synaptor-seggraph") else MAX_CLUSTER_SIZE + extra_args = {"workercount": str(cluster_size)} + else: + extra_args = None + + generate = generate_op(dag, task.name, image=SYNAPTOR_IMAGE, extra_args=extra_args, tag=tag) + if tag: wait = wait_op(dag, f"{task.name}_{tag}") else: diff --git a/dags/synaptor_ops.py b/dags/synaptor_ops.py index 4c56513d..414ff971 100644 --- a/dags/synaptor_ops.py +++ b/dags/synaptor_ops.py @@ -235,6 +235,7 @@ def generate_op( taskname: str, op_queue_name: Optional[str] = "manager", task_queue_name: Optional[str] = TASK_QUEUE_NAME, + extra_args: Optional[dict] = None, tag: Optional[str] = None, image: str = default_synaptor_image, ) -> BaseOperator: @@ -246,8 +247,10 @@ def generate_op( f" --queueurl {airflow_broker_url}" f" --queuename {task_queue_name}" ) - if taskname == "self_destruct": - command += f" --clusterkey {tag}" + + if extra_args: + for k in extra_args: + command += f" --{k} {extra_args[k]}" # these variables will be mounted in the containers task_id = f"generate_{taskname}" if tag is None else f"generate_{taskname}_{tag}"