Skip to content

Commit

Permalink
Merge branch 'upstream-master' into feature/refactor-tests
Browse files Browse the repository at this point in the history
* upstream-master:
  Make Worker parameter task_process_context an OptionalParameter (spotify#2468) (spotify#2574)
  Version 2.8.0
  Implement configurable CORS.
  Add HdfsFlagTarget (spotify#2559)
  Fix HdfsAtomicWriteDirPipe.close() when using snakebite and the file do not exist. (spotify#2549)
  Small fix to logging in contrib/ecs.py (spotify#2556)
  [ImgBot] optimizes images (spotify#2555)
  Add CopyToTable task for MySQL (spotify#2553)
  Make capture_output non-positional in ExternalProgramTask (spotify#2547)
  Add Movio to list of Luigi users (spotify#2551)
  Interpolate environment variables in .cfg config files (spotify#2527)
  Fix ReadTheDocs build (spotify#2546)
  • Loading branch information
dlstadther committed Nov 19, 2018
2 parents aed07c8 + 7d5aa91 commit cf335c7
Show file tree
Hide file tree
Showing 45 changed files with 824 additions and 55 deletions.
3 changes: 3 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ Some more companies are using Luigi but haven't had a chance yet to write about
* `Okko <https://okko.tv/>`_
* `ISVWorld <http://isvworld.com/>`_
* `Big Data <https://bigdata.com.br/>`_
* `Movio <https://movio.co.nz/>`_
* `Bonnier News <https://www.bonniernews.se/>`_
* `Starsky Robotics <https://www.starsky.io/>`_

We're more than happy to have your company added here. Just send a PR on GitHub.

Expand Down
Binary file modified doc/aggregate_artists.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
52 changes: 50 additions & 2 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ Configuration
All configuration can be done by adding configuration files.

Supported config parsers:
* ``cfg`` (default)

* ``cfg`` (default), based on Python's standard ConfigParser_. Values may refer to environment variables using ``${ENVVAR}`` syntax.
* ``toml``

.. _ConfigParser: https://docs.python.org/3/library/configparser.html

You can choose right parser via ``LUIGI_CONFIG_PARSER`` environment variable. For example, ``LUIGI_CONFIG_PARSER=toml``.

Default (cfg) parser are looked for in:
Expand Down Expand Up @@ -202,6 +205,51 @@ rpc-retry-wait
Defaults to 30


[cors]
------

.. versionadded:: 2.8.0

These parameters control ``/api/<method>`` ``CORS`` behaviour (see: `W3C Cross-Origin Resource Sharing
<http://www.w3.org/TR/cors/>`_).

enabled
Enables CORS support.
Defaults to false.

allowed_origins
A list of allowed origins. Used only if ``allow_any_origin`` is false.
Configure in JSON array format, e.g. ["foo", "bar"].
Defaults to empty.

allow_any_origin
Accepts requests from any origin.
Defaults to false.

allow_null_origin
Allows the request to set ``null`` value of the ``Origin`` header.
Defaults to false.

max_age
Content of ``Access-Control-Max-Age``.
Defaults to 86400 (24 hours).

allowed_methods
Content of ``Access-Control-Allow-Methods``.
Defaults to ``GET, OPTIONS``.

allowed_headers
Content of ``Access-Control-Allow-Headers``.
Defaults to ``Accept, Content-Type, Origin``.

exposed_headers
Content of ``Access-Control-Expose-Headers``.
Defaults to empty string (will NOT be sent as a response header).

allow_credentials
Indicates that the actual request can include user credentials.
Defaults to false.

.. _worker-config:

[worker]
Expand Down Expand Up @@ -607,7 +655,7 @@ is good practice to do so when you have a fixed set of resources.
.. _retcode-config:

[retcode]
----------
---------

Configure return codes for the Luigi binary. In the case of multiple return
codes that could apply, for example a failing task and missing data, the
Expand Down
Binary file modified doc/dependency_graph.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/execution_model.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/history.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/history_by_id.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/history_by_name.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/luigi.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/parameters_date_algebra.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/parameters_enum.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/parameters_recursion.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/task_breakdown.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/task_parameters.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/task_with_targets.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/tasks_input_output_requires.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/tasks_with_dependencies.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/user_recs.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/visualiser_front_page.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified doc/web_server.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions examples/spark_als.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class UserItemMatrix(luigi.Task):
def run(self):
"""
Generates :py:attr:`~.UserItemMatrix.data_size` elements.
Writes this data in \ separated value format into the target :py:func:`~/.UserItemMatrix.output`.
Writes this data in \\ separated value format into the target :py:func:`~/.UserItemMatrix.output`.
The data has the following elements:
Expand All @@ -43,7 +43,7 @@ def run(self):
w = self.output().open('w')
for user in range(self.data_size):
track = int(random.random() * self.data_size)
w.write('%d\%d\%f' % (user, track, 1.0))
w.write('%d\\%d\\%f' % (user, track, 1.0))
w.close()

def output(self):
Expand Down
93 changes: 91 additions & 2 deletions luigi/configuration/cfg_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,94 @@
"""

import os
import re
import warnings

try:
from ConfigParser import ConfigParser, NoOptionError, NoSectionError
from ConfigParser import ConfigParser, NoOptionError, NoSectionError, InterpolationError
Interpolation = object
except ImportError:
from configparser import ConfigParser, NoOptionError, NoSectionError
from configparser import ConfigParser, NoOptionError, NoSectionError, InterpolationError
from configparser import Interpolation, BasicInterpolation

from .base_parser import BaseParser


class InterpolationMissingEnvvarError(InterpolationError):
"""
Raised when option value refers to a nonexisting environment variable.
"""

def __init__(self, option, section, value, envvar):
msg = (
"Config refers to a nonexisting environment variable {}. "
"Section [{}], option {}={}"
).format(envvar, section, option, value)
InterpolationError.__init__(self, option, section, msg)


class EnvironmentInterpolation(Interpolation):
"""
Custom interpolation which allows values to refer to environment variables
using the ``${ENVVAR}`` syntax.
"""
_ENVRE = re.compile(r"\$\{([^}]+)\}") # matches "${envvar}"

def before_get(self, parser, section, option, value, defaults):
return self._interpolate_env(option, section, value)

def _interpolate_env(self, option, section, value):
rawval = value
parts = []
while value:
match = self._ENVRE.search(value)
if match is None:
parts.append(value)
break
envvar = match.groups()[0]
try:
envval = os.environ[envvar]
except KeyError:
raise InterpolationMissingEnvvarError(
option, section, rawval, envvar)
start, end = match.span()
parts.append(value[:start])
parts.append(envval)
value = value[end:]
return "".join(parts)


class CombinedInterpolation(Interpolation):
"""
Custom interpolation which applies multiple interpolations in series.
:param interpolations: a sequence of configparser.Interpolation objects.
"""

def __init__(self, interpolations):
self._interpolations = interpolations

def before_get(self, parser, section, option, value, defaults):
for interp in self._interpolations:
value = interp.before_get(parser, section, option, value, defaults)
return value

def before_read(self, parser, section, option, value):
for interp in self._interpolations:
value = interp.before_read(parser, section, option, value)
return value

def before_set(self, parser, section, option, value):
for interp in self._interpolations:
value = interp.before_set(parser, section, option, value)
return value

def before_write(self, parser, section, option, value):
for interp in self._interpolations:
value = interp.before_write(parser, section, option, value)
return value


class LuigiConfigParser(BaseParser, ConfigParser):
NO_DEFAULT = object()
enabled = True
Expand All @@ -50,6 +128,17 @@ class LuigiConfigParser(BaseParser, ConfigParser):
'client.cfg', # Deprecated old-style local luigi config
'luigi.cfg',
]
if hasattr(ConfigParser, "_interpolate"):
# Override ConfigParser._interpolate (Python 2)
def _interpolate(self, section, option, rawval, vars):
value = ConfigParser._interpolate(self, section, option, rawval, vars)
return EnvironmentInterpolation().before_get(
parser=self, section=section, option=option,
value=value, defaults=None,
)
else:
# Override ConfigParser._DEFAULT_INTERPOLATION (Python 3)
_DEFAULT_INTERPOLATION = CombinedInterpolation([BasicInterpolation(), EnvironmentInterpolation()])

@classmethod
def reload(cls):
Expand Down
5 changes: 5 additions & 0 deletions luigi/contrib/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ def run(self):
response = client.run_task(taskDefinition=self.task_def_arn,
overrides=overrides,
cluster=self.cluster)

if response['failures']:
raise Exception(", ".join(["fail to run task {0} reason: {1}".format(failure['arn'], failure['reason'])
for failure in response['failures']]))

self._task_ids = [task['taskArn'] for task in response['tasks']]

# Wait on task completion
Expand Down
2 changes: 1 addition & 1 deletion luigi/contrib/external_program.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class ExternalProgramTask(luigi.Task):
behaviour can be overriden by passing ``--capture-output False``
"""

capture_output = luigi.BoolParameter(default=True, significant=False)
capture_output = luigi.BoolParameter(default=True, significant=False, positional=False)

def program_args(self):
"""
Expand Down
15 changes: 10 additions & 5 deletions luigi/contrib/hdfs/format.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import luigi.format
import logging
import os

import luigi.format
from luigi.contrib.hdfs.config import load_hadoop_cmd
from luigi.contrib.hdfs import config as hdfs_config
from luigi.contrib.hdfs.clients import remove, rename, mkdir, listdir
from luigi.contrib.hdfs.clients import remove, rename, mkdir, listdir, exists
from luigi.contrib.hdfs.error import HDFSCliError

logger = logging.getLogger('luigi-interface')
Expand Down Expand Up @@ -75,9 +76,13 @@ def abort(self):
def close(self):
super(HdfsAtomicWriteDirPipe, self).close()
try:
remove(self.path)
except HDFSCliError:
pass
if exists(self.path):
remove(self.path)
except Exception as ex:
if isinstance(ex, HDFSCliError) or ex.args[0].contains("FileNotFoundException"):
pass
else:
raise ex

# it's unlikely to fail in this way but better safe than sorry
if not all(result['result'] for result in rename(self.tmppath, self.path) or []):
Expand Down
6 changes: 3 additions & 3 deletions luigi/contrib/hdfs/snakebite_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def remove(self, path, recursive=True, skip_trash=False):
:param path: delete-able file(s) or directory(ies)
:type path: either a string or a sequence of strings
:param recursive: delete directories trees like \*nix: rm -r
:param recursive: delete directories trees like \\*nix: rm -r
:type recursive: boolean, default is True
:param skip_trash: do or don't move deleted items into the trash first
:type skip_trash: boolean, default is False (use trash)
Expand All @@ -145,7 +145,7 @@ def chmod(self, path, permissions, recursive=False):
:param path: update-able file(s)
:type path: either a string or sequence of strings
:param permissions: \*nix style permission number
:param permissions: \\*nix style permission number
:type permissions: octal
:param recursive: change just listed entry(ies) or all in directories
:type recursive: boolean, default is False
Expand Down Expand Up @@ -242,7 +242,7 @@ def mkdir(self, path, parents=True, mode=0o755, raise_if_exists=False):
:type path: string
:param parents: create any missing parent directories
:type parents: boolean, default is True
:param mode: \*nix style owner/group/other permissions
:param mode: \\*nix style owner/group/other permissions
:type mode: octal, default 0755
"""
result = list(self.get_bite().mkdir(self.list_path(path),
Expand Down
35 changes: 35 additions & 0 deletions luigi/contrib/hdfs/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,38 @@ def _is_writable(self, path):
return True
except hdfs_clients.HDFSCliError:
return False


class HdfsFlagTarget(HdfsTarget):
"""
Defines a target directory with a flag-file (defaults to `_SUCCESS`) used
to signify job success.
This checks for two things:
* the path exists (just like the HdfsTarget)
* the _SUCCESS file exists within the directory.
Because Hadoop outputs into a directory and not a single file,
the path is assumed to be a directory.
"""
def __init__(self, path, format=None, client=None, flag='_SUCCESS'):
"""
Initializes a HdfsFlagTarget.
:param path: the directory where the files are stored.
:type path: str
:param client:
:type client:
:param flag:
:type flag: str
"""
if path[-1] != "/":
raise ValueError("HdfsFlagTarget requires the path to be to a "
"directory. It must end with a slash ( / ).")
super(HdfsFlagTarget, self).__init__(path, format, client)
self.flag = flag

def exists(self):
hadoopSemaphore = self.path + self.flag
return self.fs.exists(hadoopSemaphore)
Loading

0 comments on commit cf335c7

Please sign in to comment.