Skip to content

Commit

Permalink
Merge pull request spotify#1449 from compete/topics/external_tasks_w_…
Browse files Browse the repository at this point in the history
…deps

Fix issue with complete external tasks having missing deps
  • Loading branch information
erikbern committed Jan 15, 2016
2 parents 8c64217 + fa252fe commit f245797
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 28 deletions.
21 changes: 15 additions & 6 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,15 @@ def run(self):
missing = []
new_deps = []
try:
# Verify that all the tasks are fulfilled!
missing = [dep.task_id for dep in self.task.deps() if not dep.complete()]
if missing:
deps = 'dependency' if len(missing) == 1 else 'dependencies'
raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
# Verify that all the tasks are fulfilled! For external tasks we
# don't care about unfulfilled dependencies, because we are just
# checking completeness of self.task so outputs of dependencies are
# irrelevant.
if self.task.run != NotImplemented:
missing = [dep.task_id for dep in self.task.deps() if not dep.complete()]
if missing:
deps = 'dependency' if len(missing) == 1 else 'dependencies'
raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
self.task.trigger_event(Event.START, self.task)
t0 = time.time()
status = None
Expand All @@ -157,7 +161,12 @@ def run(self):
# External task
# TODO(erikbern): We should check for task completeness after non-external tasks too!
# This will resolve #814 and make things a lot more consistent
status = DONE if self.task.complete() else FAILED
if self.task.complete():
status = DONE
else:
status = FAILED
expl = 'Task is an external data dependency ' \
'and data does not exist (yet?).'
else:
new_deps = self._run_get_new_deps()
status = DONE if not new_deps else PENDING
Expand Down
62 changes: 40 additions & 22 deletions test/worker_external_task_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
from luigi.scheduler import CentralPlannerScheduler
import luigi.server
import luigi.worker
import luigi.task
from mock import patch
from helpers import with_config, unittest
import os
import tempfile
import shutil


class TestExternalFileTask(luigi.ExternalTask):
Expand Down Expand Up @@ -73,34 +75,37 @@ def run(self):


class WorkerExternalTaskTest(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp(prefix='luigi-test-')

def tearDown(self):
shutil.rmtree(self.tempdir)

def _assert_complete(self, tasks):
for t in tasks:
self.assert_(t.complete())

def _build(self, tasks):
self.scheduler = CentralPlannerScheduler(prune_on_get_work=True)
with luigi.worker.Worker(scheduler=self.scheduler, worker_processes=1) as w:
with self._make_worker() as w:
for t in tasks:
w.add(t)
w.run()

def _make_worker(self):
self.scheduler = CentralPlannerScheduler(prune_on_get_work=True)
return luigi.worker.Worker(scheduler=self.scheduler, worker_processes=1)

def test_external_dependency_already_complete(self):
"""
Test that the test task completes when its dependency exists at the
start of the execution.
"""
tempdir = tempfile.mkdtemp(prefix='luigi-test-')
test_task = TestTask(tempdir=tempdir, complete_after=1)
test_task = TestTask(tempdir=self.tempdir, complete_after=1)
luigi.build([test_task], local_scheduler=True)

assert os.path.exists(test_task.dep_path)
assert os.path.exists(test_task.output_path)

os.unlink(test_task.dep_path)
os.unlink(test_task.output_path)
os.rmdir(tempdir)

# complete() is called once per failure, twice per success
assert test_task.dependency.times_called == 2

Expand All @@ -112,18 +117,12 @@ def test_external_dependency_gets_rechecked(self):
"""
assert luigi.worker.worker().retry_external_tasks is True

tempdir = tempfile.mkdtemp(prefix='luigi-test-')

test_task = TestTask(tempdir=tempdir, complete_after=10)
test_task = TestTask(tempdir=self.tempdir, complete_after=10)
self._build([test_task])

assert os.path.exists(test_task.dep_path)
assert os.path.exists(test_task.output_path)

os.unlink(test_task.dep_path)
os.unlink(test_task.output_path)
os.rmdir(tempdir)

self.assertGreaterEqual(test_task.dependency.times_called, 10)

@with_config({'worker': {'retry_external_tasks': 'true',
Expand All @@ -139,17 +138,36 @@ def test_external_dependency_worker_is_patient(self):
"""
assert luigi.worker.worker().retry_external_tasks is True

tempdir = tempfile.mkdtemp(prefix='luigi-test-')

with patch('random.uniform', return_value=0.001):
test_task = TestTask(tempdir=tempdir, complete_after=5)
test_task = TestTask(tempdir=self.tempdir, complete_after=5)
self._build([test_task])

assert os.path.exists(test_task.dep_path)
assert os.path.exists(test_task.output_path)

os.unlink(test_task.dep_path)
os.unlink(test_task.output_path)
os.rmdir(tempdir)

self.assertGreaterEqual(test_task.dependency.times_called, 5)

@with_config({'worker': {'retry_external_tasks': 'true', },
'scheduler': {'retry_delay': '0.0'}})
def test_external_task_complete_but_missing_dep_at_runtime(self):
"""
Test external task complete but has missing upstream dependency at
runtime.
Should not get "unfulfilled dependencies" error.
"""
test_task = TestTask(tempdir=self.tempdir, complete_after=3)
test_task.run = NotImplemented

assert len(test_task.deps()) > 0

# split up scheduling task and running to simulate runtime scenario
with self._make_worker() as w:
w.add(test_task)
# touch output so test_task should be considered complete at runtime
open(test_task.output_path, 'a').close()
success = w.run()

self.assertTrue(success)
# upstream dependency output didn't exist at runtime
self.assertFalse(os.path.exists(test_task.dep_path))

0 comments on commit f245797

Please sign in to comment.