Skip to content

Commit

Permalink
Issue 2644: Tasks can be run several times
Browse files Browse the repository at this point in the history
- introduced parameter `stable_done_cooldown_secs` with description
- added an acceptance test
  • Loading branch information
Konstantin Gudkov committed Apr 24, 2019
1 parent 826bcd6 commit 8ab20a9
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 2 deletions.
10 changes: 10 additions & 0 deletions luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ class scheduler(Config):

metrics_collector = parameter.EnumParameter(enum=MetricsCollectors, default=MetricsCollectors.default)

stable_done_cooldown_secs = parameter.IntParameter(default=10,
description="Sets cooldown period to avoid running the same task twice")
"""
Sets a cooldown period in seconds after a task was completed. During this period the same task will not be accepted by the scheduler.
"""

def _get_retry_policy(self):
    """Build a ``RetryPolicy`` from this config's retry-related settings.

    The policy is constructed positionally from ``retry_count``,
    ``disable_hard_timeout`` and ``disable_window``, in that order.
    """
    policy_args = (self.retry_count, self.disable_hard_timeout, self.disable_window)
    return RetryPolicy(*policy_args)

Expand Down Expand Up @@ -836,6 +842,10 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
if task is None or (task.status != RUNNING and not worker.enabled):
return

# Ignore claims that the task is PENDING if it very recently was marked as DONE.
if status == PENDING and task.status == DONE and (time.time() - task.updated) < self._config.stable_done_cooldown_secs:
return

# for setting priority, we'll sometimes create tasks with unset family and params
if not task.family:
task.family = family
Expand Down
1 change: 1 addition & 0 deletions test/scheduler_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def get_scheduler_config(self):
'disable_window': 10,
'retry_count': 3,
'disable_hard_timeout': 60 * 60,
'stable_done_cooldown_secs': 0
}

def tearDown(self):
Expand Down
67 changes: 67 additions & 0 deletions test/scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
import pickle
import tempfile
import time
import os
import shutil
from multiprocessing import Process
from helpers import unittest

import luigi.scheduler
import luigi.server
from helpers import with_config
from luigi.target import FileAlreadyExists


class SchedulerIoTest(unittest.TestCase):
Expand Down Expand Up @@ -281,3 +286,65 @@ def test_get_pending_tasks_with_many_done_tasks(self):

non_trivial_worker = scheduler_state.get_worker('NON_TRIVIAL')
self.assertEqual({'A'}, self.get_pending_ids(non_trivial_worker, scheduler_state))


class FailingOnDoubleRunTask(luigi.Task):
    """Task whose ``run`` raises if its marker file already exists.

    ``complete`` and ``run`` both sleep on purpose so that two submissions of
    the same task can overlap. ``run`` creates an empty marker file named
    ``AnyTask`` inside ``output_dir`` and raises ``FileAlreadyExists`` when
    the file is already there.
    """

    # Seconds slept before and after the existence check in ``complete``.
    time_to_check_secs = 1
    # Seconds slept at the start of ``run``.
    time_to_run_secs = 2
    output_dir = luigi.Parameter(default="")

    def __init__(self, *args, **kwargs):
        super(FailingOnDoubleRunTask, self).__init__(*args, **kwargs)
        # Path of the marker file that records a finished run.
        self.file_name = os.path.join(self.output_dir, "AnyTask")

    def complete(self):
        """Return whether the marker file exists, sleeping around the check."""
        time.sleep(self.time_to_check_secs)  # e.g., establish connection
        marker_present = os.path.exists(self.file_name)
        time.sleep(self.time_to_check_secs)  # e.g., close connection
        return marker_present

    def run(self):
        """Sleep, then create the marker file or raise if it is already there."""
        time.sleep(self.time_to_run_secs)
        marker_path = self.file_name
        if not os.path.exists(marker_path):
            # First run: leave an empty marker file behind.
            open(marker_path, 'w').close()
            return
        raise FileAlreadyExists(marker_path)


class StableDoneCooldownSecsTest(unittest.TestCase):
    """Acceptance test for the scheduler's ``stable_done_cooldown_secs`` setting.

    Each test starts a scheduler server and a first run of
    ``FailingOnDoubleRunTask`` in separate processes, waits a fixed delay,
    then builds the same task again from the test process and checks the
    result of that second build.
    """

    def setUp(self):
        # Fresh output directory per test so marker files never leak between tests.
        self.p = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.p)

    def run_task(self):
        """Build ``FailingOnDoubleRunTask`` against ``self.p`` and return ``luigi.build``'s result."""
        return luigi.build([FailingOnDoubleRunTask(output_dir=self.p)],
                           parallel_scheduling=True,
                           parallel_scheduling_processes=2)

    @with_config({'worker': {'keep_alive': 'false'}})
    def run_same_task_twice(self, failure_expected=False):
        """Submit the same task twice and assert on the second build's result.

        :param failure_expected: when true, the second ``luigi.build`` call is
            expected to return ``False``; otherwise ``True``.
        """
        server_process = Process(target=luigi.server.run)
        process = Process(target=self.run_task)
        try:
            # scheduler is started
            server_process.start()
            # first run is started
            process.start()
            time.sleep(FailingOnDoubleRunTask.time_to_run_secs + FailingOnDoubleRunTask.time_to_check_secs)
            # second run of the same task is started
            luigi_run_result = self.run_task()
            self.assertEqual(luigi_run_result, not failure_expected)
        finally:
            process.join(1)
            if process.is_alive():
                # join() with a timeout returns silently when the child is
                # still running; terminate it so it is not leaked past the
                # test (tearDown removes the directory it writes to).
                process.terminate()
                process.join(1)
            server_process.terminate()
            server_process.join(1)

    @with_config({'scheduler': {'stable_done_cooldown_secs': '5'}})
    def test_sending_same_task_twice_with_cooldown_does_not_lead_to_double_run(self):
        self.run_same_task_twice(failure_expected=False)

    @with_config({'scheduler': {'stable_done_cooldown_secs': '0'}})
    def test_sending_same_task_twice_without_cooldown_leads_to_double_run(self):
        self.run_same_task_twice(failure_expected=True)
3 changes: 2 additions & 1 deletion test/task_running_resources_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import multiprocessing
from contextlib import contextmanager

from helpers import unittest, RunOnceTask
from helpers import unittest, RunOnceTask, with_config

import luigi
import luigi.server
Expand Down Expand Up @@ -73,6 +73,7 @@ def test_resource_reduction(self):

class ConcurrentRunningResourcesTest(unittest.TestCase):

@with_config({'scheduler': {'stable_done_cooldown_secs': '0'}})
def setUp(self):
super(ConcurrentRunningResourcesTest, self).setUp()

Expand Down
2 changes: 1 addition & 1 deletion test/worker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def run(self):
class WorkerTest(LuigiTestCase):

def run(self, result=None):
self.sch = Scheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10)
self.sch = Scheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10, stable_done_cooldown_secs=0)
self.time = time.time
with Worker(scheduler=self.sch, worker_id='X') as w, Worker(scheduler=self.sch, worker_id='Y') as w2:
self.w = w
Expand Down

0 comments on commit 8ab20a9

Please sign in to comment.