Support for the Docker Python SDK #2158

Merged
merged 50 commits into from
Oct 15, 2017
Changes from 45 commits
Commits
8a4d331
added stub for minimal docker wrapper
Feb 26, 2017
8481b8a
don't set a name by default
Mar 1, 2017
83e6b48
handle None name
Mar 1, 2017
3b67754
changed to use the docker low level api
Mar 9, 2017
3cd4d11
removed debugging print
Mar 9, 2017
849e59c
refractored names
Mar 9, 2017
55f0528
added a test for single file mount
elipapa Mar 9, 2017
bb8122c
merged with tempdir support and image download
Mar 9, 2017
f1fb534
directory and file mounting working
Mar 9, 2017
ac55d9e
fixed tests and using busybox when needed
Mar 9, 2017
9321e4a
always use busybox
Mar 10, 2017
07fcb84
fallback if exception has no message
Mar 14, 2017
09f254e
lowered pull image to debug level
Mar 14, 2017
5258346
do not declare volumes unless they are passed as extra params
Mar 15, 2017
0f1547c
remove name requirement from tmpdir
elipapa Apr 25, 2017
a4643a1
linting
elipapa Apr 28, 2017
65b1a46
fix tests for tox
elipapa Apr 28, 2017
e0a4f28
tag must be specified to get the right number of images back from the…
elipapa Apr 28, 2017
a7da033
test to trigger duplicate mount pt issue
elipapa Apr 28, 2017
7273d3d
fix pid lock in tests
elipapa Apr 28, 2017
5750452
use a wrapper task in test
elipapa Apr 28, 2017
8c8580f
another test to see if the issue is in properties
elipapa May 1, 2017
6ce5586
created an _init_ method to avoid the mutable volumes list to be shar…
elipapa May 1, 2017
8452bf4
need args on the init to allow parameters
elipapa May 2, 2017
89ce5c0
solves pid lock
elipapa May 2, 2017
a625c32
linting
elipapa May 2, 2017
caedbcd
remove bad logging lines
elipapa May 2, 2017
fa3127c
fix handling of multiple volumes
elipapa May 17, 2017
f7c4197
must be passing kwargs when using super!
elipapa May 23, 2017
caf0070
old gitignore
elipapa Jun 15, 2017
9cf4e28
add elipapa as additional author
elipapa Jun 15, 2017
a1f0f29
Merge remote-tracking branch 'upstream/master'
elipapa Jun 15, 2017
d939dce
add a blog post about open targets use and this PR contribution
elipapa Jun 15, 2017
94f7890
skip tests if no docker daemon is present
elipapa Jun 16, 2017
0ae8df1
literal should become a bytes literal in Py3
elipapa Jun 16, 2017
32b1a88
flake8 linting
elipapa Jun 22, 2017
93608c2
do not mount tmp dir by default and remove unused import
elipapa Jun 30, 2017
ca21d2d
linting
elipapa Jun 30, 2017
90591a6
don't remove tmp directory if it was not created
elipapa Jun 30, 2017
68ffb7f
volume and binds should both be there
elipapa Jul 1, 2017
8cda56d
use host_config's auto_Remove functionality
elipapa Jul 1, 2017
93ac259
docker API v>1.25 required for auto_remove
elipapa Jul 2, 2017
72a97c2
extended module docs with contribution use case
Sep 9, 2017
e997426
flake8 linting
Sep 11, 2017
c8d007f
added version to docker dependency
Sep 11, 2017
76757f4
mount tmp dir by default, and rename volumes to binds
Sep 25, 2017
6ad3094
enable docker service in travis ci to allow DockerTask tests to run
Sep 25, 2017
f877ca3
Merge remote-tracking branch 'remotes/upstream/master'
Sep 26, 2017
fe5636d
Merge branch 'master' into master
apierleoni Oct 14, 2017
4e1fb1f
fix boto problem in tests
apierleoni Oct 14, 2017
1 change: 1 addition & 0 deletions README.rst
@@ -144,6 +144,7 @@ or held presentations about Luigi:
* `Groupon <https://www.groupon.com/>`_ / `OrderUp <https://orderup.com>`_ `(alternative implementation) <https://github.com/groupon/luigi-warehouse>`__
* `Red Hat - Marketing Operations <https://www.redhat.com>`_ `(blog, 2017) <https://github.com/rh-marketingops/rh-mo-scc-luigi>`__
* `GetNinjas <https://www.getninjas.com.br/>`_ `(blog, 2017) <https://labs.getninjas.com.br/using-luigi-to-create-and-monitor-pipelines-of-batch-jobs-eb8b3cd2a574>`__
* `Open Targets <https://www.opentargets.org/>`_ `(blog, 2017) <https://blog.opentargets.org/using-containers-with-luigi>`__

Some more companies are using Luigi but haven't had a chance yet to write about it:

245 changes: 245 additions & 0 deletions luigi/contrib/docker_runner.py
@@ -0,0 +1,245 @@
# -*- coding: utf-8 -*-
#
# Copyright 2017 Open Targets
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


"""
Docker container wrapper for Luigi.

Enables running a docker container as a task in luigi.
This wrapper uses the Docker Python SDK to communicate directly with the
Docker API avoiding the common pattern to invoke the docker client
from the command line. Using the SDK it is possible to detect and properly
handle errors occurring when pulling, starting or running the containers.
On top of this, it is possible to mount a single file in the container
and a temporary directory is created on the host and mounted allowing
the handling of files bigger than the container limit.

Requires:

- docker: ``pip install docker``

Written and maintained by Andrea Pierleoni (@apierleoni).
Contributions by Eliseo Papa (@elipapa).
"""
from tempfile import mkdtemp
import logging
import luigi

from luigi.local_target import LocalFileSystem
from luigi import six

logger = logging.getLogger('luigi-interface')

try:
    import docker
    from docker.errors import ContainerError, ImageNotFound, APIError

except ImportError:
    logger.warning('docker is not installed. DockerTask requires docker.')
    docker = None

# TODO: may need to implement this logic for remote hosts
# class dockerconfig(luigi.Config):
# '''
# this class allows to use the luigi.cfg file to specify the path to the docker config.json.
# The docker client should look by default in the main directory,
# but on different systems this may need to be specified.
# '''
# docker_config_path = luigi.Parameter(
# default="~/.docker/config.json",
# description="Path to dockercfg file for authentication")


class DockerTask(luigi.Task):

    @property
    def image(self):
        return 'alpine'

    @property
    def command(self):
        return "echo hello world"

    @property
    def name(self):
        return None

    @property
    def container_options(self):
        return {}

    @property
    def environment(self):
        return {}

    @property
    def container_tmp_dir(self):
        return '/tmp/luigi'

    @property
    def binds(self):
        '''
        Override this to mount local volumes, in addition to the /tmp/luigi
        which gets defined by default. This should return a list of strings.
        e.g. ['/hostpath1:/containerpath1', '/hostpath2:/containerpath2']
        '''
        return None

    @property
    def network_mode(self):
        return ''

    @property
    def docker_url(self):
        return None

    @property
    def auto_remove(self):
        return True

    @property
    def force_pull(self):
        return False

    @property
    def mount_tmp(self):
        return False

    def __init__(self, *args, **kwargs):
        '''
        When a new instance of the DockerTask class gets created:
        - call the parent class __init__ method
        - start the logger
        - init an instance of the docker client
        - create a tmp dir
        - add the temp dir to the volume binds specified in the task
        '''
        super(DockerTask, self).__init__(*args, **kwargs)
        self.__logger = logger

        '''init docker client
        using the low level API as the higher level API does not allow to mount single
        files as volumes
        '''
        self._client = docker.APIClient(self.docker_url)

        # add latest tag if nothing else is specified by task
        if ':' not in self.image:
            self._image = ':'.join([self.image, 'latest'])
        else:
            self._image = self.image

        if self.mount_tmp:
            # create a tmp_dir, NOTE: /tmp needs to be specified for it to work on
            # macOS, despite what the python documentation says
            self._host_tmp_dir = mkdtemp(suffix=self.task_id,
                                         prefix='luigi-docker-tmp-dir-',
                                         dir='/tmp')

            self._binds = ['{0}:{1}'.format(self._host_tmp_dir, self.container_tmp_dir)]
        else:
            self._binds = []

        # update environment property with the (internal) location of tmp_dir
        self.environment['LUIGI_TMP_DIR'] = self.container_tmp_dir

        # add additional volume binds specified by the user to the tmp_Dir bind
        if isinstance(self.binds, six.string_types):
            self._binds.append(self.binds)
        elif isinstance(self.binds, list):
            self._binds.extend(self.binds)

        # derive volumes (ie. list of container destination paths) from
        # specified binds
        self._volumes = [b.split(':')[1] for b in self._binds]


One addition I had to make to get this properly working in our scenario was to include a custom instance attribute self._complete = False and set this to True when the container exited.

    def run(self):

        # get image if missing
        if self.force_pull or len(self._client.images(name=self._image)) == 0:
            logger.info('Pulling docker image ' + self._image)
            try:
                for logline in self._client.pull(self._image, stream=True):
                    logger.debug(logline.decode('utf-8'))
            except APIError as e:
                self.__logger.warning("Error in Docker API: " + e.explanation)
                raise

        # remove clashing container if a container with the same name exists
        if self.auto_remove and self.name:
            try:
                self._client.remove_container(self.name,
                                              force=True)
            except APIError as e:
                self.__logger.warning("Ignored error in Docker API: " + e.explanation)

        # run the container
        try:
            logger.debug('Creating image: %s command: %s volumes: %s'
                         % (self._image, self.command, self._binds))

            host_config = self._client.create_host_config(binds=self._binds,
                                                          network_mode=self.network_mode)

            container = self._client.create_container(self._image,
                                                      command=self.command,
                                                      name=self.name,
                                                      environment=self.environment,
                                                      volumes=self._volumes,
                                                      host_config=host_config,
                                                      **self.container_options)
            self._client.start(container['Id'])

            exit_status = self._client.wait(container['Id'])


+self._complete = True

I can no longer remember whether I set this to True unconditionally on purpose, or whether it should depend on the exit_status.

In our use case we chain multiple DockerTasks consecutively, so that each task depends on the previous one and its output. I'm quite new to luigi and don't have deep knowledge of it yet, but I think the scheduler had trouble resolving and starting the DockerTask chain properly. After including the changes above and overriding the complete method, the chain worked fine for me. As far as I remember, I even got the tests to pass after renaming volumes to binds. Perhaps this is specific to our use case, but maybe someone who knows luigi better can assess it.

class ImportProcess(DockerTask):
    ....
    
    def complete(self):
        return self.output().exists() or self._complete
    ...

Of course this could also be implemented in the DockerTask class directly with maybe some modification like this:

class DockerTask(luigi.Task):
    ....
    def complete(self):
        return super(DockerTask, self).complete() or self._complete
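
To make the open question about exit_status concrete, here is a minimal sketch of the variant where the flag is only set on a clean exit. It deliberately avoids importing luigi or docker so it can be run anywhere: `SketchTask`, `SketchContainerTask`, `FailingTask` and `_run_container` are hypothetical stand-ins invented for illustration, not part of this PR or of luigi.

```python
import os


class SketchTask(object):
    """Hypothetical stand-in for luigi.Task: complete when the output exists."""

    def __init__(self, output_path):
        self.output_path = output_path

    def complete(self):
        return os.path.exists(self.output_path)


class SketchContainerTask(SketchTask):
    """Hypothetical stand-in for DockerTask with an in-memory completion flag."""

    def __init__(self, output_path):
        super(SketchContainerTask, self).__init__(output_path)
        self._complete = False

    def _run_container(self):
        # Placeholder for create_container / start / wait; returns an exit status.
        return 0

    def run(self):
        exit_status = self._run_container()
        # Assumption: the flag follows the exit status instead of always being True.
        self._complete = (exit_status == 0)
        if exit_status != 0:
            raise RuntimeError('container exited with status %d' % exit_status)

    def complete(self):
        # Either the output exists, or this instance saw a successful run.
        return super(SketchContainerTask, self).complete() or self._complete


class FailingTask(SketchContainerTask):
    """Hypothetical task whose container always exits with status 1."""

    def _run_container(self):
        return 1
```

One caveat with this pattern, whichever way the flag is set: `_complete` lives only on the instance, so a task object created later (for example in another process) would start again with `False` and fall back to checking the output.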

            if exit_status != 0:
                stdout = False
                stderr = True
                error = self._client.logs(container['Id'],
                                          stdout=stdout,
                                          stderr=stderr)
            if self.auto_remove:
                try:
                    self._client.remove_container(container['Id'])
                except docker.errors.APIError:
                    self.__logger.warning("Container " + container['Id'] +
                                          " could not be removed")
            if exit_status != 0:
                raise ContainerError(container, exit_status, self.command, self._image, error)

        except ContainerError as e:
            # catch non zero exit status and re-raise it
            container_name = ''
            if self.name:
                container_name = self.name
            try:
                message = e.message
            except:
                message = str(e)
            self.__logger.error("Container " + container_name +
                                " exited with non zero code: " + message)
            raise
        except ImageNotFound as e:
            self.__logger.error("Image " + self._image + " not found")
            raise
        except APIError as e:
            self.__logger.error("Error in Docker API: "+e.explanation)
            raise

        # delete temp dir
        filesys = LocalFileSystem()
        if self.mount_tmp and filesys.exists(self._host_tmp_dir):
            filesys.remove(self._host_tmp_dir, recursive=True)
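
As a side note on how `__init__` derives the container-side volumes from the bind strings: the list comprehension takes the second colon-separated field of each bind. The standalone sketch below shows that same rule with a small validation step added. `container_paths` is a hypothetical helper name used only for illustration; it is not part of this diff.

```python
def container_paths(binds):
    """Return the container-side path of each 'host:container[:mode]' bind."""
    paths = []
    for bind in binds:
        parts = bind.split(':')
        if len(parts) < 2:
            # The diff's list comprehension would raise IndexError here;
            # this sketch raises a more descriptive error instead.
            raise ValueError('expected host:container[:mode], got %r' % bind)
        # Same rule as the diff: the second field is the container path.
        paths.append(parts[1])
    return paths


# An optional mode suffix such as ':ro' is ignored by this rule.
print(container_paths(['/hostpath1:/containerpath1', '/data:/mnt/data:ro']))
```

Because the split is on every colon, a host path that itself contains a colon (a Windows drive letter, for instance) would be misread by this rule, both here and in the original comprehension.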