Skip to content

Commit

Permalink
Make task callbacks in TaskProcess dynamic.
Browse files Browse the repository at this point in the history
  • Loading branch information
riga committed Mar 16, 2018
1 parent d530d47 commit dc2a74a
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ class TaskProcess(multiprocessing.Process):
Mainly for convenience since this is run in a separate process. """

# mapping of status_reporter methods to task callbacks that are added to the task
# before they actually run, and removed afterwards
forward_reporter_callbacks = {
"update_tracking_url": "set_tracking_url",
"update_status_message": "set_status_message",
"update_progress_percentage": "set_progress_percentage",
"decrease_running_resources": "decrease_running_resources",
}

def __init__(self, task, worker_id, result_queue, status_reporter,
use_multiprocessing=False, worker_timeout=0, check_unfulfilled_deps=True):
super(TaskProcess, self).__init__()
Expand All @@ -124,17 +133,15 @@ def __init__(self, task, worker_id, result_queue, status_reporter,
self.check_unfulfilled_deps = check_unfulfilled_deps

def _run_get_new_deps(self):
self.task.set_tracking_url = self.status_reporter.update_tracking_url
self.task.set_status_message = self.status_reporter.update_status_message
self.task.set_progress_percentage = self.status_reporter.update_progress_percentage
self.task.decrease_running_resources = self.status_reporter.decrease_running_resources
# set task callbacks before running
for reporter_attr, task_attr in six.iteritems(self.forward_reporter_callbacks):
setattr(self.task, task_attr, getattr(self.status_reporter, reporter_attr))

task_gen = self.task.run()

self.task.set_tracking_url = None
self.task.set_status_message = None
self.task.set_progress_percentage = None
self.task.decrease_running_resources = None
# reset task callbacks
for reporter_attr, task_attr in six.iteritems(self.forward_reporter_callbacks):
setattr(self.task, task_attr, None)

if not isinstance(task_gen, types.GeneratorType):
return None
Expand Down

1 comment on commit dc2a74a

@Tarrasch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me. If you're tired on being stuck with #2346, you can submit this as a separate PR.

Please sign in to comment.