Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass through parameters on Range tasks to of tasks #1675

Merged
merged 6 commits into from
Jun 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions doc/luigi_patterns.rst
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,17 @@ and stop (exclusive) parameters specified:
Propagating parameters with Range
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When your recurring task has a parameter, you'll at first notice that the Range
tasks do not recognize or propagate parameters passed to them. The easiest
solution is to set the parameter at the task family level as described
:ref:`here <Parameter-class-level-parameters>`.
Some tasks you want to recur may include additional parameters which need to be configured.
The Range classes provide a parameter which accepts a :class:`~luigi.parameter.DictParameter`
and passes any parameters onwards for this purpose.

.. code-block:: console

luigi RangeDaily --of MyTask --start 2014-10-31 --of-param '{"my-param": 123}'

Alternatively, you can specify parameters at the task family level (as described :ref:`here <Parameter-class-level-parameters>`),
however these will not appear in the task name for the upstream Range task which
can have implications in how the scheduler and visualizer handle task instances.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very well written. Thanks! :)


.. code-block:: console

Expand Down
59 changes: 54 additions & 5 deletions luigi/tools/range.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"""

import itertools
import functools
import logging
import warnings
import operator
Expand Down Expand Up @@ -88,6 +89,7 @@ class RangeBase(luigi.WrapperTask):
# TODO lift the single parameter constraint by passing unknown parameters through WrapperTask?
of = luigi.TaskParameter(
description="task name to be completed. The task must take a single datetime parameter")
of_params = luigi.DictParameter(default=dict(), description="Arguments to be provided to the 'of' class when instantiating")
# The common parameters 'start' and 'stop' have type (e.g. DateParameter,
# DateHourParameter) dependent on the concrete subclass, cumbersome to
# define here generically without dark magic. Refer to the overrides.
Expand Down Expand Up @@ -125,6 +127,18 @@ def datetime_to_parameter(self, dt):
def parameter_to_datetime(self, p):
raise NotImplementedError

def datetime_to_parameters(self, dt):
"""
Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter
"""
raise NotImplementedError

def parameters_to_datetime(self, p):
"""
Given a dictionary of parameters, will extract the ranged task parameter value
"""
raise NotImplementedError

def moving_start(self, now):
"""
Returns a datetime from which to ensure contiguousness in the case when
Expand Down Expand Up @@ -174,11 +188,19 @@ def _format_range(self, datetimes):
return '[%s, %s]' % (param_first, param_last)

def _instantiate_task_cls(self, param):
return self.of(**self._task_parameters(param))

@property
def _param_name(self):
if self.param_name is None:
return self.of_cls(param)
return next(x[0] for x in self.of.get_params() if x[1].positional)
else:
kwargs = {self.param_name: param}
return self.of_cls(**kwargs)
return self.param_name

def _task_parameters(self, param):
kwargs = dict(**self.of_params)
kwargs[self._param_name] = param
return kwargs

def requires(self):
# cache because we anticipate a fair amount of computation
Expand Down Expand Up @@ -282,6 +304,19 @@ def datetime_to_parameter(self, dt):
def parameter_to_datetime(self, p):
return datetime(p.year, p.month, p.day)

def datetime_to_parameters(self, dt):
"""
Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter
"""
return self._task_parameters(dt.date())

def parameters_to_datetime(self, p):
"""
Given a dictionary of parameters, will extract the ranged task parameter value
"""
dt = p[self._param_name]
return datetime(dt.year, dt.month, dt.day)

def moving_start(self, now):
return now - timedelta(days=self.days_back)

Expand Down Expand Up @@ -332,6 +367,18 @@ def datetime_to_parameter(self, dt):
def parameter_to_datetime(self, p):
return p

def datetime_to_parameters(self, dt):
"""
Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter
"""
return self._task_parameters(dt)

def parameters_to_datetime(self, p):
"""
Given a dictionary of parameters, will extract the ranged task parameter value
"""
return p[self._param_name]

def moving_start(self, now):
return now - timedelta(hours=self.hours_back)

Expand Down Expand Up @@ -529,7 +576,8 @@ class RangeDaily(RangeDailyBase):

def missing_datetimes(self, finite_datetimes):
try:
complete_parameters = self.of_cls.bulk_complete(map(self.datetime_to_parameter, finite_datetimes))
cls_with_params = functools.partial(self.of, **self.of_params)
complete_parameters = self.of.bulk_complete.__func__(cls_with_params, map(self.datetime_to_parameter, finite_datetimes))
return set(finite_datetimes) - set(map(self.parameter_to_datetime, complete_parameters))
except NotImplementedError:
return infer_bulk_complete_from_fs(
Expand Down Expand Up @@ -557,7 +605,8 @@ class RangeHourly(RangeHourlyBase):
def missing_datetimes(self, finite_datetimes):
try:
# TODO: Why is there a list() here but not for the RangeDaily??
complete_parameters = self.of_cls.bulk_complete(list(map(self.datetime_to_parameter, finite_datetimes)))
cls_with_params = functools.partial(self.of, **self.of_params)
complete_parameters = self.of.bulk_complete.__func__(cls_with_params, list(map(self.datetime_to_parameter, finite_datetimes)))
return set(finite_datetimes) - set(map(self.parameter_to_datetime, complete_parameters))
except NotImplementedError:
return infer_bulk_complete_from_fs(
Expand Down
99 changes: 99 additions & 0 deletions test/range_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,35 @@ def output(self):
actual = [str(t) for t in task.requires()]
self.assertEqual(actual, expected)

def test_bulk_complete_of_params(self):
class BulkCompleteDailyTask(luigi.Task):
non_positional_arbitrary_argument = luigi.Parameter(default="whatever", positional=False, significant=False)
d = luigi.DateParameter()
arbitrary_argument = luigi.BoolParameter()

@classmethod
def bulk_complete(cls, parameter_tuples):
ptuples = list(parameter_tuples)
for t in map(cls, ptuples):
assert t.arbitrary_argument
return ptuples[:-2]

def output(self):
raise RuntimeError("Shouldn't get called while resolving deps via bulk_complete")

task = RangeDaily(now=datetime_to_epoch(datetime.datetime(2015, 12, 1)),
of=BulkCompleteDailyTask,
of_params=dict(arbitrary_argument=True),
start=datetime.date(2015, 11, 1),
stop=datetime.date(2015, 12, 1))
expected = [
'BulkCompleteDailyTask(d=2015-11-29, arbitrary_argument=True)',
'BulkCompleteDailyTask(d=2015-11-30, arbitrary_argument=True)',
]

actual = [str(t) for t in task.requires()]
self.assertEqual(actual, expected)

@mock.patch('luigi.mock.MockFileSystem.listdir',
new=mock_listdir([
'/data/2014/p/v/z/2014_/_03-_-21octor/20/ZOOO',
Expand Down Expand Up @@ -651,6 +680,35 @@ def output(self):
actual = [str(t) for t in task.requires()]
self.assertEqual(actual, expected)

def test_bulk_complete_of_params(self):
class BulkCompleteHourlyTask(luigi.Task):
non_positional_arbitrary_argument = luigi.Parameter(default="whatever", positional=False, significant=False)
dh = luigi.DateHourParameter()
arbitrary_argument = luigi.BoolParameter()

@classmethod
def bulk_complete(cls, parameter_tuples):
for t in map(cls, parameter_tuples):
assert t.arbitrary_argument
return parameter_tuples[:-2]

def output(self):
raise RuntimeError("Shouldn't get called while resolving deps via bulk_complete")

task = RangeHourly(now=datetime_to_epoch(datetime.datetime(2015, 12, 1)),
of=BulkCompleteHourlyTask,
of_params=dict(arbitrary_argument=True),
start=datetime.datetime(2015, 11, 1),
stop=datetime.datetime(2015, 12, 1))

expected = [
'BulkCompleteHourlyTask(dh=2015-11-30T22, arbitrary_argument=True)',
'BulkCompleteHourlyTask(dh=2015-11-30T23, arbitrary_argument=True)',
]

actual = [str(t) for t in task.requires()]
self.assertEqual(actual, expected)

@mock.patch('luigi.mock.MockFileSystem.exists',
new=mock_exists_always_false)
def test_missing_directory(self):
Expand Down Expand Up @@ -738,3 +796,44 @@ def output(self):
param_name='date_param')
expected_task = MyTask('woo', datetime.date(2015, 12, 1))
self.assertEqual(expected_task, list(range_task._requires())[0])

def test_of_param_distinction(self):
class MyTask(luigi.Task):
arbitrary_param = luigi.Parameter(default='foo')
arbitrary_integer_param = luigi.IntParameter(default=10)
date_param = luigi.DateParameter()

def complete(self):
return False

range_task_1 = RangeDaily(now=datetime_to_epoch(datetime.datetime(2015, 12, 2)),
of=MyTask,
start=datetime.date(2015, 12, 1),
stop=datetime.date(2015, 12, 2))
range_task_2 = RangeDaily(now=datetime_to_epoch(datetime.datetime(2015, 12, 2)),
of=MyTask,
of_params=dict(arbitrary_param="bar", abitrary_integer_param=2),
start=datetime.date(2015, 12, 1),
stop=datetime.date(2015, 12, 2))
self.assertNotEqual(range_task_1.task_id, range_task_2.task_id)

def test_of_param_commandline(self):
class MyTask(luigi.Task):
task_namespace = "wohoo"
date_param = luigi.DateParameter()
arbitrary_param = luigi.Parameter(default='foo')
arbitrary_integer_param = luigi.IntParameter(default=10)
state = (None, None)
comp = False

def complete(self):
return self.comp

def run(self):
self.comp = True
MyTask.state = (self.arbitrary_param, self.arbitrary_integer_param)

now = str(int(datetime_to_epoch(datetime.datetime(2015, 12, 2))))
self.run_locally(['RangeDailyBase', '--of', 'wohoo.MyTask', '--of-params', '{"arbitrary_param":"bar","arbitrary_integer_param":5}',
'--now', '{0}'.format(now), '--start', '2015-12-01', '--stop', '2015-12-02'])
self.assertEqual(MyTask.state, ('bar', 5))