Skip to content

Commit

Permalink
Rename CentralPlannerScheduler to Scheduler (#1781)
Browse files Browse the repository at this point in the history
I find very little value in having that complicated name. In particular
it should be clear that the main class of scheduler.py is in fact
the Scheduler() class.

I've also renamed the tests testing the api to
test/scheduler_api_test.py. This should feel more intuitive to have
"scheduler" in the name of the file.
  • Loading branch information
Tarrasch authored Jul 25, 2016
1 parent 5bd9b43 commit e02ba97
Show file tree
Hide file tree
Showing 17 changed files with 94 additions and 100 deletions.
2 changes: 1 addition & 1 deletion luigi/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class core(task.Config):
class _WorkerSchedulerFactory(object):

def create_local_scheduler(self):
return scheduler.CentralPlannerScheduler(prune_on_get_work=True, record_task_history=False)
return scheduler.Scheduler(prune_on_get_work=True, record_task_history=False)

def create_remote_scheduler(self, url):
return rpc.RemoteScheduler(url)
Expand Down
2 changes: 1 addition & 1 deletion luigi/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ def disable_workers(self, workers):
self.get_worker(worker).disabled = True


class CentralPlannerScheduler(object):
class Scheduler(object):
"""
Async scheduler that can handle multiple workers, etc.
Expand Down
6 changes: 3 additions & 3 deletions luigi/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#
"""
Simple REST server that takes commands in a JSON payload
Interface to the :py:class:`~luigi.scheduler.CentralPlannerScheduler` class.
Interface to the :py:class:`~luigi.scheduler.Scheduler` class.
See :doc:`/central_scheduler` for more info.
"""
#
Expand Down Expand Up @@ -53,7 +53,7 @@
import tornado.netutil
import tornado.web

from luigi.scheduler import CentralPlannerScheduler, RPC_METHODS
from luigi.scheduler import Scheduler, RPC_METHODS

logger = logging.getLogger("luigi.server")

Expand Down Expand Up @@ -247,7 +247,7 @@ def run(api_port=8082, address=None, unix_socket=None, scheduler=None, responder
Runs one instance of the API server.
"""
if scheduler is None:
scheduler = CentralPlannerScheduler()
scheduler = Scheduler()

# load scheduler state
scheduler.load()
Expand Down
6 changes: 3 additions & 3 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
1. Sends all tasks that has to be run
2. Gets tasks from the scheduler that should be run
When running in local mode, the worker talks directly to a :py:class:`~luigi.scheduler.CentralPlannerScheduler` instance.
When running in local mode, the worker talks directly to a :py:class:`~luigi.scheduler.Scheduler` instance.
When you run a central server, the worker will talk to the scheduler using a :py:class:`~luigi.rpc.RemoteScheduler` instance.
Everything in this module is private to luigi and may change in incompatible
Expand Down Expand Up @@ -53,7 +53,7 @@
from luigi import notifications
from luigi.event import Event
from luigi.task_register import load_task
from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, UNKNOWN, CentralPlannerScheduler
from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, UNKNOWN, Scheduler
from luigi.target import Target
from luigi.task import Task, flatten, getpaths, Config
from luigi.task_register import TaskClassException
Expand Down Expand Up @@ -364,7 +364,7 @@ class Worker(object):

def __init__(self, scheduler=None, worker_id=None, worker_processes=1, assistant=False, **kwargs):
if scheduler is None:
scheduler = CentralPlannerScheduler()
scheduler = Scheduler()

self.worker_processes = int(worker_processes)
self._worker_info = self._generate_worker_info()
Expand Down
2 changes: 1 addition & 1 deletion test/customized_run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def run(self):
self.has_run = True


class CustomizedLocalScheduler(luigi.scheduler.CentralPlannerScheduler):
class CustomizedLocalScheduler(luigi.scheduler.Scheduler):

def __init__(self, *args, **kwargs):
super(CustomizedLocalScheduler, self).__init__(*args, **kwargs)
Expand Down
12 changes: 6 additions & 6 deletions test/execution_summary_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ExecutionSummaryTest(LuigiTestCase):

def setUp(self):
super(ExecutionSummaryTest, self).setUp()
self.scheduler = luigi.scheduler.CentralPlannerScheduler(prune_on_get_work=False)
self.scheduler = luigi.scheduler.Scheduler(prune_on_get_work=False)
self.worker = luigi.worker.Worker(scheduler=self.scheduler)

def run_task(self, task):
Expand Down Expand Up @@ -154,7 +154,7 @@ def requires(self):
def new_func(*args, **kwargs):
return None

with mock.patch('luigi.scheduler.CentralPlannerScheduler.add_task', new_func):
with mock.patch('luigi.scheduler.Scheduler.add_task', new_func):
self.run_task(Foo())

d = self.summary_dict()
Expand Down Expand Up @@ -385,15 +385,15 @@ def run(self):

other_worker = luigi.worker.Worker(scheduler=self.scheduler, worker_id="other_worker")
other_worker.add(AlreadyRunningTask()) # This also registers this worker
old_func = luigi.scheduler.CentralPlannerScheduler.get_work
old_func = luigi.scheduler.Scheduler.get_work

def new_func(*args, **kwargs):
new_kwargs = kwargs.copy()
new_kwargs['worker'] = 'other_worker'
old_func(*args, **new_kwargs)
return old_func(*args, **kwargs)

with mock.patch('luigi.scheduler.CentralPlannerScheduler.get_work', new_func):
with mock.patch('luigi.scheduler.Scheduler.get_work', new_func):
self.run_task(AlreadyRunningTask())

d = self.summary_dict()
Expand All @@ -409,14 +409,14 @@ def run(self):

other_worker = luigi.worker.Worker(scheduler=self.scheduler, worker_id="other_worker")
other_worker.add(AlreadyRunningTask()) # This also registers this worker
old_func = luigi.scheduler.CentralPlannerScheduler.get_work
old_func = luigi.scheduler.Scheduler.get_work

def new_func(*args, **kwargs):
kwargs['current_tasks'] = None
old_func(*args, **kwargs)
return old_func(*args, **kwargs)

with mock.patch('luigi.scheduler.CentralPlannerScheduler.get_work', new_func):
with mock.patch('luigi.scheduler.Scheduler.get_work', new_func):
self.run_task(AlreadyRunningTask())

d = self.summary_dict()
Expand Down
4 changes: 2 additions & 2 deletions test/notifications_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from helpers import with_config
from luigi import notifications
from luigi import configuration
from luigi.scheduler import CentralPlannerScheduler
from luigi.scheduler import Scheduler
from luigi.worker import Worker
from luigi import six
import luigi
Expand Down Expand Up @@ -69,7 +69,7 @@ def complete(self):
class ExceptionFormatTest(unittest.TestCase):

def setUp(self):
self.sch = CentralPlannerScheduler()
self.sch = Scheduler()

def test_fail_run(self):
task = FailRunTask(foo='foo', bar='bar')
Expand Down
6 changes: 3 additions & 3 deletions test/retcodes_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class AlreadyRunningTask(luigi.Task):
def run(self):
pass

old_func = luigi.scheduler.CentralPlannerScheduler.get_work
old_func = luigi.scheduler.Scheduler.get_work

def new_func(*args, **kwargs):
kwargs['current_tasks'] = None
Expand All @@ -63,7 +63,7 @@ def new_func(*args, **kwargs):
res['running_tasks'][0]['worker'] = "not me :)" # Otherwise it will be filtered
return res

with mock.patch('luigi.scheduler.CentralPlannerScheduler.get_work', new_func):
with mock.patch('luigi.scheduler.Scheduler.get_work', new_func):
self.run_and_expect('AlreadyRunningTask', 0) # Test default value to be 0
self.run_and_expect('AlreadyRunningTask --retcode-already-running 5', 5)
self.run_with_config(dict(already_running='3'), 'AlreadyRunningTask', 3)
Expand Down Expand Up @@ -167,6 +167,6 @@ def requires(self):
def new_func(*args, **kwargs):
return None

with mock.patch('luigi.scheduler.CentralPlannerScheduler.add_task', new_func):
with mock.patch('luigi.scheduler.Scheduler.add_task', new_func):
self.run_and_expect('RequiringTask', 0)
self.run_and_expect('RequiringTask --retcode-not-run 5', 5)
8 changes: 4 additions & 4 deletions test/rpc_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import mock

import luigi.rpc
from luigi.scheduler import CentralPlannerScheduler
import central_planner_test
from luigi.scheduler import Scheduler
import scheduler_api_test
import luigi.server
from server_test import ServerTestBase
import time
Expand Down Expand Up @@ -88,11 +88,11 @@ def test_get_work_retries_on_null_limited(self):
self.assertRaises(luigi.rpc.RPCError, self.get_work, fetch_results)


class RPCTest(central_planner_test.CentralPlannerTest, ServerTestBase):
class RPCTest(scheduler_api_test.SchedulerApiTest, ServerTestBase):

def get_app(self):
conf = self.get_scheduler_config()
sch = CentralPlannerScheduler(**conf)
sch = Scheduler(**conf)
return luigi.server.app(sch)

def setUp(self):
Expand Down
34 changes: 16 additions & 18 deletions test/central_planner_test.py → test/scheduler_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,22 @@

import time
from helpers import unittest

from nose.plugins.attrib import attr

import luigi.notifications
from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, \
UNKNOWN, RUNNING, CentralPlannerScheduler
UNKNOWN, RUNNING, Scheduler

luigi.notifications.DEBUG = True
WORKER = 'myworker'


@attr('scheduler')
class CentralPlannerTest(unittest.TestCase):
class SchedulerApiTest(unittest.TestCase):

def setUp(self):
super(CentralPlannerTest, self).setUp()
super(SchedulerApiTest, self).setUp()
conf = self.get_scheduler_config()
self.sch = CentralPlannerScheduler(**conf)
self.sch = Scheduler(**conf)
self.time = time.time

def get_scheduler_config(self):
Expand All @@ -49,7 +47,7 @@ def get_scheduler_config(self):
}

def tearDown(self):
super(CentralPlannerTest, self).tearDown()
super(SchedulerApiTest, self).tearDown()
if time.time != self.time:
time.time = self.time

Expand Down Expand Up @@ -744,7 +742,7 @@ def test_disable_and_done(self):
self.assertEqual(self.sch.get_work(worker=WORKER)['task_id'], 'A')

def test_automatic_re_enable(self):
self.sch = CentralPlannerScheduler(disable_failures=2, disable_persist=100)
self.sch = Scheduler(disable_failures=2, disable_persist=100)
self.setTime(0)
self.sch.add_task(worker=WORKER, task_id='A', status=FAILED)
self.sch.add_task(worker=WORKER, task_id='A', status=FAILED)
Expand All @@ -757,7 +755,7 @@ def test_automatic_re_enable(self):
self.assertEqual(FAILED, self.sch.task_list('', '')['A']['status'])

def test_automatic_re_enable_with_one_failure_allowed(self):
self.sch = CentralPlannerScheduler(disable_failures=1, disable_persist=100)
self.sch = Scheduler(disable_failures=1, disable_persist=100)
self.setTime(0)
self.sch.add_task(worker=WORKER, task_id='A', status=FAILED)

Expand All @@ -769,7 +767,7 @@ def test_automatic_re_enable_with_one_failure_allowed(self):
self.assertEqual(FAILED, self.sch.task_list('', '')['A']['status'])

def test_no_automatic_re_enable_after_manual_disable(self):
self.sch = CentralPlannerScheduler(disable_persist=100)
self.sch = Scheduler(disable_persist=100)
self.setTime(0)
self.sch.add_task(worker=WORKER, task_id='A', status=DISABLED)

Expand All @@ -781,7 +779,7 @@ def test_no_automatic_re_enable_after_manual_disable(self):
self.assertEqual(DISABLED, self.sch.task_list('', '')['A']['status'])

def test_no_automatic_re_enable_after_auto_then_manual_disable(self):
self.sch = CentralPlannerScheduler(disable_failures=2, disable_persist=100)
self.sch = Scheduler(disable_failures=2, disable_persist=100)
self.setTime(0)
self.sch.add_task(worker=WORKER, task_id='A', status=FAILED)
self.sch.add_task(worker=WORKER, task_id='A', status=FAILED)
Expand Down Expand Up @@ -882,20 +880,20 @@ def test_prune_worker(self):
self.assertFalse(self.sch.worker_list())

def test_task_list_beyond_limit(self):
sch = CentralPlannerScheduler(max_shown_tasks=3)
sch = Scheduler(max_shown_tasks=3)
for c in 'ABCD':
sch.add_task(worker=WORKER, task_id=c)
self.assertEqual(set('ABCD'), set(sch.task_list('PENDING', '', False).keys()))
self.assertEqual({'num_tasks': 4}, sch.task_list('PENDING', ''))

def test_task_list_within_limit(self):
sch = CentralPlannerScheduler(max_shown_tasks=4)
sch = Scheduler(max_shown_tasks=4)
for c in 'ABCD':
sch.add_task(worker=WORKER, task_id=c)
self.assertEqual(set('ABCD'), set(sch.task_list('PENDING', '').keys()))

def test_task_lists_some_beyond_limit(self):
sch = CentralPlannerScheduler(max_shown_tasks=3)
sch = Scheduler(max_shown_tasks=3)
for c in 'ABCD':
sch.add_task(worker=WORKER, task_id=c, status=DONE)
for c in 'EFG':
Expand Down Expand Up @@ -965,7 +963,7 @@ def test_task_list_filter_by_multiple_search_terms(self):
self.search_pending('ClassA 2016-02-01 num', {expected})

def test_search_results_beyond_limit(self):
sch = CentralPlannerScheduler(max_shown_tasks=3)
sch = Scheduler(max_shown_tasks=3)
for i in range(4):
sch.add_task(worker=WORKER, family='Test', params={'p': str(i)}, task_id='Test_%i' % i)
self.assertEqual({'num_tasks': 4}, sch.task_list('PENDING', '', search='Test'))
Expand Down Expand Up @@ -1107,7 +1105,7 @@ def test_assistants_dont_nurture_finished_statuses(self):
Assistants should not affect longevity expect for the tasks that it is
running, par the one it's actually running.
"""
self.sch = CentralPlannerScheduler(retry_delay=100000000000) # Never pendify failed tasks
self.sch = Scheduler(retry_delay=100000000000) # Never pendify failed tasks
self.setTime(1)
self.sch.add_worker('assistant', [('assistant', True)])
self.sch.ping(worker='assistant')
Expand Down Expand Up @@ -1148,8 +1146,8 @@ def test_no_crash_on_only_disable_hard_timeout(self):
There was some failure happening when disable_hard_timeout was set but
disable_failures was not.
"""
self.sch = CentralPlannerScheduler(retry_delay=5,
disable_hard_timeout=100)
self.sch = Scheduler(retry_delay=5,
disable_hard_timeout=100)
self.setTime(1)
self.sch.add_worker(WORKER, [])
self.sch.ping(worker=WORKER)
Expand Down
Loading

0 comments on commit e02ba97

Please sign in to comment.