From 35933bbd3ba6bb648f5c4047c23e86830a82c58b Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Wed, 8 Jun 2022 17:52:26 +0200 Subject: [PATCH 1/3] Fix poll rate bug --- reframe/frontend/executors/policies.py | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/reframe/frontend/executors/policies.py b/reframe/frontend/executors/policies.py index 3c7ef313f1..1644b3338e 100644 --- a/reframe/frontend/executors/policies.py +++ b/reframe/frontend/executors/policies.py @@ -50,21 +50,17 @@ def __init__(self): self._sleep_duration = None self._t_init = None - def running_tasks(self, num_tasks): - if self._sleep_duration is None: + def set_sleep_duration(self, reset=False): + if self._sleep_duration is None or reset: self._sleep_duration = self.SLEEP_MIN + else: + self._sleep_duration = min( + self._sleep_duration*self.SLEEP_INC_RATE, self.SLEEP_MAX + ) if self._num_polls == 0: self._t_init = time.time() - else: - if self._num_tasks != num_tasks: - self._sleep_duration = self.SLEEP_MIN - else: - self._sleep_duration = min( - self._sleep_duration*self.SLEEP_INC_RATE, self.SLEEP_MAX - ) - self._num_tasks = num_tasks return self def snooze(self): @@ -120,7 +116,6 @@ def runcase(self, case): task.skip() raise TaskExit from e - partname = task.testcase.partition.fullname task.setup(task.testcase.partition, task.testcase.environ, sched_flex_alloc_nodes=self.sched_flex_alloc_nodes, @@ -136,12 +131,13 @@ def runcase(self, case): else: sched = partition.scheduler + self._pollctl.set_sleep_duration(reset=True) while True: sched.poll(task.check.job) if task.run_complete(): break - self._pollctl.running_tasks(1).snooze() + self._pollctl.set_sleep_duration() task.run_wait() if not self.skip_sanity_check: @@ -349,7 +345,7 @@ def exit(self): ) if num_running: - self._pollctl.running_tasks(num_running).snooze() + self._pollctl.set_sleep_duration().snooze() except ABORT_REASONS as e: self._failall(e) raise @@ -565,10 +561,10 @@ def on_task_compile(self, task): pass def on_task_exit(self, task): - pass + self._pollctl.set_sleep_duration(reset=True) def on_task_compile_exit(self, task): - pass + self._pollctl.set_sleep_duration(reset=True) def on_task_skip(self, task): msg = str(task.exc_info[1]) From a6efce627c56cc29b9e124bd364f0650ae4f51a4 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 9 Jun 2022 15:41:36 +0200 Subject: [PATCH 2/3] Address PR comments --- reframe/frontend/executors/policies.py | 28 ++++++++++---------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/reframe/frontend/executors/policies.py b/reframe/frontend/executors/policies.py index 1644b3338e..b7c0938ce8 100644 --- a/reframe/frontend/executors/policies.py +++ b/reframe/frontend/executors/policies.py @@ -46,23 +46,14 @@ class _PollController: def __init__(self): self._num_polls = 0 - self._num_tasks = 0 self._sleep_duration = None self._t_init = None - def set_sleep_duration(self, reset=False): - if self._sleep_duration is None or reset: - self._sleep_duration = self.SLEEP_MIN - else: - self._sleep_duration = min( - self._sleep_duration*self.SLEEP_INC_RATE, self.SLEEP_MAX - ) - + def reset_snooze_time(self): + self._sleep_duration = self.SLEEP_MIN if self._num_polls == 0: self._t_init = time.time() - return self - def snooze(self): t_elapsed = time.time() - self._t_init self._num_polls += 1 @@ -72,6 +63,9 @@ def snooze(self): f'(current poll rate: {poll_rate} polls/s)' ) time.sleep(self._sleep_duration) + self._sleep_duration = min( + self._sleep_duration*self.SLEEP_INC_RATE, self.SLEEP_MAX + ) class SerialExecutionPolicy(ExecutionPolicy, TaskEventListener): @@ -131,14 +125,12 @@ def runcase(self, case): else: sched = partition.scheduler - self._pollctl.set_sleep_duration(reset=True) + self._pollctl.reset_snooze_time() while True: sched.poll(task.check.job) if task.run_complete(): break - self._pollctl.set_sleep_duration() - task.run_wait() if not self.skip_sanity_check: task.sanity() @@ -317,6 +309,8 @@ def exit(self): if self._pipeline_statistics: self._init_pipeline_progress(len(self._current_tasks)) + self._pollctl.reset_snooze_time() + while self._current_tasks: try: self._poll_tasks() @@ -345,7 +339,7 @@ def exit(self): ) if num_running: - self._pollctl.set_sleep_duration().snooze() + self._pollctl.snooze() except ABORT_REASONS as e: self._failall(e) raise @@ -561,10 +555,10 @@ def on_task_compile(self, task): pass def on_task_exit(self, task): - self._pollctl.set_sleep_duration(reset=True) + self._pollctl.reset_snooze_time() def on_task_compile_exit(self, task): - self._pollctl.set_sleep_duration(reset=True) + self._pollctl.reset_snooze_time() def on_task_skip(self, task): msg = str(task.exc_info[1]) From 1971fdf95069888fb323cf4f376aa1690347ad2a Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 9 Jun 2022 16:36:21 +0200 Subject: [PATCH 3/3] Address PR comments --- reframe/frontend/executors/policies.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reframe/frontend/executors/policies.py b/reframe/frontend/executors/policies.py index b7c0938ce8..03514734ed 100644 --- a/reframe/frontend/executors/policies.py +++ b/reframe/frontend/executors/policies.py @@ -51,10 +51,11 @@ def __init__(self): def reset_snooze_time(self): self._sleep_duration = self.SLEEP_MIN + + def snooze(self): if self._num_polls == 0: self._t_init = time.time() - def snooze(self): t_elapsed = time.time() - self._t_init self._num_polls += 1 poll_rate = self._num_polls / t_elapsed if t_elapsed else math.inf @@ -310,7 +311,6 @@ def exit(self): self._init_pipeline_progress(len(self._current_tasks)) self._pollctl.reset_snooze_time() - while self._current_tasks: try: self._poll_tasks()