don't consider task with unknown state dependencies as pending #13

Merged 7 commits on Apr 17, 2020
Changes from 4 commits
3 changes: 2 additions & 1 deletion luigi/configuration.py
@@ -33,11 +33,12 @@
import os
import warnings

from configparser import Interpolation
try:
    from ConfigParser import ConfigParser, NoOptionError, NoSectionError
    Interpolation = object

What is this for?

Are these changes necessary? They seem unrelated and unused.

Author: This fixed an error I was getting when I tried to test it on an ODIN. I don't know how this wasn't a problem in production, since configparser is only available in Python 3. Since Interpolation isn't in ConfigParser, it gets set to object. I pulled this code snippet from the upstream Luigi repo.

except ImportError:
    from configparser import ConfigParser, NoOptionError, NoSectionError
    from configparser import Interpolation


class LuigiConfigParser(ConfigParser):
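For readers following the fragmented hunks above, this is a consolidated sketch of how the import block in luigi/configuration.py reads after the change, reconstructed from the diff (the comments are added here for context and are not part of the patch):

```python
try:
    # Python 2: the stdlib module is ConfigParser and it has no
    # Interpolation class, so a plain object stands in for it.
    from ConfigParser import ConfigParser, NoOptionError, NoSectionError
    Interpolation = object
except ImportError:
    # Python 3: configparser provides Interpolation directly.
    from configparser import ConfigParser, NoOptionError, NoSectionError
    from configparser import Interpolation
```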
75 changes: 63 additions & 12 deletions luigi/scheduler.py
@@ -38,6 +38,7 @@
import os
import re
import time
import uuid

from luigi import six

Expand Down Expand Up @@ -148,6 +149,10 @@ class scheduler(Config):

prune_on_get_work = parameter.BoolParameter(default=False)

pause_enabled = parameter.BoolParameter(default=True)

send_messages = parameter.BoolParameter(default=True)

def _get_retry_policy(self):
return RetryPolicy(self.retry_count, self.disable_hard_timeout, self.disable_window)

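The two new BoolParameters gate the pause and scheduler-message RPCs added further down in this file. A minimal sketch of turning them off, assuming the usual mapping of this Config class to the [scheduler] section; in practice the values would live in luigi.cfg rather than be set programmatically:

```python
import luigi.configuration

# Hypothetical programmatic override, equivalent to setting these keys
# under [scheduler] in luigi.cfg.
config = luigi.configuration.get_config()
config.set('scheduler', 'pause_enabled', 'false')  # pause()/unpause() become no-ops
config.set('scheduler', 'send_messages', 'false')  # send_scheduler_message() returns {"message_id": None}
```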
@@ -276,8 +281,8 @@ def __eq__(self, other):

class Task(object):
    def __init__(self, task_id, status, deps, resources=None, priority=0, family='', module=None,
                 params=None, param_visibilities=None, tracking_url=None, status_message=None,
                 progress_percentage=None, retry_policy='notoptional'):
                 params=None, accepts_messages=False, param_visibilities=None, tracking_url=None,
                 status_message=None, progress_percentage=None, retry_policy='notoptional'):
        self.id = task_id
        self.stakeholders = set()  # workers ids that are somehow related to this task (i.e. don't prune while any of these workers are still active)
        self.workers = OrderedSet()  # workers ids that can perform task - task is 'BROKEN' if none of these workers are active
@@ -302,11 +307,13 @@ def __init__(self, task_id, status, deps, resources=None, priority=0, family='',
        self.public_params = {}
        self.hidden_params = {}
        self.set_params(params)
        self.accepts_messages = accepts_messages
        self.retry_policy = retry_policy
        self.failures = Failures(self.retry_policy.disable_window)
        self.tracking_url = tracking_url
        self.status_message = status_message
        self.progress_percentage = progress_percentage
        self.scheduler_message_responses = {}
        self.scheduler_disable_time = None
        self.runnable = False
        self.batchable = False
@@ -782,9 +789,9 @@ def forgive_failures(self, task_id=None):
    @rpc_method()
    def add_task(self, task_id=None, status=PENDING, runnable=True,
                 deps=None, new_deps=None, expl=None, resources=None,
                 priority=0, family='', module=None, params=None, param_visibilities=None,
                 assistant=False, tracking_url=None, worker=None, batchable=None,
                 batch_id=None, retry_policy_dict=None, owners=None, **kwargs):
                 priority=0, family='', module=None, params=None, accepts_messages=False,
                 param_visibilities=None, assistant=False, tracking_url=None, worker=None,
                 batchable=None, batch_id=None, retry_policy_dict=None, owners=None, **kwargs):
        """
        * add task identified by task_id if it doesn't exist
        * if deps is not None, update dependency list
@@ -806,7 +813,8 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
        if worker.enabled:
            _default_task = self._make_task(
                task_id=task_id, status=PENDING, deps=deps, resources=resources,
                priority=priority, family=family, module=module, params=params, param_visibilities=param_visibilities
                priority=priority, family=family, module=module, params=params,
                accepts_messages=accepts_messages, param_visibilities=param_visibilities
            )
        else:
            _default_task = None
@@ -821,7 +829,7 @@ def add_task(self, task_id=None, status=PENDING, runnable=True,
            task.family = family
        if not getattr(task, 'module', None):
            task.module = module
        if not task.param_visibilities:
        if not getattr(task, 'param_visibilities', None):
            task.param_visibilities = _get_default(param_visibilities, {})
        if not task.params:
            task.set_params(params)
@@ -944,17 +952,48 @@ def disable_worker(self, worker):
    def set_worker_processes(self, worker, n):
        self._state.get_worker(worker).add_rpc_message('set_worker_processes', n=n)

    @rpc_method()
    def send_scheduler_message(self, worker, task, content):
        if not self._config.send_messages:
            return {"message_id": None}

        message_id = str(uuid.uuid4())
        self._state.get_worker(worker).add_rpc_message('dispatch_scheduler_message', task_id=task,
                                                       message_id=message_id, content=content)

        return {"message_id": message_id}

    @rpc_method()
    def add_scheduler_message_response(self, task_id, message_id, response):
        if self._state.has_task(task_id):
            task = self._state.get_task(task_id)
            task.scheduler_message_responses[message_id] = response

    @rpc_method()
    def get_scheduler_message_response(self, task_id, message_id):
        response = None
        if self._state.has_task(task_id):
            task = self._state.get_task(task_id)
            response = task.scheduler_message_responses.pop(message_id, None)
        return {"response": response}

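Taken together, the three message RPCs above form a small mailbox: send_scheduler_message generates a UUID and dispatches the content to the worker that owns the task, the worker posts the task's reply back with add_scheduler_message_response, and the original caller collects it with get_scheduler_message_response, which pops the entry so it can only be read once. A hypothetical walk-through; the scheduler object, worker name, and task id below are illustrative only:

```python
# 1. A client asks the scheduler to deliver a message to a running task.
reply = scheduler.send_scheduler_message(worker='worker-1',
                                         task='MyTask__99914b932b',
                                         content='how far along are you?')
message_id = reply['message_id']  # None when send_messages is disabled

# 2. The worker, after handing the content to the task, reports the answer.
scheduler.add_scheduler_message_response(task_id='MyTask__99914b932b',
                                         message_id=message_id,
                                         response='about 80% done')

# 3. The caller polls for the response; it is popped on read, so a second
#    call with the same message_id returns {'response': None}.
scheduler.get_scheduler_message_response(task_id='MyTask__99914b932b',
                                         message_id=message_id)
# -> {'response': 'about 80% done'}
```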
    @rpc_method()
    def is_pause_enabled(self):
        return {'enabled': self._config.pause_enabled}

    @rpc_method()
    def is_paused(self):
        return {'paused': self._paused}

    @rpc_method()
    def pause(self):
        self._paused = True
        if self._config.pause_enabled:
            self._paused = True

    @rpc_method()
    def unpause(self):
        self._paused = False
        if self._config.pause_enabled:
            self._paused = False

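With pause_enabled defaulting to true, a front end can ask the scheduler whether pausing is allowed before exposing the control; when the flag is off, pause() and unpause() silently do nothing. A short hypothetical client-side sketch (scheduler again stands in for whatever RPC client or scheduler instance is in use):

```python
if scheduler.is_pause_enabled()['enabled']:
    scheduler.pause()
    assert scheduler.is_paused()['paused']
    scheduler.unpause()
else:
    # Pausing is administratively disabled; pause() would be a no-op anyway.
    pass
```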
    @rpc_method()
    def update_resources(self, **resources):
@@ -1047,9 +1086,19 @@ def count_pending(self, worker):
        for task in worker.get_tasks(self._state, PENDING, FAILED):
            if self._upstream_status(task.id, upstream_status_table) == UPSTREAM_DISABLED:
                continue
            num_pending += 1
            num_unique_pending += int(len(task.workers) == 1)
            num_pending_last_scheduled += int(task.workers.peek(last=True) == worker_id)
            has_failed_dependency = False
            for dep in task.deps:
                dep_task = self._state.get_task(dep, default=None)
                if dep_task.status == UNKNOWN:

What's an example where this happens?

How do you plan to recover from this state?

@srikiraju there is no recovery from this. This change causes the worker to see that there are no pending tasks left and actually exit; otherwise it keeps waiting forever for a task that is NOT retried, because only tasks in the FAILED state are retried :-/

                    # consider this task as not pending since these dependencies have broken
                    # requires. this means that they won't ever be retried and can't succeed at all
                    has_failed_dependency = True
                    break

            if not has_failed_dependency:

Instead of not updating any of the tasks, perhaps we take all these values and do something about the 'failed dependency'?

Author: What do you have in mind when you say "do something"? As in report it, or retry it? If you mean retry: by the time it gets to this point, the dependency has already failed in the requires section, which AFAIK is only run during add, so it can't be retried.

                num_pending += 1
                num_unique_pending += int(len(task.workers) == 1)
                num_pending_last_scheduled += int(task.workers.peek(last=True) == worker_id)

        return {
            'n_pending_tasks': num_pending,
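For context on the thread above: as the author describes, a dependency ends up in the UNKNOWN state when its requires() blows up while the worker is scheduling it, and UNKNOWN tasks are never retried (only FAILED ones are). A minimal, hypothetical reproduction; the task names are made up:

```python
import luigi


class BrokenDep(luigi.Task):
    def requires(self):
        # Raises during scheduling, so the scheduler records this task as
        # UNKNOWN rather than FAILED, and it will never be retried.
        raise RuntimeError("broken requires()")

    def run(self):
        pass


class Downstream(luigi.Task):
    def requires(self):
        return BrokenDep()

    def run(self):
        pass
```

Before this change, Downstream still counted toward n_pending_tasks, so a keep-alive worker waited on it forever; with the has_failed_dependency check it is excluded and the worker can see that nothing runnable is left and exit.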
@@ -1265,6 +1314,8 @@ def _serialize_task(self, task_id, include_deps=True, deps=None):
        ret['re_enable_able'] = task.scheduler_disable_time is not None
        if include_deps:
            ret['deps'] = list(task.deps if deps is None else deps)
        if self._config.send_messages and task.status == RUNNING:
            ret['accepts_messages'] = task.accepts_messages
        return ret

    @rpc_method()
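The accepts_messages flag serialized above originates in the task definition. In upstream Luigi's scheduler-messages feature, which this diff backports on the scheduler side, a task opts in with a class attribute and reads messages from a queue while it runs; the worker-side plumbing is not shown in this diff, so the sketch below only illustrates the upstream pattern:

```python
import luigi


class Chatty(luigi.Task):
    # Advertises to the scheduler that this task can receive messages while
    # RUNNING; this is what ends up in ret['accepts_messages'].
    accepts_messages = True

    def run(self):
        # In upstream Luigi the worker exposes received messages on
        # self.scheduler_messages (a queue of message objects).
        if not self.scheduler_messages.empty():
            msg = self.scheduler_messages.get()
            msg.respond("got it: " + msg.content)
```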
2 changes: 1 addition & 1 deletion setup.py
@@ -54,7 +54,7 @@ def get_static_files(path):

setup(
    name='luigi',
    version='2.7.5.affirm.1.2.0',
    version='2.7.5.affirm.1.3.0',
    description='Workflow mgmgt + task scheduling + dependency resolution',
    long_description=long_description,
    author='The Luigi Authors',