Skip to content

Commit

Permalink
Update DockerSwarmOperator auto_remove to align with DockerOperator (a…
Browse files Browse the repository at this point in the history
…pache#45745)

* Update DockerSwarmOperator auto_remove to align with DockerOperator

* add docker swarm auto remove test
  • Loading branch information
niklasr22 authored and ambika-garg committed Feb 13, 2025
1 parent 0501cac commit f4d446a
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ class DockerSwarmOperator(DockerOperator):
If image tag is omitted, "latest" will be used.
:param api_version: Remote API version. Set to ``auto`` to automatically
detect the server's version.
:param auto_remove: Auto-removal of the container on daemon side when the
container's process exits.
The default is False.
:param auto_remove: Enable removal of the service when the service has terminated. Possible values:
- ``never``: (default) do not remove service
- ``success``: remove on success
- ``force``: always remove service
:param command: Command to be run in the container. (templated)
:param args: Arguments to the command.
:param docker_url: URL of the host running the docker daemon.
Expand Down Expand Up @@ -214,18 +216,16 @@ def _run_service(self) -> None:
container_id = task["Status"]["ContainerStatus"]["ContainerID"]
container = self.cli.inspect_container(container_id)
self.containers.append(container)
else:
raise AirflowException(f"Service did not complete: {self.service!r}")

if self.retrieve_output:
return self._attempt_to_retrieve_results()

self.log.info("auto_removeauto_removeauto_removeauto_removeauto_remove : %s", str(self.auto_remove))
self.log.info("auto_remove: %s", str(self.auto_remove))
if self.service and self._service_status() != "complete":
if self.auto_remove == "success":
if self.auto_remove == "force":
self.cli.remove_service(self.service["ID"])
raise AirflowException(f"Service did not complete: {self.service!r}")
elif self.auto_remove == "success":
elif self.auto_remove in ["success", "force"]:
if not self.service:
raise RuntimeError("The 'service' should be initialized before!")
self.cli.remove_service(self.service["ID"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ def _client_service_logs_effect():
client_mock.remove_service.assert_called_once_with("some_id")

@mock.patch("airflow.providers.docker.operators.docker_swarm.types")
def test_auto_remove(self, types_mock, docker_api_client_patcher):
@pytest.mark.parametrize("auto_remove", ["success", "force"])
def test_auto_remove(self, types_mock, docker_api_client_patcher, auto_remove):
mock_obj = mock.Mock()

client_mock = mock.Mock(spec=APIClient)
Expand All @@ -148,12 +149,45 @@ def test_auto_remove(self, types_mock, docker_api_client_patcher):
docker_api_client_patcher.return_value = client_mock

operator = DockerSwarmOperator(
image="", auto_remove="success", task_id="unittest", enable_logging=False
image="", auto_remove=auto_remove, task_id="unittest", enable_logging=False
)
operator.execute(None)

client_mock.remove_service.assert_called_once_with("some_id")

@mock.patch("airflow.providers.docker.operators.docker_swarm.types")
@pytest.mark.parametrize(
"auto_remove,expected_remove_call", [("success", False), ("force", True), ("never", False)]
)
def test_auto_remove_failed(
self, types_mock, docker_api_client_patcher, auto_remove, expected_remove_call
):
mock_obj = mock.Mock()

client_mock = mock.Mock(spec=APIClient)
client_mock.create_service.return_value = {"ID": "some_id"}
client_mock.images.return_value = []
client_mock.pull.return_value = [b'{"status":"pull log"}']
client_mock.tasks.return_value = [
{"Status": {"State": "failed", "ContainerStatus": {"ContainerID": "some_id"}}}
]
types_mock.TaskTemplate.return_value = mock_obj
types_mock.ContainerSpec.return_value = mock_obj
types_mock.RestartPolicy.return_value = mock_obj
types_mock.Resources.return_value = mock_obj

docker_api_client_patcher.return_value = client_mock

operator = DockerSwarmOperator(
image="", auto_remove=auto_remove, task_id="unittest", enable_logging=False
)
try:
operator.execute(None)
except AirflowException:
pass

assert (client_mock.remove_service.call_count > 0) == expected_remove_call

@mock.patch("airflow.providers.docker.operators.docker_swarm.types")
def test_no_auto_remove(self, types_mock, docker_api_client_patcher):
mock_obj = mock.Mock()
Expand Down

0 comments on commit f4d446a

Please sign in to comment.