diff --git a/services/dask-sidecar/tests/unit/conftest.py b/services/dask-sidecar/tests/unit/conftest.py index a5f27e7850ac..043f5d065310 100644 --- a/services/dask-sidecar/tests/unit/conftest.py +++ b/services/dask-sidecar/tests/unit/conftest.py @@ -176,7 +176,7 @@ def creator() -> AnyUrl: open_file = fsspec.open(new_remote_file, mode="wt", **s3_storage_kwargs) with open_file as fp: fp.write( # type: ignore - f"This is the file contents of file #'{(len(list_of_created_files)+1):03}'" + f"This is the file contents of file #'{(len(list_of_created_files)+1):03}'\n" ) for s in faker.sentences(5): fp.write(f"{s}\n") # type: ignore diff --git a/services/dask-sidecar/tests/unit/test_tasks.py b/services/dask-sidecar/tests/unit/test_tasks.py index e34552b7aef4..6b8d56d83a34 100644 --- a/services/dask-sidecar/tests/unit/test_tasks.py +++ b/services/dask-sidecar/tests/unit/test_tasks.py @@ -17,6 +17,7 @@ from unittest import mock from uuid import uuid4 +import distributed import fsspec import pytest from dask_task_models_library.container_tasks.docker import DockerBasicAuth @@ -31,7 +32,6 @@ TaskOutputData, TaskOutputDataSchema, ) -from distributed import Client from faker import Faker from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID @@ -453,7 +453,7 @@ def test_run_computational_sidecar_real_fct( ) def test_run_multiple_computational_sidecar_dask( event_loop: asyncio.AbstractEventLoop, - dask_client: Client, + dask_client: distributed.Client, ubuntu_task: ServiceExampleParam, mocker: MockerFixture, s3_settings: S3Settings, @@ -494,15 +494,23 @@ def test_run_multiple_computational_sidecar_dask( assert output_data[k] == v +@pytest.fixture +def log_sub( + dask_client: distributed.Client, +) -> distributed.Sub: + return distributed.Sub(TaskLogEvent.topic_name(), client=dask_client) + + @pytest.mark.parametrize( "integration_version, boot_mode", [("1.0.0", BootMode.CPU)], indirect=True ) -def test_run_computational_sidecar_dask( - dask_client: Client, +async def test_run_computational_sidecar_dask( + dask_client: distributed.Client, ubuntu_task: ServiceExampleParam, mocker: MockerFixture, s3_settings: S3Settings, boot_mode: BootMode, + log_sub: distributed.Sub, ): mocker.patch( "simcore_service_dask_sidecar.computational_sidecar.core.get_integration_version", @@ -524,17 +532,15 @@ def test_run_computational_sidecar_dask( ) worker_name = next(iter(dask_client.scheduler_info()["workers"])) - + assert worker_name output_data = future.result() assert isinstance(output_data, TaskOutputData) # check that the task produces expected logs - worker_logs = [log for _, log in dask_client.get_worker_logs()[worker_name]] # type: ignore - worker_logs.reverse() + worker_logs = [TaskLogEvent.parse_raw(msg).log for msg in log_sub.buffer] + for log in ubuntu_task.expected_logs: - r = re.compile( - rf"\[{ubuntu_task.service_key}:{ubuntu_task.service_version} - [^\/]+\/[^\s]+ - [^\]]+\]: ({log})" - ) + r = re.compile(rf"^({log}).*") search_results = list(filter(r.search, worker_logs)) assert ( len(search_results) > 0