From 9aa1b9d4c4d3228e4a872572e66f81fa35fac3a5 Mon Sep 17 00:00:00 2001 From: Dan Nugent Date: Mon, 2 May 2016 17:02:52 -0400 Subject: [PATCH 1/6] Pass through parameters on Range tasks to of tasks Parameters to tasks wrapped by RangeBase and its subclasses can now be passed through the Range to the 'of' task. This both makes it easier to specify parameters to tasks running inside of ranges in Python code and allows for Luigi to differentiate between multiple Range instances running with the same 'of' parameter. closes #1627 --- luigi/tools/range.py | 6 ++++-- test/range_test.py | 43 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/luigi/tools/range.py b/luigi/tools/range.py index f4a65e7a88..e64c59cffa 100644 --- a/luigi/tools/range.py +++ b/luigi/tools/range.py @@ -88,6 +88,7 @@ class RangeBase(luigi.WrapperTask): # TODO lift the single parameter constraint by passing unknown parameters through WrapperTask? of = luigi.TaskParameter( description="task name to be completed. The task must take a single datetime parameter") + of_params = luigi.DictParameter(default=dict(),description="Arguments to be provided to the 'of' class when instantiating") # The common parameters 'start' and 'stop' have type (e.g. DateParameter, # DateHourParameter) dependent on the concrete subclass, cumbersome to # define here generically without dark magic. Refer to the overrides. @@ -175,9 +176,10 @@ def _format_range(self, datetimes): def _instantiate_task_cls(self, param): if self.param_name is None: - return self.of_cls(param) + return self.of_cls(param,**self.of_params) else: - kwargs = {self.param_name: param} + kwargs = self.of_params.copy() + kwargs[self.param_name] = param return self.of_cls(**kwargs) def requires(self): diff --git a/test/range_test.py b/test/range_test.py index 3c357d68ee..ec1a6d7ae7 100644 --- a/test/range_test.py +++ b/test/range_test.py @@ -738,3 +738,46 @@ def output(self): param_name='date_param') expected_task = MyTask('woo', datetime.date(2015, 12, 1)) self.assertEqual(expected_task, list(range_task._requires())[0]) + + def test_of_param_distinction(self): + class MyTask(luigi.Task): + arbitrary_param = luigi.Parameter(default='foo') + arbitrary_integer_param = luigi.IntParameter(default=10) + date_param = luigi.DateParameter() + + def output(self): + return MockTarget("{0}-{1}-{2}".format(date_param,arbitrary_param,arbitrary_integer_param)) + + range_task_1 = RangeDaily(now=datetime_to_epoch(datetime.datetime(2015, 12, 2)), + of=MyTask, + start=datetime.date(2015, 12, 1), + stop=datetime.date(2015, 12, 2)) + range_task_2 = RangeDaily(now=datetime_to_epoch(datetime.datetime(2015, 12, 2)), + of=MyTask, + of_params=dict(arbitrary_param="bar",abitrary_integer_param=2), + start=datetime.date(2015, 12, 1), + stop=datetime.date(2015, 12, 2)) + self.assertNotEqual(range_task_1.task_id,range_task_2.task_id) + + def test_of_param_commandline(self): + class MyTask(luigi.Task): + task_namespace = "wohoo" + date_param = luigi.DateParameter() + arbitrary_param = luigi.Parameter(default='foo') + arbitrary_integer_param = luigi.IntParameter(default=10) + state = (None,None) + comp = False + + def output(self): + return MockTarget("{0}-{1}-{2}".format(date_param,arbitrary_param,arbitrary_integer_param)) + + def complete(self): + return self.comp + + def run(self): + self.comp = True + MyTask.state = (self.arbitrary_param,self.arbitrary_integer_param) + + now = str(int(datetime_to_epoch(datetime.datetime(2015, 12, 2)))) + self.run_locally(['RangeDailyBase','--of','wohoo.MyTask','--of-params','{"arbitrary_param":"bar","arbitrary_integer_param":5}','--now','{0}'.format(now),'--start','2015-12-01','--stop','2015-12-02']) + self.assertEqual(MyTask.state,('bar',5)) From 8f143853bea5a63be1710a3a39242220ec30b6eb Mon Sep 17 00:00:00 2001 From: Dan Nugent Date: Mon, 2 May 2016 17:52:41 -0400 Subject: [PATCH 2/6] Fixed flake errors for range changes --- luigi/tools/range.py | 4 ++-- test/range_test.py | 32 +++++++++++++++----------------- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/luigi/tools/range.py b/luigi/tools/range.py index e64c59cffa..942836d853 100644 --- a/luigi/tools/range.py +++ b/luigi/tools/range.py @@ -88,7 +88,7 @@ class RangeBase(luigi.WrapperTask): # TODO lift the single parameter constraint by passing unknown parameters through WrapperTask? of = luigi.TaskParameter( description="task name to be completed. The task must take a single datetime parameter") - of_params = luigi.DictParameter(default=dict(),description="Arguments to be provided to the 'of' class when instantiating") + of_params = luigi.DictParameter(default=dict(), description="Arguments to be provided to the 'of' class when instantiating") # The common parameters 'start' and 'stop' have type (e.g. DateParameter, # DateHourParameter) dependent on the concrete subclass, cumbersome to # define here generically without dark magic. Refer to the overrides. @@ -176,7 +176,7 @@ def _format_range(self, datetimes): def _instantiate_task_cls(self, param): if self.param_name is None: - return self.of_cls(param,**self.of_params) + return self.of_cls(param, **self.of_params) else: kwargs = self.of_params.copy() kwargs[self.param_name] = param diff --git a/test/range_test.py b/test/range_test.py index ec1a6d7ae7..f56076b264 100644 --- a/test/range_test.py +++ b/test/range_test.py @@ -745,19 +745,19 @@ class MyTask(luigi.Task): arbitrary_integer_param = luigi.IntParameter(default=10) date_param = luigi.DateParameter() - def output(self): - return MockTarget("{0}-{1}-{2}".format(date_param,arbitrary_param,arbitrary_integer_param)) + def complete(self): + return False range_task_1 = RangeDaily(now=datetime_to_epoch(datetime.datetime(2015, 12, 2)), - of=MyTask, - start=datetime.date(2015, 12, 1), - stop=datetime.date(2015, 12, 2)) + of=MyTask, + start=datetime.date(2015, 12, 1), + stop=datetime.date(2015, 12, 2)) range_task_2 = RangeDaily(now=datetime_to_epoch(datetime.datetime(2015, 12, 2)), - of=MyTask, - of_params=dict(arbitrary_param="bar",abitrary_integer_param=2), - start=datetime.date(2015, 12, 1), - stop=datetime.date(2015, 12, 2)) - self.assertNotEqual(range_task_1.task_id,range_task_2.task_id) + of=MyTask, + of_params=dict(arbitrary_param="bar", abitrary_integer_param=2), + start=datetime.date(2015, 12, 1), + stop=datetime.date(2015, 12, 2)) + self.assertNotEqual(range_task_1.task_id, range_task_2.task_id) def test_of_param_commandline(self): class MyTask(luigi.Task): @@ -765,19 +765,17 @@ class MyTask(luigi.Task): date_param = luigi.DateParameter() arbitrary_param = luigi.Parameter(default='foo') arbitrary_integer_param = luigi.IntParameter(default=10) - state = (None,None) + state = (None, None) comp = False - def output(self): - return MockTarget("{0}-{1}-{2}".format(date_param,arbitrary_param,arbitrary_integer_param)) - def complete(self): return self.comp def run(self): self.comp = True - MyTask.state = (self.arbitrary_param,self.arbitrary_integer_param) + MyTask.state = (self.arbitrary_param, self.arbitrary_integer_param) now = str(int(datetime_to_epoch(datetime.datetime(2015, 12, 2)))) - self.run_locally(['RangeDailyBase','--of','wohoo.MyTask','--of-params','{"arbitrary_param":"bar","arbitrary_integer_param":5}','--now','{0}'.format(now),'--start','2015-12-01','--stop','2015-12-02']) - self.assertEqual(MyTask.state,('bar',5)) + self.run_locally(['RangeDailyBase', '--of', 'wohoo.MyTask', '--of-params', '{"arbitrary_param":"bar","arbitrary_integer_param":5}', + '--now', '{0}'.format(now), '--start', '2015-12-01', '--stop', '2015-12-02']) + self.assertEqual(MyTask.state, ('bar', 5)) From ea097fc794f58a01a6bfc29c847508aea38ad167 Mon Sep 17 00:00:00 2001 From: Dan Nugent Date: Tue, 3 May 2016 18:53:48 -0400 Subject: [PATCH 3/6] Modified bulk_complete invocations to pass of_params In RangeDaily and RangeHourly, the bulk_complete parameter was invoked with a list of the date/hour parameters to be used for the of task. This prevents of_params from being provided to the bulk_complete code. Instead, we change the invocation of the bulk_complete method to use a partial with the of_params values curried. Thus, any code using the cls parameter of bulk_complete can behave as normal while still allowing the of_params to be passed through. For code that would rather subclass and override missing_datetimes, two new helper methods are provided: datetime_to_parameters and parameters_to_datetime. These are parallel to datetime_to_parameter and parameter_to_datetime and rather than handling only the first argument to the of class, handle all the required arguments. --- luigi/tools/range.py | 42 ++++++++++++++++++++++++++++++++++-------- test/range_test.py | 43 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 8 deletions(-) diff --git a/luigi/tools/range.py b/luigi/tools/range.py index 942836d853..7d2aa6c076 100644 --- a/luigi/tools/range.py +++ b/luigi/tools/range.py @@ -26,6 +26,7 @@ """ import itertools +import functools import logging import warnings import operator @@ -126,6 +127,12 @@ def datetime_to_parameter(self, dt): def parameter_to_datetime(self, p): raise NotImplementedError + def datetime_to_parameters(self, dt): + raise NotImplementedError + + def parameters_to_datetime(self, p): + raise NotImplementedError + def moving_start(self, now): """ Returns a datetime from which to ensure contiguousness in the case when @@ -175,12 +182,16 @@ def _format_range(self, datetimes): return '[%s, %s]' % (param_first, param_last) def _instantiate_task_cls(self, param): - if self.param_name is None: - return self.of_cls(param, **self.of_params) - else: - kwargs = self.of_params.copy() - kwargs[self.param_name] = param - return self.of_cls(**kwargs) + return self.of(**self._task_parameters(param)) + + @property + def _param_name(self): + return self.of.get_params()[0][0] if self.param_name is None else self.param_name + + def _task_parameters(self, param): + kwargs = dict(**self.of_params) + kwargs[self._param_name] = param + return kwargs def requires(self): # cache because we anticipate a fair amount of computation @@ -284,6 +295,13 @@ def datetime_to_parameter(self, dt): def parameter_to_datetime(self, p): return datetime(p.year, p.month, p.day) + def datetime_to_parameters(self, dt): + return self._task_parameters(dt.date()) + + def parameters_to_datetime(self, p): + dt = p[self._param_name] + return datetime(dt.year, dt.month, dt.day) + def moving_start(self, now): return now - timedelta(days=self.days_back) @@ -334,6 +352,12 @@ def datetime_to_parameter(self, dt): def parameter_to_datetime(self, p): return p + def datetime_to_parameters(self, dt): + return self._task_parameters(dt) + + def parameters_to_datetime(self, p): + return p[self._param_name] + def moving_start(self, now): return now - timedelta(hours=self.hours_back) @@ -531,7 +555,8 @@ class RangeDaily(RangeDailyBase): def missing_datetimes(self, finite_datetimes): try: - complete_parameters = self.of_cls.bulk_complete(map(self.datetime_to_parameter, finite_datetimes)) + cls_with_params = functools.partial(self.of, **self.of_params) + complete_parameters = self.of.bulk_complete.__func__(cls_with_params, map(self.datetime_to_parameter, finite_datetimes)) return set(finite_datetimes) - set(map(self.parameter_to_datetime, complete_parameters)) except NotImplementedError: return infer_bulk_complete_from_fs( @@ -559,7 +584,8 @@ class RangeHourly(RangeHourlyBase): def missing_datetimes(self, finite_datetimes): try: # TODO: Why is there a list() here but not for the RangeDaily?? - complete_parameters = self.of_cls.bulk_complete(list(map(self.datetime_to_parameter, finite_datetimes))) + cls_with_params = functools.partial(self.of, **self.of_params) + complete_parameters = self.of.bulk_complete.__func__(cls_with_params, list(map(self.datetime_to_parameter, finite_datetimes))) return set(finite_datetimes) - set(map(self.parameter_to_datetime, complete_parameters)) except NotImplementedError: return infer_bulk_complete_from_fs( diff --git a/test/range_test.py b/test/range_test.py index f56076b264..149ea7dee4 100644 --- a/test/range_test.py +++ b/test/range_test.py @@ -566,6 +566,27 @@ def output(self): actual = [str(t) for t in task.requires()] self.assertEqual(actual, expected) + def test_bulk_complete_of_params(self): + class BulkCompleteDailyTask(luigi.Task): + d = luigi.DateParameter() + arbitrary_argument = luigi.BoolParameter() + + @classmethod + def bulk_complete(cls, parameter_tuples): + for t in map(cls, parameter_tuples): + assert t.arbitrary_argument + return list(parameter_tuples) + + def output(self): + raise RuntimeError("Shouldn't get called while resolving deps via bulk_complete") + + task = RangeDaily(now=datetime_to_epoch(datetime.datetime(2015, 12, 1)), + of=BulkCompleteDailyTask, + of_params=dict(arbitrary_argument=True), + start=datetime.date(2015, 11, 1), + stop=datetime.date(2015, 12, 1)) + task.requires() + @mock.patch('luigi.mock.MockFileSystem.listdir', new=mock_listdir([ '/data/2014/p/v/z/2014_/_03-_-21octor/20/ZOOO', @@ -651,6 +672,28 @@ def output(self): actual = [str(t) for t in task.requires()] self.assertEqual(actual, expected) + def test_bulk_complete_of_params(self): + class BulkCompleteHourlyTask(luigi.Task): + dh = luigi.DateHourParameter() + arbitrary_argument = luigi.BoolParameter() + + @classmethod + def bulk_complete(cls, parameter_tuples): + for t in map(cls, parameter_tuples): + assert t.arbitrary_argument + return parameter_tuples + + def output(self): + raise RuntimeError("Shouldn't get called while resolving deps via bulk_complete") + + task = RangeHourly(now=datetime_to_epoch(datetime.datetime(2015, 12, 1)), + of=BulkCompleteHourlyTask, + of_params=dict(arbitrary_argument=True), + start=datetime.datetime(2015, 11, 1), + stop=datetime.datetime(2015, 12, 1)) + + task.requires() + @mock.patch('luigi.mock.MockFileSystem.exists', new=mock_exists_always_false) def test_missing_directory(self): From e9c4c88dd89cbdcaa4f821071c357a857ce26ca8 Mon Sep 17 00:00:00 2001 From: Dan Nugent Date: Tue, 3 May 2016 21:26:08 -0400 Subject: [PATCH 4/6] Ensure positional argument is used in Range task instantiation Changed the determination of the first positional argument to feed through on the Range tasks to actually check whether an argument was positional or not --- luigi/tools/range.py | 5 ++++- test/range_test.py | 25 ++++++++++++++++++++----- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/luigi/tools/range.py b/luigi/tools/range.py index 7d2aa6c076..e066ae5f14 100644 --- a/luigi/tools/range.py +++ b/luigi/tools/range.py @@ -186,7 +186,10 @@ def _instantiate_task_cls(self, param): @property def _param_name(self): - return self.of.get_params()[0][0] if self.param_name is None else self.param_name + if self.param_name is None: + return next(x[0] for x in self.of.get_params() if x[1].positional) + else: + return self.param_name def _task_parameters(self, param): kwargs = dict(**self.of_params) diff --git a/test/range_test.py b/test/range_test.py index 149ea7dee4..2a64af04da 100644 --- a/test/range_test.py +++ b/test/range_test.py @@ -568,14 +568,16 @@ def output(self): def test_bulk_complete_of_params(self): class BulkCompleteDailyTask(luigi.Task): + non_positional_arbitrary_argument = luigi.Parameter(default="whatever", positional=False, significant=False) d = luigi.DateParameter() arbitrary_argument = luigi.BoolParameter() @classmethod def bulk_complete(cls, parameter_tuples): - for t in map(cls, parameter_tuples): + ptuples = list(parameter_tuples) + for t in map(cls, ptuples): assert t.arbitrary_argument - return list(parameter_tuples) + return ptuples[:-2] def output(self): raise RuntimeError("Shouldn't get called while resolving deps via bulk_complete") @@ -585,7 +587,13 @@ def output(self): of_params=dict(arbitrary_argument=True), start=datetime.date(2015, 11, 1), stop=datetime.date(2015, 12, 1)) - task.requires() + expected = [ + 'BulkCompleteDailyTask(d=2015-11-29, arbitrary_argument=True)', + 'BulkCompleteDailyTask(d=2015-11-30, arbitrary_argument=True)', + ] + + actual = [str(t) for t in task.requires()] + self.assertEqual(actual, expected) @mock.patch('luigi.mock.MockFileSystem.listdir', new=mock_listdir([ @@ -674,6 +682,7 @@ def output(self): def test_bulk_complete_of_params(self): class BulkCompleteHourlyTask(luigi.Task): + non_positional_arbitrary_argument = luigi.Parameter(default="whatever", positional=False, significant=False) dh = luigi.DateHourParameter() arbitrary_argument = luigi.BoolParameter() @@ -681,7 +690,7 @@ class BulkCompleteHourlyTask(luigi.Task): def bulk_complete(cls, parameter_tuples): for t in map(cls, parameter_tuples): assert t.arbitrary_argument - return parameter_tuples + return parameter_tuples[:-2] def output(self): raise RuntimeError("Shouldn't get called while resolving deps via bulk_complete") @@ -692,7 +701,13 @@ def output(self): start=datetime.datetime(2015, 11, 1), stop=datetime.datetime(2015, 12, 1)) - task.requires() + expected = [ + 'BulkCompleteHourlyTask(dh=2015-11-30T22, arbitrary_argument=True)', + 'BulkCompleteHourlyTask(dh=2015-11-30T23, arbitrary_argument=True)', + ] + + actual = [str(t) for t in task.requires()] + self.assertEqual(actual, expected) @mock.patch('luigi.mock.MockFileSystem.exists', new=mock_exists_always_false) From 8dc302cab4e9f2fd629c8fd0fbb3d608de78a6a2 Mon Sep 17 00:00:00 2001 From: Dan Nugent Date: Wed, 4 May 2016 10:29:27 -0400 Subject: [PATCH 5/6] Added documentation --- doc/luigi_patterns.rst | 15 +++++++++++---- luigi/tools/range.py | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/doc/luigi_patterns.rst b/doc/luigi_patterns.rst index 65470dc9b3..0180129288 100644 --- a/doc/luigi_patterns.rst +++ b/doc/luigi_patterns.rst @@ -102,10 +102,17 @@ and stop (exclusive) parameters specified: Propagating parameters with Range ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -When your recurring task has a parameter, you'll at first notice that the Range -tasks do not recognize or propagate parameters passed to them. The easiest -solution is to set the parameter at the task family level as described -:ref:`here `. +Some tasks you want to recur may include additional parameters which need to be configured. +The Range classes provide a parameter which accepts a :class:`~luigi.params.DictParameter` +and passes any parameters onwards for this purpose. + +.. code-block:: console + + luigi RangeDaily --of MyTask --start 2014-10-31 --of-param '{"my-param": 123}' + +Alternatively, you can specify parameters at the task family level (as described :ref:`here `), +however these will not appear in the task name for the upstream Range task which +can have implications in how the scheduler and visualizer handle task instances. .. code-block:: console diff --git a/luigi/tools/range.py b/luigi/tools/range.py index e066ae5f14..ba1c83cf2d 100644 --- a/luigi/tools/range.py +++ b/luigi/tools/range.py @@ -128,9 +128,15 @@ def parameter_to_datetime(self, p): raise NotImplementedError def datetime_to_parameters(self, dt): + """ + Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter + """ raise NotImplementedError def parameters_to_datetime(self, p): + """ + Given a dictionary of parameters, will extract the ranged task parameter value + """ raise NotImplementedError def moving_start(self, now): @@ -299,9 +305,15 @@ def parameter_to_datetime(self, p): return datetime(p.year, p.month, p.day) def datetime_to_parameters(self, dt): + """ + Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter + """ return self._task_parameters(dt.date()) def parameters_to_datetime(self, p): + """ + Given a dictionary of parameters, will extract the ranged task parameter value + """ dt = p[self._param_name] return datetime(dt.year, dt.month, dt.day) @@ -356,9 +368,15 @@ def parameter_to_datetime(self, p): return p def datetime_to_parameters(self, dt): + """ + Given a date-time, will produce a dictionary of of-params combined with the ranged task parameter + """ return self._task_parameters(dt) def parameters_to_datetime(self, p): + """ + Given a dictionary of parameters, will extract the ranged task parameter value + """ return p[self._param_name] def moving_start(self, now): From 1523ecf5a0ffad8b25b949345406ff8b97f1ed97 Mon Sep 17 00:00:00 2001 From: Daniel Nugent Date: Wed, 4 May 2016 10:37:35 -0400 Subject: [PATCH 6/6] Typo, doh --- doc/luigi_patterns.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/luigi_patterns.rst b/doc/luigi_patterns.rst index 0180129288..a63d3b4b64 100644 --- a/doc/luigi_patterns.rst +++ b/doc/luigi_patterns.rst @@ -103,7 +103,7 @@ Propagating parameters with Range ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Some tasks you want to recur may include additional parameters which need to be configured. -The Range classes provide a parameter which accepts a :class:`~luigi.params.DictParameter` +The Range classes provide a parameter which accepts a :class:`~luigi.parameter.DictParameter` and passes any parameters onwards for this purpose. .. code-block:: console