don't consider task with unknown state dependencies as pending #13
Changes from 4 commits
Commits in this pull request: b0d9646, 5a9fee4, 5c6bade, db2793b, f0c9824, e1f13d8, f385a06
@@ -38,6 +38,7 @@
 import os
 import re
 import time
+import uuid

 from luigi import six

@@ -148,6 +149,10 @@ class scheduler(Config):

     prune_on_get_work = parameter.BoolParameter(default=False)

+    pause_enabled = parameter.BoolParameter(default=True)
+
+    send_messages = parameter.BoolParameter(default=True)
+
     def _get_retry_policy(self):
         return RetryPolicy(self.retry_count, self.disable_hard_timeout, self.disable_window)

@@ -276,8 +281,8 @@ def __eq__(self, other):

 class Task(object):
     def __init__(self, task_id, status, deps, resources=None, priority=0, family='', module=None,
-                 params=None, param_visibilities=None, tracking_url=None, status_message=None,
-                 progress_percentage=None, retry_policy='notoptional'):
+                 params=None, accepts_messages=False, param_visibilities=None, tracking_url=None,
+                 status_message=None, progress_percentage=None, retry_policy='notoptional'):
         self.id = task_id
         self.stakeholders = set()  # workers ids that are somehow related to this task (i.e. don't prune while any of these workers are still active)
         self.workers = OrderedSet()  # workers ids that can perform task - task is 'BROKEN' if none of these workers are active

@@ -302,11 +307,13 @@ def __init__(self, task_id, status, deps, resources=None, priority=0, family='',
         self.public_params = {}
         self.hidden_params = {}
         self.set_params(params)
+        self.accepts_messages = accepts_messages
         self.retry_policy = retry_policy
         self.failures = Failures(self.retry_policy.disable_window)
         self.tracking_url = tracking_url
         self.status_message = status_message
         self.progress_percentage = progress_percentage
+        self.scheduler_message_responses = {}
         self.scheduler_disable_time = None
         self.runnable = False
         self.batchable = False

@@ -782,9 +789,9 @@ def forgive_failures(self, task_id=None):
     @rpc_method()
     def add_task(self, task_id=None, status=PENDING, runnable=True,
                  deps=None, new_deps=None, expl=None, resources=None,
-                 priority=0, family='', module=None, params=None, param_visibilities=None,
-                 assistant=False, tracking_url=None, worker=None, batchable=None,
-                 batch_id=None, retry_policy_dict=None, owners=None, **kwargs):
+                 priority=0, family='', module=None, params=None, accepts_messages=False,
+                 param_visibilities=None, assistant=False, tracking_url=None, worker=None,
+                 batchable=None, batch_id=None, retry_policy_dict=None, owners=None, **kwargs):
         """
         * add task identified by task_id if it doesn't exist
         * if deps is not None, update dependency list

@@ -806,7 +813,8 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
         if worker.enabled:
             _default_task = self._make_task(
                 task_id=task_id, status=PENDING, deps=deps, resources=resources,
-                priority=priority, family=family, module=module, params=params, param_visibilities=param_visibilities
+                priority=priority, family=family, module=module, params=params,
+                accepts_messages=accepts_messages, param_visibilities=param_visibilities
             )
         else:
             _default_task = None

@@ -821,7 +829,7 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
             task.family = family
         if not getattr(task, 'module', None):
             task.module = module
-        if not task.param_visibilities:
+        if not getattr(task, 'param_visibilities', None):
             task.param_visibilities = _get_default(param_visibilities, {})
         if not task.params:
             task.set_params(params)

@@ -944,17 +952,48 @@ def disable_worker(self, worker):
     def set_worker_processes(self, worker, n):
         self._state.get_worker(worker).add_rpc_message('set_worker_processes', n=n)

+    @rpc_method()
+    def send_scheduler_message(self, worker, task, content):
+        if not self._config.send_messages:
+            return {"message_id": None}
+
+        message_id = str(uuid.uuid4())
+        self._state.get_worker(worker).add_rpc_message('dispatch_scheduler_message', task_id=task,
+                                                        message_id=message_id, content=content)
+
+        return {"message_id": message_id}
+
+    @rpc_method()
+    def add_scheduler_message_response(self, task_id, message_id, response):
+        if self._state.has_task(task_id):
+            task = self._state.get_task(task_id)
+            task.scheduler_message_responses[message_id] = response
+
+    @rpc_method()
+    def get_scheduler_message_response(self, task_id, message_id):
+        response = None
+        if self._state.has_task(task_id):
+            task = self._state.get_task(task_id)
+            response = task.scheduler_message_responses.pop(message_id, None)
+        return {"response": response}
+
+    @rpc_method()
+    def is_pause_enabled(self):
+        return {'enabled': self._config.pause_enabled}
+
     @rpc_method()
     def is_paused(self):
         return {'paused': self._paused}

     @rpc_method()
     def pause(self):
-        self._paused = True
+        if self._config.pause_enabled:
+            self._paused = True

     @rpc_method()
     def unpause(self):
-        self._paused = False
+        if self._config.pause_enabled:
+            self._paused = False

     @rpc_method()
     def update_resources(self, **resources):

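Both new guards read their flags from the `scheduler` Config section added earlier in this diff. As a hedged sketch (not part of this PR, and assuming Luigi's standard configuration machinery), a deployment that wants the old behaviour back could switch both features off; the option names come directly from the `pause_enabled` and `send_messages` parameters above:

```python
# Sketch only: disable the new scheduler features via Luigi's config machinery.
# Normally these options would live in luigi.cfg under the [scheduler] section;
# setting them programmatically must happen before the scheduler reads its config.
import luigi.configuration

config = luigi.configuration.get_config()
config.set('scheduler', 'pause_enabled', 'false')   # pause()/unpause() become no-ops
config.set('scheduler', 'send_messages', 'false')   # send_scheduler_message() returns a null message_id
```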
@@ -1047,9 +1086,19 @@ def count_pending(self, worker):
         for task in worker.get_tasks(self._state, PENDING, FAILED):
             if self._upstream_status(task.id, upstream_status_table) == UPSTREAM_DISABLED:
                 continue
-            num_pending += 1
-            num_unique_pending += int(len(task.workers) == 1)
-            num_pending_last_scheduled += int(task.workers.peek(last=True) == worker_id)
+            has_failed_dependency = False
+            for dep in task.deps:
+                dep_task = self._state.get_task(dep, default=None)
+                if dep_task.status == UNKNOWN:
+                    # consider this task as not pending since these dependencies have broken
+                    # requires. this means that they won't ever be retried and can't succeed at all
+                    has_failed_dependency = True
+                    break
+
+            if not has_failed_dependency:
+                num_pending += 1
+                num_unique_pending += int(len(task.workers) == 1)
+                num_pending_last_scheduled += int(task.workers.peek(last=True) == worker_id)

         return {
             'n_pending_tasks': num_pending,

Review discussion on `if dep_task.status == UNKNOWN:`:
- What's an example where this happens?
- How do you plan to recover from this state?
- Reply: @srikiraju there is no recovery from this. This causes the worker to see that there are no pending tasks left and actually exit; otherwise it keeps waiting forever for a task that is NOT retried, because only tasks in the FAILED state are retried :-/

Review discussion on `if not has_failed_dependency:`:
- Instead of not updating any of the tasks, perhaps we take all these values and do something about the 'failed dependency'?
- Reply: What do you have in mind when you say "do something"? As in report it or retry it? If you mean retry, by the time it gets to this point, the dependency failed in the
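To make the reply above concrete, here is a hypothetical example (task names invented, not part of this PR) of how a dependency ends up in the UNKNOWN state: a Luigi worker that cannot evaluate a task's `complete()` (or `requires()`) typically reports that task to the scheduler as UNKNOWN rather than FAILED, so it is never retried and the downstream task can never run.

```python
import luigi


class FlakyDep(luigi.Task):
    """Hypothetical dependency whose completeness check blows up."""

    def complete(self):
        # e.g. the existence check hits a missing bucket or bad credentials;
        # the worker then reports this task to the scheduler with status UNKNOWN
        raise RuntimeError("cannot determine whether the output exists")


class Downstream(luigi.Task):
    """Task that can never become runnable because its dependency stays UNKNOWN."""

    def requires(self):
        return FlakyDep()

    def output(self):
        return luigi.LocalTarget("downstream.out")

    def run(self):
        # never reached while FlakyDep is stuck in UNKNOWN
        with self.output().open("w") as f:
            f.write("done")
```

Before this change, a task like `Downstream` kept the worker's pending count above zero even though it could never succeed; with the dependency check above it is no longer counted as pending, so the worker can give up and exit.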
@@ -1265,6 +1314,8 @@ def _serialize_task(self, task_id, include_deps=True, deps=None):
         ret['re_enable_able'] = task.scheduler_disable_time is not None
         if include_deps:
             ret['deps'] = list(task.deps if deps is None else deps)
+        if self._config.send_messages and task.status == RUNNING:
+            ret['accepts_messages'] = task.accepts_messages
         return ret

     @rpc_method()

Review discussion (on a configparser compatibility change not shown in the hunks above):
- What is this for??
- Are these necessary changes?? They seem unrelated and unused.
- Reply: This fixed an error I was getting when I tried to test it on an ODIN. I don't know how this wasn't a problem in production, since `configparser` is only available in Python 3. Since `Interpolation` isn't in `ConfigParser`, it gets set to `object`. I pulled this code snippet from the upstream Luigi repo.
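For context on that last reply: the snippet it refers to is not part of the hunks shown above, but based on the description (`Interpolation` exists only in Python 3's `configparser`, so on Python 2 it gets aliased to `object`) the compatibility shim presumably looks roughly like the sketch below. Treat this as an assumption about the upstream Luigi code, not a quote of it.

```python
# Hypothetical reconstruction of the compatibility shim discussed above.
try:
    # Python 3: configparser provides an Interpolation base class
    from configparser import ConfigParser, Interpolation
except ImportError:
    # Python 2: the module is ConfigParser and has no Interpolation class,
    # so fall back to a plain object base, as the comment above describes
    from ConfigParser import ConfigParser
    Interpolation = object
```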