diff --git a/luigi/scheduler.py b/luigi/scheduler.py
index 9f67b84c54..38222b18c6 100644
--- a/luigi/scheduler.py
+++ b/luigi/scheduler.py
@@ -154,6 +154,12 @@ class scheduler(Config):
 
     metrics_collector = parameter.EnumParameter(enum=MetricsCollectors, default=MetricsCollectors.default)
 
+    stable_done_cooldown_secs = parameter.IntParameter(default=10,
+                                                       description="Sets cooldown period to avoid running the same task twice")
+    """
+    Sets a cooldown period in seconds after a task was completed. During this period the same task will not be accepted by the scheduler.
+    """
+
     def _get_retry_policy(self):
         return RetryPolicy(self.retry_count, self.disable_hard_timeout, self.disable_window)
 
@@ -836,6 +842,10 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
         if task is None or (task.status != RUNNING and not worker.enabled):
             return
 
+        # Ignore claims that the task is PENDING if it very recently was marked as DONE.
+        if status == PENDING and task.status == DONE and (time.time() - task.updated) < self._config.stable_done_cooldown_secs:
+            return
+
         # for setting priority, we'll sometimes create tasks with unset family and params
         if not task.family:
             task.family = family
diff --git a/test/scheduler_api_test.py b/test/scheduler_api_test.py
index 73d5d5b547..500dc0ff95 100644
--- a/test/scheduler_api_test.py
+++ b/test/scheduler_api_test.py
@@ -46,6 +46,7 @@ def get_scheduler_config(self):
             'disable_window': 10,
             'retry_count': 3,
             'disable_hard_timeout': 60 * 60,
+            'stable_done_cooldown_secs': 0,
         }
 
     def tearDown(self):
diff --git a/test/scheduler_test.py b/test/scheduler_test.py
index 95cce5db71..fdd4e2547b 100644
--- a/test/scheduler_test.py
+++ b/test/scheduler_test.py
@@ -19,10 +19,15 @@
 import pickle
 import tempfile
 import time
+import os
+import shutil
+from multiprocessing import Process
 from helpers import unittest
 
 import luigi.scheduler
+import luigi.server
 from helpers import with_config
+from luigi.target import FileAlreadyExists
 
 
 class SchedulerIoTest(unittest.TestCase):
@@ -281,3 +286,85 @@ def test_get_pending_tasks_with_many_done_tasks(self):
 
         non_trivial_worker = scheduler_state.get_worker('NON_TRIVIAL')
         self.assertEqual({'A'}, self.get_pending_ids(non_trivial_worker, scheduler_state))
+
+
+class FailingOnDoubleRunTask(luigi.Task):
+    """
+    Task with a deliberately slow completeness check that raises if it is run
+    while its output file already exists, i.e. if it is run a second time.
+    """
+    time_to_check_secs = 1
+    time_to_run_secs = 2
+    output_dir = luigi.Parameter(default="")
+
+    def __init__(self, *args, **kwargs):
+        super(FailingOnDoubleRunTask, self).__init__(*args, **kwargs)
+        self.file_name = os.path.join(self.output_dir, "AnyTask")
+
+    def complete(self):
+        time.sleep(self.time_to_check_secs)  # e.g., establish connection
+        exists = os.path.exists(self.file_name)
+        time.sleep(self.time_to_check_secs)  # e.g., close connection
+        return exists
+
+    def run(self):
+        time.sleep(self.time_to_run_secs)
+        if os.path.exists(self.file_name):
+            raise FileAlreadyExists(self.file_name)
+        open(self.file_name, 'w').close()
+
+
+class StableDoneCooldownSecsTest(unittest.TestCase):
+    """
+    Submits the same task from two overlapping runs against one scheduler, with
+    ``stable_done_cooldown_secs`` set to 5 and to 0.
+    """
+
+    def setUp(self):
+        self.p = tempfile.mkdtemp()
+
+    def tearDown(self):
+        shutil.rmtree(self.p)
+
+    def run_task(self):
+        return luigi.build([FailingOnDoubleRunTask(output_dir=self.p)],
+                           detailed_summary=True,
+                           parallel_scheduling=True,
+                           parallel_scheduling_processes=2)
+
+    @with_config({'worker': {'keep_alive': 'false'}})
+    def get_second_run_result_on_double_run(self):
+        """
+        Start a scheduler and a first run in child processes, then start a
+        second run of the same task in this process and return its result.
+        """
+        server_process = Process(target=luigi.server.run)
+        process = Process(target=self.run_task)
+        try:
+            # scheduler is started
+            server_process.start()
+            # first run is started
+            process.start()
+            time.sleep(FailingOnDoubleRunTask.time_to_run_secs + FailingOnDoubleRunTask.time_to_check_secs)
+            # second run of the same task is started
+            second_run_result = self.run_task()
+            return second_run_result
+        finally:
+            # Do not let a first run that is still alive after the grace period
+            # keep running while tearDown() removes its output directory.
+            process.join(1)
+            if process.is_alive():
+                process.terminate()
+                process.join(1)
+            server_process.terminate()
+            server_process.join(1)
+
+    @with_config({'scheduler': {'stable_done_cooldown_secs': '5'}})
+    def test_sending_same_task_twice_with_cooldown_does_not_lead_to_double_run(self):
+        second_run_result = self.get_second_run_result_on_double_run()
+        self.assertEqual(second_run_result.scheduling_succeeded, True)
+
+    @with_config({'scheduler': {'stable_done_cooldown_secs': '0'}})
+    def test_sending_same_task_twice_without_cooldown_leads_to_double_run(self):
+        second_run_result = self.get_second_run_result_on_double_run()
+        self.assertEqual(second_run_result.scheduling_succeeded, False)
diff --git a/test/task_running_resources_test.py b/test/task_running_resources_test.py
index 53273d3f50..ece733fa1c 100644
--- a/test/task_running_resources_test.py
+++ b/test/task_running_resources_test.py
@@ -21,7 +21,7 @@
 import multiprocessing
 from contextlib import contextmanager
 
-from helpers import unittest, RunOnceTask
+from helpers import unittest, RunOnceTask, with_config
 
 import luigi
 import luigi.server
@@ -73,6 +73,7 @@ def test_resource_reduction(self):
 
 class ConcurrentRunningResourcesTest(unittest.TestCase):
 
+    @with_config({'scheduler': {'stable_done_cooldown_secs': '0'}})
     def setUp(self):
         super(ConcurrentRunningResourcesTest, self).setUp()
 
diff --git a/test/worker_test.py b/test/worker_test.py
index 67d45b6a1a..6dcc2269e9 100644
--- a/test/worker_test.py
+++ b/test/worker_test.py
@@ -123,7 +123,7 @@ def run(self):
 class WorkerTest(LuigiTestCase):
 
     def run(self, result=None):
-        self.sch = Scheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10)
+        self.sch = Scheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10, stable_done_cooldown_secs=0)
         self.time = time.time
         with Worker(scheduler=self.sch, worker_id='X') as w, Worker(scheduler=self.sch, worker_id='Y') as w2:
             self.w = w