Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds option to disable mounting temporary folder in DockerOperator #16932

Merged
merged 3 commits into from
Jul 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 90 additions & 61 deletions airflow/providers/docker/operators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from typing import Dict, Iterable, List, Optional, Union

from docker import APIClient, tls
from docker.errors import APIError
from docker.types import Mount

from airflow.exceptions import AirflowException
Expand All @@ -32,12 +33,23 @@ class DockerOperator(BaseOperator):
"""
Execute a command inside a docker container.

A temporary directory is created on the host and
mounted into a container to allow storing files
By default, a temporary directory is
created on the host and mounted into a container to allow storing files
that together exceed the default disk size of 10GB in a container.
The path to the mounted directory can be accessed
In this case The path to the mounted directory can be accessed
via the environment variable ``AIRFLOW_TMP_DIR``.

If the volume cannot be mounted, warning is printed and an attempt is made to execute the docker
command without the temporary folder mounted. This is to make it works by default with remote docker
engine or when you run docker-in-docker solution and temporary directory is not shared with the
docker engine. Warning is printed in logs in this case.

If you know you run DockerOperator with remote engine or via docker-in-docker
you should set ``mount_tmp_dir`` parameter to False. In this case, you can still use
``mounts`` parameter to mount already existing named volumes in your Docker Engine
to achieve similar capability where you can store files exceeding default disk size
of the container,

If a login to a private registry is required prior to pulling the image, a
Docker connection needs to be configured in Airflow and the connection ID
be provided with the parameter ``docker_conn_id``.
Expand Down Expand Up @@ -88,6 +100,9 @@ class DockerOperator(BaseOperator):
:type tls_hostname: str or bool
:param tls_ssl_version: Version of SSL to use when communicating with docker daemon.
:type tls_ssl_version: str
:param mount_tmp_dir: Specify whether the temporary directory should be bind-mounted
from the host to the container. Defaults to True
:type mount_tmp_dir: bool
:param tmp_dir: Mount point inside the container to
a temporary directory created on the host by the operator.
The path is also made available via the environment variable
Expand Down Expand Up @@ -154,6 +169,7 @@ def __init__(
tls_client_key: Optional[str] = None,
tls_hostname: Optional[Union[str, bool]] = None,
tls_ssl_version: Optional[str] = None,
mount_tmp_dir: bool = True,
tmp_dir: str = '/tmp/airflow',
user: Optional[Union[str, int]] = None,
mounts: Optional[List[Mount]] = None,
Expand Down Expand Up @@ -193,6 +209,7 @@ def __init__(
self.tls_client_key = tls_client_key
self.tls_hostname = tls_hostname
self.tls_ssl_version = tls_ssl_version
self.mount_tmp_dir = mount_tmp_dir
self.tmp_dir = tmp_dir
self.user = user
self.mounts = mounts or []
Expand Down Expand Up @@ -227,66 +244,80 @@ def get_hook(self) -> DockerHook:
def _run_image(self) -> Optional[str]:
"""Run a Docker container with the provided image"""
self.log.info('Starting docker container from image %s', self.image)
if not self.cli:
raise Exception("The 'cli' should be initialized before!")
if self.mount_tmp_dir:
with TemporaryDirectory(prefix='airflowtmp', dir=self.host_tmp_dir) as host_tmp_dir:
tmp_mount = Mount(self.tmp_dir, host_tmp_dir, "bind")
try:
return self._run_image_with_mounts(self.mounts + [tmp_mount], add_tmp_variable=True)
except APIError as e:
if self.host_tmp_dir in str(e):
self.log.warning(
"Using remote engine or docker-in-docker and mounting temporary "
"volume from host is not supported. Falling back to "
"`mount_tmp_dir=False` mode. You can set `mount_tmp_dir` parameter"
" to False to disable mounting and remove the warning"
)
return self._run_image_with_mounts(self.mounts, add_tmp_variable=False)
raise
else:
return self._run_image_with_mounts(self.mounts, add_tmp_variable=False)

with TemporaryDirectory(prefix='airflowtmp', dir=self.host_tmp_dir) as host_tmp_dir:
if not self.cli:
raise Exception("The 'cli' should be initialized before!")
tmp_mount = Mount(self.tmp_dir, host_tmp_dir, "bind")
potiuk marked this conversation as resolved.
Show resolved Hide resolved
self.container = self.cli.create_container(
command=self.format_command(self.command),
name=self.container_name,
environment={**self.environment, **self._private_environment},
host_config=self.cli.create_host_config(
auto_remove=False,
mounts=self.mounts + [tmp_mount],
network_mode=self.network_mode,
shm_size=self.shm_size,
dns=self.dns,
dns_search=self.dns_search,
cpu_shares=int(round(self.cpus * 1024)),
mem_limit=self.mem_limit,
cap_add=self.cap_add,
extra_hosts=self.extra_hosts,
privileged=self.privileged,
),
image=self.image,
user=self.user,
entrypoint=self.format_command(self.entrypoint),
working_dir=self.working_dir,
tty=self.tty,
)

lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)

try:
self.cli.start(self.container['Id'])
def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optional[str]:
if add_tmp_variable:
self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir
else:
self.environment.pop('AIRFLOW_TMP_DIR', None)
self.container = self.cli.create_container(
command=self.format_command(self.command),
name=self.container_name,
environment={**self.environment, **self._private_environment},
host_config=self.cli.create_host_config(
auto_remove=False,
mounts=target_mounts,
network_mode=self.network_mode,
shm_size=self.shm_size,
dns=self.dns,
dns_search=self.dns_search,
cpu_shares=int(round(self.cpus * 1024)),
mem_limit=self.mem_limit,
cap_add=self.cap_add,
extra_hosts=self.extra_hosts,
privileged=self.privileged,
),
image=self.image,
user=self.user,
entrypoint=self.format_command(self.entrypoint),
working_dir=self.working_dir,
tty=self.tty,
)
lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
try:
self.cli.start(self.container['Id'])

line = ''
res_lines = []
for line in lines:
line = line.strip()
if hasattr(line, 'decode'):
# Note that lines returned can also be byte sequences so we have to handle decode here
line = line.decode('utf-8')
res_lines.append(line)
self.log.info(line)
line = ''
res_lines = []
for line in lines:
line = line.strip()
if hasattr(line, 'decode'):
# Note that lines returned can also be byte sequences so we have to handle decode here
line = line.decode('utf-8')
res_lines.append(line)
self.log.info(line)

result = self.cli.wait(self.container['Id'])
if result['StatusCode'] != 0:
res_lines = "\n".join(res_lines)
raise AirflowException('docker container failed: ' + repr(result) + f"lines {res_lines}")
result = self.cli.wait(self.container['Id'])
if result['StatusCode'] != 0:
res_lines = "\n".join(res_lines)
raise AirflowException('docker container failed: ' + repr(result) + f"lines {res_lines}")

ret = None
if self.do_xcom_push:
ret = (
self.cli.logs(container=self.container['Id'])
if self.xcom_all
else line.encode('utf-8')
)
return ret
finally:
if self.auto_remove:
self.cli.remove_container(self.container['Id'])
ret = None
if self.do_xcom_push:
ret = self.cli.logs(container=self.container['Id']) if self.xcom_all else line.encode('utf-8')
return ret
finally:
if self.auto_remove:
self.cli.remove_container(self.container['Id'])

def execute(self, context) -> Optional[str]:
self.cli = self._get_cli()
Expand All @@ -312,8 +343,6 @@ def execute(self, context) -> Optional[str]:
if latest_status.get(output_id) != output_status:
self.log.info("%s: %s", output_id, output_status)
latest_status[output_id] = output_status

self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir
return self._run_image()

def _get_cli(self) -> APIClient:
Expand Down
168 changes: 168 additions & 0 deletions tests/providers/docker/operators/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import logging
import unittest
from unittest import mock
from unittest.mock import call

import pytest
from docker.errors import APIError

from airflow.exceptions import AirflowException

Expand Down Expand Up @@ -119,6 +121,172 @@ def test_execute(self):
operator.cli.pull('ubuntu:latest', stream=True, decode=True) == self.client_mock.pull.return_value
)

def test_execute_no_temp_dir(self):
operator = DockerOperator(
api_version='1.19',
command='env',
environment={'UNIT': 'TEST'},
private_environment={'PRIVATE': 'MESSAGE'},
image='ubuntu:latest',
network_mode='bridge',
owner='unittest',
task_id='unittest',
mounts=[Mount(source='/host/path', target='/container/path', type='bind')],
mount_tmp_dir=False,
entrypoint='["sh", "-c"]',
working_dir='/container/path',
shm_size=1000,
host_tmp_dir='/host/airflow',
container_name='test_container',
tty=True,
)
operator.execute(None)

self.client_class_mock.assert_called_once_with(
base_url='unix://var/run/docker.sock', tls=None, version='1.19'
)

self.client_mock.create_container.assert_called_once_with(
command='env',
name='test_container',
environment={'UNIT': 'TEST', 'PRIVATE': 'MESSAGE'},
host_config=self.client_mock.create_host_config.return_value,
image='ubuntu:latest',
user=None,
entrypoint=['sh', '-c'],
working_dir='/container/path',
tty=True,
)
self.client_mock.create_host_config.assert_called_once_with(
mounts=[
Mount(source='/host/path', target='/container/path', type='bind'),
],
network_mode='bridge',
shm_size=1000,
cpu_shares=1024,
mem_limit=None,
auto_remove=False,
dns=None,
dns_search=None,
cap_add=None,
extra_hosts=None,
privileged=False,
)
self.tempdir_mock.assert_not_called()
self.client_mock.images.assert_called_once_with(name='ubuntu:latest')
self.client_mock.attach.assert_called_once_with(
container='some_id', stdout=True, stderr=True, stream=True
)
self.client_mock.pull.assert_called_once_with('ubuntu:latest', stream=True, decode=True)
self.client_mock.wait.assert_called_once_with('some_id')
assert (
operator.cli.pull('ubuntu:latest', stream=True, decode=True) == self.client_mock.pull.return_value
)

def test_execute_fallback_temp_dir(self):
self.client_mock.create_container.side_effect = [
APIError(message="wrong path: " + "/host/airflow"),
{'Id': 'some_id'},
]
operator = DockerOperator(
api_version='1.19',
command='env',
environment={'UNIT': 'TEST'},
private_environment={'PRIVATE': 'MESSAGE'},
image='ubuntu:latest',
network_mode='bridge',
owner='unittest',
task_id='unittest',
mounts=[Mount(source='/host/path', target='/container/path', type='bind')],
mount_tmp_dir=True,
entrypoint='["sh", "-c"]',
working_dir='/container/path',
shm_size=1000,
host_tmp_dir='/host/airflow',
container_name='test_container',
tty=True,
)
with self.assertLogs(operator.log, level=logging.WARNING) as captured:
operator.execute(None)
assert (
"WARNING:airflow.task.operators:Using remote engine or docker-in-docker "
"and mounting temporary volume from host is not supported" in captured.output[0]
)
self.client_class_mock.assert_called_once_with(
base_url='unix://var/run/docker.sock', tls=None, version='1.19'
)
self.client_mock.create_container.assert_has_calls(
[
call(
command='env',
name='test_container',
environment={'AIRFLOW_TMP_DIR': '/tmp/airflow', 'UNIT': 'TEST', 'PRIVATE': 'MESSAGE'},
host_config=self.client_mock.create_host_config.return_value,
image='ubuntu:latest',
user=None,
entrypoint=['sh', '-c'],
working_dir='/container/path',
tty=True,
),
call(
command='env',
name='test_container',
environment={'UNIT': 'TEST', 'PRIVATE': 'MESSAGE'},
host_config=self.client_mock.create_host_config.return_value,
image='ubuntu:latest',
user=None,
entrypoint=['sh', '-c'],
working_dir='/container/path',
tty=True,
),
]
)
self.client_mock.create_host_config.assert_has_calls(
[
call(
mounts=[
Mount(source='/host/path', target='/container/path', type='bind'),
Mount(source='/mkdtemp', target='/tmp/airflow', type='bind'),
],
network_mode='bridge',
shm_size=1000,
cpu_shares=1024,
mem_limit=None,
auto_remove=False,
dns=None,
dns_search=None,
cap_add=None,
extra_hosts=None,
privileged=False,
),
call(
mounts=[
Mount(source='/host/path', target='/container/path', type='bind'),
],
network_mode='bridge',
shm_size=1000,
cpu_shares=1024,
mem_limit=None,
auto_remove=False,
dns=None,
dns_search=None,
cap_add=None,
extra_hosts=None,
privileged=False,
),
]
)
self.tempdir_mock.assert_called_once_with(dir='/host/airflow', prefix='airflowtmp')
self.client_mock.images.assert_called_once_with(name='ubuntu:latest')
self.client_mock.attach.assert_called_once_with(
container='some_id', stdout=True, stderr=True, stream=True
)
self.client_mock.pull.assert_called_once_with('ubuntu:latest', stream=True, decode=True)
self.client_mock.wait.assert_called_once_with('some_id')
assert (
operator.cli.pull('ubuntu:latest', stream=True, decode=True) == self.client_mock.pull.return_value
)

def test_private_environment_is_private(self):
operator = DockerOperator(
private_environment={'PRIVATE': 'MESSAGE'}, image='ubuntu:latest', task_id='unittest'
Expand Down