From d86b97ee4cec3906eafe37b202c79199c7192ba7 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sat, 21 Oct 2023 14:16:25 +0200 Subject: [PATCH] worker: Log which outputs are missing when task is unexpectedly incomplete (#3258) * worker: Log which outputs are missing when task is unexpectedly incomplete --- luigi/contrib/dropbox.py | 3 +++ luigi/contrib/hive.py | 3 +++ luigi/contrib/mongodb.py | 3 +++ luigi/contrib/mssqldb.py | 3 +++ luigi/contrib/mysqldb.py | 3 +++ luigi/contrib/postgres.py | 3 +++ luigi/contrib/presto.py | 3 +++ luigi/contrib/redis_store.py | 3 +++ luigi/contrib/simulate.py | 3 +++ luigi/contrib/sqla.py | 3 +++ luigi/target.py | 3 +++ luigi/worker.py | 9 ++++++++- 12 files changed, 41 insertions(+), 1 deletion(-) diff --git a/luigi/contrib/dropbox.py b/luigi/contrib/dropbox.py index aaa77953b2..52a6b9ae16 100644 --- a/luigi/contrib/dropbox.py +++ b/luigi/contrib/dropbox.py @@ -298,6 +298,9 @@ def __init__(self, path, token, format=None, user_agent="Luigi"): self.client = DropboxClient(token, user_agent) self.format = format or luigi.format.get_default_format() + def __str__(self): + return self.path + @property def fs(self): return self.client diff --git a/luigi/contrib/hive.py b/luigi/contrib/hive.py index 70dc4cab3c..0af1025d27 100644 --- a/luigi/contrib/hive.py +++ b/luigi/contrib/hive.py @@ -485,6 +485,9 @@ def __init__(self, table, partition, database='default', fail_missing_table=True self.client = client or get_default_client() self.fail_missing_table = fail_missing_table + def __str__(self): + return self.path + def exists(self): """ returns `True` if the partition/table exists diff --git a/luigi/contrib/mongodb.py b/luigi/contrib/mongodb.py index 7fa44cca80..7e4a6069bf 100644 --- a/luigi/contrib/mongodb.py +++ b/luigi/contrib/mongodb.py @@ -35,6 +35,9 @@ def __init__(self, mongo_client, index, collection): self._index = index self._collection = collection + def __str__(self): + return f'{self._index}/{self._collection}' + def get_collection(self): """ Return targeted mongo collection to query on diff --git a/luigi/contrib/mssqldb.py b/luigi/contrib/mssqldb.py index 57c0570673..8a70aaa5b0 100644 --- a/luigi/contrib/mssqldb.py +++ b/luigi/contrib/mssqldb.py @@ -68,6 +68,9 @@ def __init__(self, host, database, user, password, table, update_id): self.table = table self.update_id = update_id + def __str__(self): + return self.table + def touch(self, connection=None): """ Mark this update as complete. diff --git a/luigi/contrib/mysqldb.py b/luigi/contrib/mysqldb.py index 45e928314c..804e94fdee 100644 --- a/luigi/contrib/mysqldb.py +++ b/luigi/contrib/mysqldb.py @@ -68,6 +68,9 @@ def __init__(self, host, database, user, password, table, update_id, **cnx_kwarg self.update_id = update_id self.cnx_kwargs = cnx_kwargs + def __str__(self): + return self.table + def touch(self, connection=None): """ Mark this update as complete. diff --git a/luigi/contrib/postgres.py b/luigi/contrib/postgres.py index f1dfebf4b8..719b80a4d7 100644 --- a/luigi/contrib/postgres.py +++ b/luigi/contrib/postgres.py @@ -200,6 +200,9 @@ def __init__( self.table = table self.update_id = update_id + def __str__(self): + return self.table + def touch(self, connection=None): """ Mark this update as complete. diff --git a/luigi/contrib/presto.py b/luigi/contrib/presto.py index dac21ecbef..dd5cd027c6 100644 --- a/luigi/contrib/presto.py +++ b/luigi/contrib/presto.py @@ -131,6 +131,9 @@ def __init__(self, client, catalog, database, table, partition=None): self._client = client self._count = None + def __str__(self): + return self.table + @property def _count_query(self): partition = OrderedDict(self.partition or {1: 1}) diff --git a/luigi/contrib/redis_store.py b/luigi/contrib/redis_store.py index 0a1d3bc60e..5b128d2837 100644 --- a/luigi/contrib/redis_store.py +++ b/luigi/contrib/redis_store.py @@ -73,6 +73,9 @@ def __init__(self, host, port, db, update_id, password=None, socket_timeout=self.socket_timeout, ) + def __str__(self): + return self.marker_key() + def marker_key(self): """ Generate a key for the indicator hash. diff --git a/luigi/contrib/simulate.py b/luigi/contrib/simulate.py index 88ea90664c..240512c8f5 100644 --- a/luigi/contrib/simulate.py +++ b/luigi/contrib/simulate.py @@ -79,6 +79,9 @@ def __init__(self, task_obj): shutil.rmtree(path) logger.debug('Deleted temporary directory %s', path) + def __str__(self): + return self.task_id + def get_path(self): """ Returns a temporary file path based on a MD5 hash generated with the task's name and its arguments diff --git a/luigi/contrib/sqla.py b/luigi/contrib/sqla.py index ad8a352df1..f292c21354 100644 --- a/luigi/contrib/sqla.py +++ b/luigi/contrib/sqla.py @@ -189,6 +189,9 @@ def __init__(self, connection_string, target_table, update_id, echo=False, conne self.connect_args = connect_args self.marker_table_bound = None + def __str__(self): + return self.target_table + @property def engine(self): """ diff --git a/luigi/target.py b/luigi/target.py index 8b333f5326..ff5183e3eb 100644 --- a/luigi/target.py +++ b/luigi/target.py @@ -215,6 +215,9 @@ def __init__(self, path): # cast to str to allow path to be objects like pathlib.PosixPath and py._path.local.LocalPath self.path = str(path) + def __str__(self): + return self.path + @property @abc.abstractmethod def fs(self): diff --git a/luigi/worker.py b/luigi/worker.py index 01a2772a68..c3ea777b8a 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -182,7 +182,14 @@ def run(self): # checking completeness of self.task so outputs of dependencies are # irrelevant. if self.check_unfulfilled_deps and not _is_external(self.task): - missing = [dep.task_id for dep in self.task.deps() if not self.check_complete(dep)] + missing = [] + for dep in self.task.deps(): + if not self.check_complete(dep): + nonexistent_outputs = [output for output in dep.output() if not output.exists()] + if nonexistent_outputs: + missing.append(f'{dep.task_id} ({", ".join(map(str, nonexistent_outputs))})') + else: + missing.append(dep.task_id) if missing: deps = 'dependency' if len(missing) == 1 else 'dependencies' raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))