diff --git a/luigi/interface.py b/luigi/interface.py index 5d2cc15f18..627568747b 100644 --- a/luigi/interface.py +++ b/luigi/interface.py @@ -131,7 +131,7 @@ class core(task.Config): class _WorkerSchedulerFactory(object): def create_local_scheduler(self): - return scheduler.CentralPlannerScheduler(prune_on_get_work=True, record_task_history=False) + return scheduler.Scheduler(prune_on_get_work=True, record_task_history=False) def create_remote_scheduler(self, url): return rpc.RemoteScheduler(url) diff --git a/luigi/scheduler.py b/luigi/scheduler.py index b1c9a3500d..366fbd5991 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -498,7 +498,7 @@ def disable_workers(self, workers): self.get_worker(worker).disabled = True -class CentralPlannerScheduler(object): +class Scheduler(object): """ Async scheduler that can handle multiple workers, etc. diff --git a/luigi/server.py b/luigi/server.py index 9c0c0ed0d4..7b9ce9b825 100644 --- a/luigi/server.py +++ b/luigi/server.py @@ -16,7 +16,7 @@ # """ Simple REST server that takes commands in a JSON payload -Interface to the :py:class:`~luigi.scheduler.CentralPlannerScheduler` class. +Interface to the :py:class:`~luigi.scheduler.Scheduler` class. See :doc:`/central_scheduler` for more info. """ # @@ -53,7 +53,7 @@ import tornado.netutil import tornado.web -from luigi.scheduler import CentralPlannerScheduler, RPC_METHODS +from luigi.scheduler import Scheduler, RPC_METHODS logger = logging.getLogger("luigi.server") @@ -247,7 +247,7 @@ def run(api_port=8082, address=None, unix_socket=None, scheduler=None, responder Runs one instance of the API server. """ if scheduler is None: - scheduler = CentralPlannerScheduler() + scheduler = Scheduler() # load scheduler state scheduler.load() diff --git a/luigi/worker.py b/luigi/worker.py index 025839445d..084d4ecf2c 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -20,7 +20,7 @@ 1. Sends all tasks that has to be run 2. Gets tasks from the scheduler that should be run -When running in local mode, the worker talks directly to a :py:class:`~luigi.scheduler.CentralPlannerScheduler` instance. +When running in local mode, the worker talks directly to a :py:class:`~luigi.scheduler.Scheduler` instance. When you run a central server, the worker will talk to the scheduler using a :py:class:`~luigi.rpc.RemoteScheduler` instance. Everything in this module is private to luigi and may change in incompatible @@ -53,7 +53,7 @@ from luigi import notifications from luigi.event import Event from luigi.task_register import load_task -from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, UNKNOWN, CentralPlannerScheduler +from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, UNKNOWN, Scheduler from luigi.target import Target from luigi.task import Task, flatten, getpaths, Config from luigi.task_register import TaskClassException @@ -364,7 +364,7 @@ class Worker(object): def __init__(self, scheduler=None, worker_id=None, worker_processes=1, assistant=False, **kwargs): if scheduler is None: - scheduler = CentralPlannerScheduler() + scheduler = Scheduler() self.worker_processes = int(worker_processes) self._worker_info = self._generate_worker_info() diff --git a/test/customized_run_test.py b/test/customized_run_test.py index 31208d2d6a..b7c64a34d0 100644 --- a/test/customized_run_test.py +++ b/test/customized_run_test.py @@ -41,7 +41,7 @@ def run(self): self.has_run = True -class CustomizedLocalScheduler(luigi.scheduler.CentralPlannerScheduler): +class CustomizedLocalScheduler(luigi.scheduler.Scheduler): def __init__(self, *args, **kwargs): super(CustomizedLocalScheduler, self).__init__(*args, **kwargs) diff --git a/test/execution_summary_test.py b/test/execution_summary_test.py index ea4530804f..47770a2ba8 100644 --- a/test/execution_summary_test.py +++ b/test/execution_summary_test.py @@ -30,7 +30,7 @@ class ExecutionSummaryTest(LuigiTestCase): def setUp(self): super(ExecutionSummaryTest, self).setUp() - self.scheduler = luigi.scheduler.CentralPlannerScheduler(prune_on_get_work=False) + self.scheduler = luigi.scheduler.Scheduler(prune_on_get_work=False) self.worker = luigi.worker.Worker(scheduler=self.scheduler) def run_task(self, task): @@ -154,7 +154,7 @@ def requires(self): def new_func(*args, **kwargs): return None - with mock.patch('luigi.scheduler.CentralPlannerScheduler.add_task', new_func): + with mock.patch('luigi.scheduler.Scheduler.add_task', new_func): self.run_task(Foo()) d = self.summary_dict() @@ -385,7 +385,7 @@ def run(self): other_worker = luigi.worker.Worker(scheduler=self.scheduler, worker_id="other_worker") other_worker.add(AlreadyRunningTask()) # This also registers this worker - old_func = luigi.scheduler.CentralPlannerScheduler.get_work + old_func = luigi.scheduler.Scheduler.get_work def new_func(*args, **kwargs): new_kwargs = kwargs.copy() @@ -393,7 +393,7 @@ def new_func(*args, **kwargs): old_func(*args, **new_kwargs) return old_func(*args, **kwargs) - with mock.patch('luigi.scheduler.CentralPlannerScheduler.get_work', new_func): + with mock.patch('luigi.scheduler.Scheduler.get_work', new_func): self.run_task(AlreadyRunningTask()) d = self.summary_dict() @@ -409,14 +409,14 @@ def run(self): other_worker = luigi.worker.Worker(scheduler=self.scheduler, worker_id="other_worker") other_worker.add(AlreadyRunningTask()) # This also registers this worker - old_func = luigi.scheduler.CentralPlannerScheduler.get_work + old_func = luigi.scheduler.Scheduler.get_work def new_func(*args, **kwargs): kwargs['current_tasks'] = None old_func(*args, **kwargs) return old_func(*args, **kwargs) - with mock.patch('luigi.scheduler.CentralPlannerScheduler.get_work', new_func): + with mock.patch('luigi.scheduler.Scheduler.get_work', new_func): self.run_task(AlreadyRunningTask()) d = self.summary_dict() diff --git a/test/notifications_test.py b/test/notifications_test.py index f909ea2aaa..94ca7dd7ee 100644 --- a/test/notifications_test.py +++ b/test/notifications_test.py @@ -22,7 +22,7 @@ from helpers import with_config from luigi import notifications from luigi import configuration -from luigi.scheduler import CentralPlannerScheduler +from luigi.scheduler import Scheduler from luigi.worker import Worker from luigi import six import luigi @@ -69,7 +69,7 @@ def complete(self): class ExceptionFormatTest(unittest.TestCase): def setUp(self): - self.sch = CentralPlannerScheduler() + self.sch = Scheduler() def test_fail_run(self): task = FailRunTask(foo='foo', bar='bar') diff --git a/test/retcodes_test.py b/test/retcodes_test.py index c16b67d147..e517bf21eb 100644 --- a/test/retcodes_test.py +++ b/test/retcodes_test.py @@ -54,7 +54,7 @@ class AlreadyRunningTask(luigi.Task): def run(self): pass - old_func = luigi.scheduler.CentralPlannerScheduler.get_work + old_func = luigi.scheduler.Scheduler.get_work def new_func(*args, **kwargs): kwargs['current_tasks'] = None @@ -63,7 +63,7 @@ def new_func(*args, **kwargs): res['running_tasks'][0]['worker'] = "not me :)" # Otherwise it will be filtered return res - with mock.patch('luigi.scheduler.CentralPlannerScheduler.get_work', new_func): + with mock.patch('luigi.scheduler.Scheduler.get_work', new_func): self.run_and_expect('AlreadyRunningTask', 0) # Test default value to be 0 self.run_and_expect('AlreadyRunningTask --retcode-already-running 5', 5) self.run_with_config(dict(already_running='3'), 'AlreadyRunningTask', 3) @@ -167,6 +167,6 @@ def requires(self): def new_func(*args, **kwargs): return None - with mock.patch('luigi.scheduler.CentralPlannerScheduler.add_task', new_func): + with mock.patch('luigi.scheduler.Scheduler.add_task', new_func): self.run_and_expect('RequiringTask', 0) self.run_and_expect('RequiringTask --retcode-not-run 5', 5) diff --git a/test/rpc_test.py b/test/rpc_test.py index 5861fb473b..711b2109f8 100644 --- a/test/rpc_test.py +++ b/test/rpc_test.py @@ -22,8 +22,8 @@ import mock import luigi.rpc -from luigi.scheduler import CentralPlannerScheduler -import central_planner_test +from luigi.scheduler import Scheduler +import scheduler_api_test import luigi.server from server_test import ServerTestBase import time @@ -88,11 +88,11 @@ def test_get_work_retries_on_null_limited(self): self.assertRaises(luigi.rpc.RPCError, self.get_work, fetch_results) -class RPCTest(central_planner_test.CentralPlannerTest, ServerTestBase): +class RPCTest(scheduler_api_test.SchedulerApiTest, ServerTestBase): def get_app(self): conf = self.get_scheduler_config() - sch = CentralPlannerScheduler(**conf) + sch = Scheduler(**conf) return luigi.server.app(sch) def setUp(self): diff --git a/test/central_planner_test.py b/test/scheduler_api_test.py similarity index 98% rename from test/central_planner_test.py rename to test/scheduler_api_test.py index 00e9377877..e5d2d6460a 100644 --- a/test/central_planner_test.py +++ b/test/scheduler_api_test.py @@ -17,24 +17,22 @@ import time from helpers import unittest - from nose.plugins.attrib import attr - import luigi.notifications from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, \ - UNKNOWN, RUNNING, CentralPlannerScheduler + UNKNOWN, RUNNING, Scheduler luigi.notifications.DEBUG = True WORKER = 'myworker' @attr('scheduler') -class CentralPlannerTest(unittest.TestCase): +class SchedulerApiTest(unittest.TestCase): def setUp(self): - super(CentralPlannerTest, self).setUp() + super(SchedulerApiTest, self).setUp() conf = self.get_scheduler_config() - self.sch = CentralPlannerScheduler(**conf) + self.sch = Scheduler(**conf) self.time = time.time def get_scheduler_config(self): @@ -49,7 +47,7 @@ def get_scheduler_config(self): } def tearDown(self): - super(CentralPlannerTest, self).tearDown() + super(SchedulerApiTest, self).tearDown() if time.time != self.time: time.time = self.time @@ -744,7 +742,7 @@ def test_disable_and_done(self): self.assertEqual(self.sch.get_work(worker=WORKER)['task_id'], 'A') def test_automatic_re_enable(self): - self.sch = CentralPlannerScheduler(disable_failures=2, disable_persist=100) + self.sch = Scheduler(disable_failures=2, disable_persist=100) self.setTime(0) self.sch.add_task(worker=WORKER, task_id='A', status=FAILED) self.sch.add_task(worker=WORKER, task_id='A', status=FAILED) @@ -757,7 +755,7 @@ def test_automatic_re_enable(self): self.assertEqual(FAILED, self.sch.task_list('', '')['A']['status']) def test_automatic_re_enable_with_one_failure_allowed(self): - self.sch = CentralPlannerScheduler(disable_failures=1, disable_persist=100) + self.sch = Scheduler(disable_failures=1, disable_persist=100) self.setTime(0) self.sch.add_task(worker=WORKER, task_id='A', status=FAILED) @@ -769,7 +767,7 @@ def test_automatic_re_enable_with_one_failure_allowed(self): self.assertEqual(FAILED, self.sch.task_list('', '')['A']['status']) def test_no_automatic_re_enable_after_manual_disable(self): - self.sch = CentralPlannerScheduler(disable_persist=100) + self.sch = Scheduler(disable_persist=100) self.setTime(0) self.sch.add_task(worker=WORKER, task_id='A', status=DISABLED) @@ -781,7 +779,7 @@ def test_no_automatic_re_enable_after_manual_disable(self): self.assertEqual(DISABLED, self.sch.task_list('', '')['A']['status']) def test_no_automatic_re_enable_after_auto_then_manual_disable(self): - self.sch = CentralPlannerScheduler(disable_failures=2, disable_persist=100) + self.sch = Scheduler(disable_failures=2, disable_persist=100) self.setTime(0) self.sch.add_task(worker=WORKER, task_id='A', status=FAILED) self.sch.add_task(worker=WORKER, task_id='A', status=FAILED) @@ -882,20 +880,20 @@ def test_prune_worker(self): self.assertFalse(self.sch.worker_list()) def test_task_list_beyond_limit(self): - sch = CentralPlannerScheduler(max_shown_tasks=3) + sch = Scheduler(max_shown_tasks=3) for c in 'ABCD': sch.add_task(worker=WORKER, task_id=c) self.assertEqual(set('ABCD'), set(sch.task_list('PENDING', '', False).keys())) self.assertEqual({'num_tasks': 4}, sch.task_list('PENDING', '')) def test_task_list_within_limit(self): - sch = CentralPlannerScheduler(max_shown_tasks=4) + sch = Scheduler(max_shown_tasks=4) for c in 'ABCD': sch.add_task(worker=WORKER, task_id=c) self.assertEqual(set('ABCD'), set(sch.task_list('PENDING', '').keys())) def test_task_lists_some_beyond_limit(self): - sch = CentralPlannerScheduler(max_shown_tasks=3) + sch = Scheduler(max_shown_tasks=3) for c in 'ABCD': sch.add_task(worker=WORKER, task_id=c, status=DONE) for c in 'EFG': @@ -965,7 +963,7 @@ def test_task_list_filter_by_multiple_search_terms(self): self.search_pending('ClassA 2016-02-01 num', {expected}) def test_search_results_beyond_limit(self): - sch = CentralPlannerScheduler(max_shown_tasks=3) + sch = Scheduler(max_shown_tasks=3) for i in range(4): sch.add_task(worker=WORKER, family='Test', params={'p': str(i)}, task_id='Test_%i' % i) self.assertEqual({'num_tasks': 4}, sch.task_list('PENDING', '', search='Test')) @@ -1107,7 +1105,7 @@ def test_assistants_dont_nurture_finished_statuses(self): Assistants should not affect longevity expect for the tasks that it is running, par the one it's actually running. """ - self.sch = CentralPlannerScheduler(retry_delay=100000000000) # Never pendify failed tasks + self.sch = Scheduler(retry_delay=100000000000) # Never pendify failed tasks self.setTime(1) self.sch.add_worker('assistant', [('assistant', True)]) self.sch.ping(worker='assistant') @@ -1148,8 +1146,8 @@ def test_no_crash_on_only_disable_hard_timeout(self): There was some failure happening when disable_hard_timeout was set but disable_failures was not. """ - self.sch = CentralPlannerScheduler(retry_delay=5, - disable_hard_timeout=100) + self.sch = Scheduler(retry_delay=5, + disable_hard_timeout=100) self.setTime(1) self.sch.add_worker(WORKER, []) self.sch.ping(worker=WORKER) diff --git a/test/scheduler_test.py b/test/scheduler_test.py index 5073042cbc..66be884757 100644 --- a/test/scheduler_test.py +++ b/test/scheduler_test.py @@ -23,13 +23,9 @@ import luigi.scheduler from helpers import with_config -import logging -logging.config.fileConfig('test/testconfig/logging.cfg', disable_existing_loggers=False) -luigi.notifications.DEBUG = True - -class SchedulerTest(unittest.TestCase): +class SchedulerIoTest(unittest.TestCase): def test_load_old_state(self): tasks = {} @@ -60,20 +56,20 @@ def test_load_broken_state(self): @with_config({'scheduler': {'disable-num-failures': '44', 'worker-disconnect-delay': '55'}}) def test_scheduler_with_config(self): - cps = luigi.scheduler.CentralPlannerScheduler() - self.assertEqual(44, cps._config.disable_failures) - self.assertEqual(55, cps._config.worker_disconnect_delay) + scheduler = luigi.scheduler.Scheduler() + self.assertEqual(44, scheduler._config.disable_failures) + self.assertEqual(55, scheduler._config.worker_disconnect_delay) # Override - cps = luigi.scheduler.CentralPlannerScheduler(disable_failures=66, - worker_disconnect_delay=77) - self.assertEqual(66, cps._config.disable_failures) - self.assertEqual(77, cps._config.worker_disconnect_delay) + scheduler = luigi.scheduler.Scheduler(disable_failures=66, + worker_disconnect_delay=77) + self.assertEqual(66, scheduler._config.disable_failures) + self.assertEqual(77, scheduler._config.worker_disconnect_delay) @with_config({'resources': {'a': '100', 'b': '200'}}) def test_scheduler_with_resources(self): - cps = luigi.scheduler.CentralPlannerScheduler() - self.assertEqual({'a': 100, 'b': 200}, cps._resources) + scheduler = luigi.scheduler.Scheduler() + self.assertEqual({'a': 100, 'b': 200}, scheduler._resources) @with_config({'scheduler': {'record_task_history': 'True'}, 'task_history': {'db_connection': 'sqlite:////none/existing/path/hist.db'}}) @@ -82,28 +78,31 @@ def test_local_scheduler_task_history_status(self): self.assertEqual(False, ls._config.record_task_history) def test_load_recovers_tasks_index(self): - cps = luigi.scheduler.CentralPlannerScheduler() - cps.add_task(worker='A', task_id='1') - cps.add_task(worker='B', task_id='2') - cps.add_task(worker='C', task_id='3') - cps.add_task(worker='D', task_id='4') - self.assertEqual(cps.get_work(worker='A')['task_id'], '1') + scheduler = luigi.scheduler.Scheduler() + scheduler.add_task(worker='A', task_id='1') + scheduler.add_task(worker='B', task_id='2') + scheduler.add_task(worker='C', task_id='3') + scheduler.add_task(worker='D', task_id='4') + self.assertEqual(scheduler.get_work(worker='A')['task_id'], '1') with tempfile.NamedTemporaryFile(delete=True) as fn: - def reload_from_disk(cps): - cps._state._state_path = fn.name - cps.dump() - cps = luigi.scheduler.CentralPlannerScheduler() - cps._state._state_path = fn.name - cps.load() - return cps - cps = reload_from_disk(cps=cps) - self.assertEqual(cps.get_work(worker='B')['task_id'], '2') - self.assertEqual(cps.get_work(worker='C')['task_id'], '3') - cps = reload_from_disk(cps=cps) - self.assertEqual(cps.get_work(worker='D')['task_id'], '4') + def reload_from_disk(scheduler): + scheduler._state._state_path = fn.name + scheduler.dump() + scheduler = luigi.scheduler.Scheduler() + scheduler._state._state_path = fn.name + scheduler.load() + return scheduler + scheduler = reload_from_disk(scheduler=scheduler) + self.assertEqual(scheduler.get_work(worker='B')['task_id'], '2') + self.assertEqual(scheduler.get_work(worker='C')['task_id'], '3') + scheduler = reload_from_disk(scheduler=scheduler) + self.assertEqual(scheduler.get_work(worker='D')['task_id'], '4') def test_worker_prune_after_init(self): + """ + See https://github.com/spotify/luigi/pull/1019 + """ worker = luigi.scheduler.Worker(123) class TmpCfg: @@ -111,6 +110,3 @@ def __init__(self): self.worker_disconnect_delay = 10 worker.prune(TmpCfg()) - -if __name__ == '__main__': - unittest.main() diff --git a/test/scheduler_visualisation_test.py b/test/scheduler_visualisation_test.py index 6d4762156f..51df381fb7 100644 --- a/test/scheduler_visualisation_test.py +++ b/test/scheduler_visualisation_test.py @@ -102,7 +102,7 @@ def run(self): class SchedulerVisualisationTest(unittest.TestCase): def setUp(self): - self.scheduler = luigi.scheduler.CentralPlannerScheduler() + self.scheduler = luigi.scheduler.Scheduler() def tearDown(self): pass @@ -160,7 +160,7 @@ def complete(self): root_task = LinearTask(100) - self.scheduler = luigi.scheduler.CentralPlannerScheduler(max_graph_nodes=10) + self.scheduler = luigi.scheduler.Scheduler(max_graph_nodes=10) self._build([root_task]) graph = self.scheduler.dep_graph(root_task.task_id) @@ -181,7 +181,7 @@ def complete(self): root_task = LinearTask(100) - self.scheduler = luigi.scheduler.CentralPlannerScheduler(max_graph_nodes=10) + self.scheduler = luigi.scheduler.Scheduler(max_graph_nodes=10) self._build([root_task]) graph = self.scheduler.inverse_dep_graph(LinearTask(0).task_id) @@ -199,7 +199,7 @@ def requires(self): root_task = BinaryTreeTask(1) - self.scheduler = luigi.scheduler.CentralPlannerScheduler(max_graph_nodes=10) + self.scheduler = luigi.scheduler.Scheduler(max_graph_nodes=10) self._build([root_task]) graph = self.scheduler.dep_graph(root_task.task_id) @@ -221,7 +221,7 @@ def complete(self): root_task = LinearTask(100) - self.scheduler = luigi.scheduler.CentralPlannerScheduler(max_graph_nodes=10) + self.scheduler = luigi.scheduler.Scheduler(max_graph_nodes=10) self._build([root_task]) graph = self.scheduler.dep_graph(root_task.task_id) diff --git a/test/server_test.py b/test/server_test.py index 4439fad11e..22ea1a80e5 100644 --- a/test/server_test.py +++ b/test/server_test.py @@ -25,7 +25,7 @@ import luigi.rpc import luigi.server import luigi.cmdline -from luigi.scheduler import CentralPlannerScheduler +from luigi.scheduler import Scheduler from luigi.six.moves.urllib.parse import ( urlencode, ParseResult, quote as urlquote ) @@ -58,7 +58,7 @@ def _is_running_from_main_thread(): class ServerTestBase(AsyncHTTPTestCase): def get_app(self): - return luigi.server.app(CentralPlannerScheduler()) + return luigi.server.app(Scheduler()) def setUp(self): super(ServerTestBase, self).setUp() diff --git a/test/task_history_test.py b/test/task_history_test.py index 2a644bccc7..e5980ced8e 100644 --- a/test/task_history_test.py +++ b/test/task_history_test.py @@ -44,7 +44,7 @@ class TaskHistoryTest(LuigiTestCase): def test_run(self): th = SimpleTaskHistory() - sch = luigi.scheduler.CentralPlannerScheduler(task_history_impl=th) + sch = luigi.scheduler.Scheduler(task_history_impl=th) with luigi.worker.Worker(scheduler=sch) as w: class MyTask(luigi.Task): pass diff --git a/test/task_status_message_test.py b/test/task_status_message_test.py index b85b3573ca..b4356d74d2 100644 --- a/test/task_status_message_test.py +++ b/test/task_status_message_test.py @@ -28,7 +28,7 @@ class TaskStatusMessageTest(LuigiTestCase): def test_run(self): message = "test message" - sch = luigi.scheduler.CentralPlannerScheduler() + sch = luigi.scheduler.Scheduler() with luigi.worker.Worker(scheduler=sch) as w: class MyTask(luigi.Task): def run(self): diff --git a/test/worker_external_task_test.py b/test/worker_external_task_test.py index 2b53e8e205..19b5959266 100644 --- a/test/worker_external_task_test.py +++ b/test/worker_external_task_test.py @@ -14,7 +14,7 @@ import luigi from luigi.file import LocalTarget -from luigi.scheduler import CentralPlannerScheduler +from luigi.scheduler import Scheduler import luigi.server import luigi.worker import luigi.task @@ -92,7 +92,7 @@ def _build(self, tasks): w.run() def _make_worker(self): - self.scheduler = CentralPlannerScheduler(prune_on_get_work=True) + self.scheduler = Scheduler(prune_on_get_work=True) return luigi.worker.Worker(scheduler=self.scheduler, worker_processes=1) def test_external_dependency_already_complete(self): diff --git a/test/worker_test.py b/test/worker_test.py index 190d050a31..cac44beda9 100644 --- a/test/worker_test.py +++ b/test/worker_test.py @@ -33,7 +33,7 @@ import mock from luigi import ExternalTask, RemoteScheduler, Task, Event from luigi.mock import MockTarget, MockFileSystem -from luigi.scheduler import CentralPlannerScheduler +from luigi.scheduler import Scheduler from luigi.worker import Worker from luigi.rpc import RPCError from luigi import six @@ -112,7 +112,7 @@ def run(self): class WorkerTest(unittest.TestCase): def run(self, result=None): - self.sch = CentralPlannerScheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) + self.sch = Scheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) self.time = time.time with Worker(scheduler=self.sch, worker_id='X') as w, Worker(scheduler=self.sch, worker_id='Y') as w2: self.w = w @@ -345,7 +345,7 @@ def complete(self): self.assertFalse(b.has_run) def test_unknown_dep(self): - # see central_planner_test.CentralPlannerTest.test_remove_dep + # see related test_remove_dep test (grep for it) class A(ExternalTask): def complete(self): @@ -511,7 +511,7 @@ def complete(self): eb = ExternalB() self.assertEqual(str(eb), "B()") - sch = CentralPlannerScheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) + sch = Scheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) with Worker(scheduler=sch, worker_id='X') as w, Worker(scheduler=sch, worker_id='Y') as w2: self.assertTrue(w.add(b)) self.assertTrue(w2.add(eb)) @@ -540,7 +540,7 @@ def complete(self): self.assertEqual(str(eb), "B()") - sch = CentralPlannerScheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) + sch = Scheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) with Worker(scheduler=sch, worker_id='X') as w, Worker(scheduler=sch, worker_id='Y') as w2: self.assertTrue(w2.add(eb)) self.assertTrue(w.add(b)) @@ -571,7 +571,7 @@ def run(self): b = B() - sch = CentralPlannerScheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) + sch = Scheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) with Worker(scheduler=sch, worker_id='X', keep_alive=True, count_uniques=True) as w: with Worker(scheduler=sch, worker_id='Y', keep_alive=True, count_uniques=True, wait_interval=0.1) as w2: @@ -605,7 +605,7 @@ def run(self): b = B() - sch = CentralPlannerScheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) + sch = Scheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) with Worker(scheduler=sch, worker_id='X', keep_alive=True, count_uniques=True) as w: with Worker(scheduler=sch, worker_id='Y', keep_alive=True, count_uniques=True, wait_interval=0.1) as w2: @@ -638,7 +638,7 @@ def requires(self): return a, c b = B() - sch = CentralPlannerScheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) + sch = Scheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) with Worker(scheduler=sch, worker_id="foo") as w: self.assertFalse(w.add(b)) self.assertTrue(w.run()) @@ -671,7 +671,7 @@ def requires(self): return c, a b = B() - sch = CentralPlannerScheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) + sch = Scheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) with Worker(scheduler=sch, worker_id="foo") as w: self.assertFalse(w.add(b)) self.assertTrue(w.run()) @@ -725,7 +725,7 @@ def test_ping_retry(self): Kind of ugly since it uses actual timing with sleep to test the thread """ - sch = CentralPlannerScheduler( + sch = Scheduler( retry_delay=100, remove_delay=1000, worker_disconnect_delay=10, @@ -785,7 +785,7 @@ class WorkerEmailTest(LuigiTestCase): def run(self, result=None): super(WorkerEmailTest, self).setUp() - sch = CentralPlannerScheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) + sch = Scheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) with Worker(scheduler=sch, worker_id="foo") as self.worker: super(WorkerEmailTest, self).run(result) @@ -1072,7 +1072,7 @@ def run(self): class AssistantTest(unittest.TestCase): def run(self, result=None): - self.sch = CentralPlannerScheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) + self.sch = Scheduler(retry_delay=100, remove_delay=1000, worker_disconnect_delay=10) self.assistant = Worker(scheduler=self.sch, worker_id='Y', assistant=True) with Worker(scheduler=self.sch, worker_id='X') as w: self.w = w