From e69402280a672fb286a97e9ba2adc7520d1da32a Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Thu, 27 Oct 2022 15:17:28 +0200 Subject: [PATCH] fix workflow bug that to_start_works not released --- main/lib/idds/tests/test_domapanda.py | 3 +++ workflow/lib/idds/workflowv2/workflow.py | 13 +++++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/main/lib/idds/tests/test_domapanda.py b/main/lib/idds/tests/test_domapanda.py index 468fe02d..5731dc3c 100644 --- a/main/lib/idds/tests/test_domapanda.py +++ b/main/lib/idds/tests/test_domapanda.py @@ -128,6 +128,7 @@ def setup_workflow(): log_collections=[], dependency_map=taskN1.dependencies, task_name=taskN1.name, task_queue=task_queue, encode_command_line=True, + prodSourceLabel='managed', task_log={"dataset": "PandaJob_#{pandaid}/", "destination": "local", "param_type": "log", @@ -141,6 +142,7 @@ def setup_workflow(): log_collections=[], dependency_map=taskN2.dependencies, task_name=taskN2.name, task_queue=task_queue, encode_command_line=True, + prodSourceLabel='managed', task_log={"dataset": "PandaJob_#{pandaid}/", "destination": "local", "param_type": "log", @@ -154,6 +156,7 @@ def setup_workflow(): log_collections=[], dependency_map=taskN3.dependencies, task_name=taskN3.name, task_queue=task_queue, encode_command_line=True, + prodSourceLabel='managed', task_log={"dataset": "PandaJob_#{pandaid}/", "destination": "local", "param_type": "log", diff --git a/workflow/lib/idds/workflowv2/workflow.py b/workflow/lib/idds/workflowv2/workflow.py index 057e06bd..3472dbf5 100644 --- a/workflow/lib/idds/workflowv2/workflow.py +++ b/workflow/lib/idds/workflowv2/workflow.py @@ -1443,19 +1443,24 @@ def get_new_works(self, synchronize=True): return works if self.to_start_works: + self.logger.info("%s to_start_works: %s" % (self.get_internal_id(), str(self.to_start_works))) to_start_works = self.to_start_works.copy() init_works = self.init_works starting_works = [] for work_id in to_start_works: if not self.works[work_id].has_dependency(): starting_works.append(work_id) - self.get_new_work_to_run(work_id) - if not init_works: - init_works.append(work_id) - self.init_works = init_works + if not starting_works: + work_id = to_start_works.pop(0) + starting_works.append(work_id) for work_id in starting_works: + self.get_new_work_to_run(work_id) + if not init_works: + init_works.append(work_id) + self.init_works = init_works if work_id in self.to_start_works: self.to_start_works.remove(work_id) + self.logger.info("%s starting_works: %s" % (self.get_internal_id(), str(starting_works))) for k in self.new_to_run_works: if isinstance(self.works[k], Work):