-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support for the Docker Python SDK #2158
Changes from 45 commits
8a4d331
8481b8a
83e6b48
3b67754
3cd4d11
849e59c
55f0528
bb8122c
f1fb534
ac55d9e
9321e4a
07fcb84
09f254e
5258346
0f1547c
a4643a1
65b1a46
e0a4f28
a7da033
7273d3d
5750452
8c8580f
6ce5586
8452bf4
89ce5c0
a625c32
caedbcd
fa3127c
f7c4197
caf0070
9cf4e28
a1f0f29
d939dce
94f7890
0ae8df1
32b1a88
93608c2
ca21d2d
90591a6
68ffb7f
8cda56d
93ac259
72a97c2
e997426
c8d007f
76757f4
6ad3094
f877ca3
fe5636d
4e1fb1f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,245 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Copyright 2017 Open Targets | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
|
||
""" | ||
Docker container wrapper for Luigi. | ||
|
||
Enables running a docker container as a task in luigi. | ||
This wrapper uses the Docker Python SDK to communicate directly with the | ||
Docker API avoiding the common pattern to invoke the docker client | ||
from the command line. Using the SDK it is possible to detect and properly | ||
handle errors occurring when pulling, starting or running the containers. | ||
On top of this, it is possible to mount a single file in the container | ||
and a temporary directory is created on the host and mounted allowing | ||
the handling of files bigger than the container limit. | ||
|
||
Requires: | ||
|
||
- docker: ``pip install docker`` | ||
|
||
Written and maintained by Andrea Pierleoni (@apierleoni). | ||
Contributions by Eliseo Papa (@elipapa). | ||
""" | ||
from tempfile import mkdtemp | ||
import logging | ||
import luigi | ||
|
||
from luigi.local_target import LocalFileSystem | ||
from luigi import six | ||
|
||
logger = logging.getLogger('luigi-interface') | ||
|
||
try: | ||
import docker | ||
from docker.errors import ContainerError, ImageNotFound, APIError | ||
|
||
except ImportError: | ||
logger.warning('docker is not installed. DockerTask requires docker.') | ||
docker = None | ||
|
||
# TODO: may need to implement this logic for remote hosts | ||
# class dockerconfig(luigi.Config): | ||
# ''' | ||
# this class allows to use the luigi.cfg file to specify the path to the docker config.json. | ||
# The docker client should look by default in the main directory, | ||
# but on different systems this may need to be specified. | ||
# ''' | ||
# docker_config_path = luigi.Parameter( | ||
# default="~/.docker/config.json", | ||
# description="Path to dockercfg file for authentication") | ||
|
||
|
||
class DockerTask(luigi.Task): | ||
|
||
@property | ||
def image(self): | ||
return 'alpine' | ||
|
||
@property | ||
def command(self): | ||
return "echo hello world" | ||
|
||
@property | ||
def name(self): | ||
return None | ||
|
||
@property | ||
def container_options(self): | ||
return {} | ||
|
||
@property | ||
def environment(self): | ||
return {} | ||
|
||
@property | ||
def container_tmp_dir(self): | ||
return '/tmp/luigi' | ||
|
||
@property | ||
def binds(self): | ||
''' | ||
Override this to mount local volumes, in addition to the /tmp/luigi | ||
which gets defined by default. This should return a list of strings. | ||
e.g. ['/hostpath1:/containerpath1', '/hostpath2:/containerpath2'] | ||
''' | ||
return None | ||
|
||
@property | ||
def network_mode(self): | ||
return '' | ||
|
||
@property | ||
def docker_url(self): | ||
return None | ||
|
||
@property | ||
def auto_remove(self): | ||
return True | ||
|
||
@property | ||
def force_pull(self): | ||
return False | ||
|
||
@property | ||
def mount_tmp(self): | ||
return False | ||
|
||
def __init__(self, *args, **kwargs): | ||
''' | ||
When a new instance of the DockerTask class gets created: | ||
- call the parent class __init__ method | ||
- start the logger | ||
- init an instance of the docker client | ||
- create a tmp dir | ||
- add the temp dir to the volume binds specified in the task | ||
''' | ||
super(DockerTask, self).__init__(*args, **kwargs) | ||
self.__logger = logger | ||
|
||
'''init docker client | ||
using the low level API as the higher level API does not allow to mount single | ||
files as volumes | ||
''' | ||
self._client = docker.APIClient(self.docker_url) | ||
|
||
# add latest tag if nothing else is specified by task | ||
if ':' not in self.image: | ||
self._image = ':'.join([self.image, 'latest']) | ||
else: | ||
self._image = self.image | ||
|
||
if self.mount_tmp: | ||
# create a tmp_dir, NOTE: /tmp needs to be specified for it to work on | ||
# macOS, despite what the python documentation says | ||
self._host_tmp_dir = mkdtemp(suffix=self.task_id, | ||
prefix='luigi-docker-tmp-dir-', | ||
dir='/tmp') | ||
|
||
self._binds = ['{0}:{1}'.format(self._host_tmp_dir, self.container_tmp_dir)] | ||
else: | ||
self._binds = [] | ||
|
||
# update environment property with the (internal) location of tmp_dir | ||
self.environment['LUIGI_TMP_DIR'] = self.container_tmp_dir | ||
|
||
# add additional volume binds specified by the user to the tmp_Dir bind | ||
if isinstance(self.binds, six.string_types): | ||
self._binds.append(self.binds) | ||
elif isinstance(self.binds, list): | ||
self._binds.extend(self.binds) | ||
|
||
# derive volumes (ie. list of container destination paths) from | ||
# specified binds | ||
self._volumes = [b.split(':')[1] for b in self._binds] | ||
|
||
def run(self): | ||
|
||
# get image if missing | ||
if self.force_pull or len(self._client.images(name=self._image)) == 0: | ||
logger.info('Pulling docker image ' + self._image) | ||
try: | ||
for logline in self._client.pull(self._image, stream=True): | ||
logger.debug(logline.decode('utf-8')) | ||
except APIError as e: | ||
self.__logger.warning("Error in Docker API: " + e.explanation) | ||
raise | ||
|
||
# remove clashing container if a container with the same name exists | ||
if self.auto_remove and self.name: | ||
try: | ||
self._client.remove_container(self.name, | ||
force=True) | ||
except APIError as e: | ||
self.__logger.warning("Ignored error in Docker API: " + e.explanation) | ||
|
||
# run the container | ||
try: | ||
logger.debug('Creating image: %s command: %s volumes: %s' | ||
% (self._image, self.command, self._binds)) | ||
|
||
host_config = self._client.create_host_config(binds=self._binds, | ||
network_mode=self.network_mode) | ||
|
||
container = self._client.create_container(self._image, | ||
command=self.command, | ||
name=self.name, | ||
environment=self.environment, | ||
volumes=self._volumes, | ||
host_config=host_config, | ||
**self.container_options) | ||
self._client.start(container['Id']) | ||
|
||
exit_status = self._client.wait(container['Id']) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. + I'm not sure anymore if I included this simply to In our use case we're chaining multiple class ImportProcess(DockerTask):
....
def complete(self):
return self.output().exists() or self._complete
... Of course this could also be implemented in the class DockerTask(luigi.Task):
....
def complete(self):
return super(DockerTask, self).complete() or self._complete |
||
if exit_status != 0: | ||
stdout = False | ||
stderr = True | ||
error = self._client.logs(container['Id'], | ||
stdout=stdout, | ||
stderr=stderr) | ||
if self.auto_remove: | ||
try: | ||
self._client.remove_container(container['Id']) | ||
except docker.errors.APIError: | ||
self.__logger.warning("Container " + container['Id'] + | ||
" could not be removed") | ||
if exit_status != 0: | ||
raise ContainerError(container, exit_status, self.command, self._image, error) | ||
|
||
except ContainerError as e: | ||
# catch non zero exti status and return it | ||
container_name = '' | ||
if self.name: | ||
container_name = self.name | ||
try: | ||
message = e.message | ||
except: | ||
message = str(e) | ||
self.__logger.error("Container " + container_name + | ||
" exited with non zero code: " + message) | ||
raise | ||
except ImageNotFound as e: | ||
self.__logger.error("Image " + self._image + " not found") | ||
raise | ||
except APIError as e: | ||
self.__logger.error("Error in Docker API: "+e.explanation) | ||
raise | ||
|
||
# delete temp dir | ||
filesys = LocalFileSystem() | ||
if self.mount_tmp and filesys.exists(self._host_tmp_dir): | ||
filesys.remove(self._host_tmp_dir, recursive=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One addition I had to make to get this properly working in our scenario was to include a custom instance attribute
self._complete = False
and set this toTrue
when the container exited.