diff --git a/.ruff.toml b/.ruff.toml index 4543860ca6a..c5ac238ec6f 100644 --- a/.ruff.toml +++ b/.ruff.toml @@ -51,7 +51,7 @@ target-version = "py310" [per-file-ignores] -"**/tests/**" = [ +"{**/{tests, pytest_simcore}/**}" = [ "T201", # print found "ARG001", # unused function argument "PT019", # user pytest.mark.usefixture diff --git a/.vscode/launch.template.json b/.vscode/launch.template.json index 4abbb153965..8b49e16b057 100644 --- a/.vscode/launch.template.json +++ b/.vscode/launch.template.json @@ -46,6 +46,19 @@ "name": "Python: Remote Attach api-server", "type": "python", "request": "attach", + "port": 3015, + "host": "127.0.0.1", + "pathMappings": [ + { + "localRoot": "${workspaceFolder}", + "remoteRoot": "/devel" + } + ] + }, + { + "name": "Python: Remote Attach clusters-keeper", + "type": "python", + "request": "attach", "port": 3006, "host": "127.0.0.1", "pathMappings": [ diff --git a/packages/pytest-simcore/src/pytest_simcore/dask_gateway.py b/packages/pytest-simcore/src/pytest_simcore/dask_gateway.py new file mode 100644 index 00000000000..e54db09d454 --- /dev/null +++ b/packages/pytest-simcore/src/pytest_simcore/dask_gateway.py @@ -0,0 +1,104 @@ +# pylint: disable=unused-argument +# pylint: disable=redefined-outer-name + +from collections.abc import Callable +from typing import AsyncIterator, NamedTuple + +import pytest +import traitlets.config +from dask_gateway import Gateway, GatewayCluster, auth +from dask_gateway_server.app import DaskGateway +from dask_gateway_server.backends.local import UnsafeLocalBackend +from distributed import Client + + +@pytest.fixture +def local_dask_gateway_server_config( + unused_tcp_port_factory: Callable, +) -> traitlets.config.Config: + c = traitlets.config.Config() + assert isinstance(c.DaskGateway, traitlets.config.Config) + assert isinstance(c.ClusterConfig, traitlets.config.Config) + assert isinstance(c.Proxy, traitlets.config.Config) + assert isinstance(c.SimpleAuthenticator, traitlets.config.Config) + 
c.DaskGateway.backend_class = UnsafeLocalBackend + c.DaskGateway.address = f"127.0.0.1:{unused_tcp_port_factory()}" + c.Proxy.address = f"127.0.0.1:{unused_tcp_port_factory()}" + c.DaskGateway.authenticator_class = "dask_gateway_server.auth.SimpleAuthenticator" + c.SimpleAuthenticator.password = "qweqwe" # noqa: S105 + c.ClusterConfig.worker_cmd = [ + "dask-worker", + "--resources", + f"CPU=12,GPU=1,RAM={16e9}", + ] + # NOTE: This must be set such that the local unsafe backend creates a worker with enough cores/memory + c.ClusterConfig.worker_cores = 12 + c.ClusterConfig.worker_memory = "16G" + c.ClusterConfig.cluster_max_workers = 3 + + c.DaskGateway.log_level = "DEBUG" + return c + + +class DaskGatewayServer(NamedTuple): + address: str + proxy_address: str + password: str + server: DaskGateway + + +@pytest.fixture +async def local_dask_gateway_server( + local_dask_gateway_server_config: traitlets.config.Config, +) -> AsyncIterator[DaskGatewayServer]: + print("--> creating local dask gateway server") + dask_gateway_server = DaskGateway(config=local_dask_gateway_server_config) + dask_gateway_server.initialize([]) # that is a shitty one! 
+ print("--> local dask gateway server initialized") + await dask_gateway_server.setup() + await dask_gateway_server.backend.proxy._proxy_contacted # pylint: disable=protected-access + + print("--> local dask gateway server setup completed") + yield DaskGatewayServer( + f"http://{dask_gateway_server.backend.proxy.address}", + f"gateway://{dask_gateway_server.backend.proxy.tcp_address}", + local_dask_gateway_server_config.SimpleAuthenticator.password, # type: ignore + dask_gateway_server, + ) + print("--> local dask gateway server switching off...") + await dask_gateway_server.cleanup() + print("...done") + + +@pytest.fixture +async def dask_gateway( + local_dask_gateway_server: DaskGatewayServer, +) -> Gateway: + async with Gateway( + local_dask_gateway_server.address, + local_dask_gateway_server.proxy_address, + asynchronous=True, + auth=auth.BasicAuth("pytest_user", local_dask_gateway_server.password), + ) as gateway: + print(f"--> {gateway=} created") + cluster_options = await gateway.cluster_options() + gateway_versions = await gateway.get_versions() + clusters_list = await gateway.list_clusters() + print(f"--> {gateway_versions=}, {cluster_options=}, {clusters_list=}") + for option in cluster_options.items(): + print(f"--> {option=}") + return gateway + + +@pytest.fixture +async def dask_gateway_cluster(dask_gateway: Gateway) -> AsyncIterator[GatewayCluster]: + async with dask_gateway.new_cluster() as cluster: + yield cluster + + +@pytest.fixture +async def dask_gateway_cluster_client( + dask_gateway_cluster: GatewayCluster, +) -> AsyncIterator[Client]: + async with dask_gateway_cluster.get_client() as client: + yield client diff --git a/services/clusters-keeper/.env-devel b/services/clusters-keeper/.env-devel new file mode 100644 index 00000000000..feebc7e1483 --- /dev/null +++ b/services/clusters-keeper/.env-devel @@ -0,0 +1,21 @@ +CLUSTERS_KEEPER_DEBUG=true +CLUSTERS_KEEPER_LOGLEVEL=INFO +CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION=60 
+CLUSTERS_KEEPER_TASK_INTERVAL=30 +EC2_ACCESS_KEY_ID=XXXXXXXXXX +EC2_INSTANCES_ALLOWED_TYPES="[\"t2.micro\"]" +EC2_INSTANCES_AMI_ID=XXXXXXXXXX +EC2_INSTANCES_KEY_NAME=XXXXXXXXXX +EC2_INSTANCES_SECURITY_GROUP_IDS=XXXXXXXXXX +EC2_INSTANCES_SUBNET_ID=XXXXXXXXXX +EC2_SECRET_ACCESS_KEY=XXXXXXXXXX +LOG_FORMAT_LOCAL_DEV_ENABLED=True +RABBIT_HOST=rabbit +RABBIT_PASSWORD=test +RABBIT_PORT=5672 +RABBIT_SECURE=false +RABBIT_USER=test +REDIS_HOST=redis +REDIS_PORT=6379 +SC_BOOT_MODE=debug-ptvsd +SC_BUILD_TARGET=development diff --git a/services/clusters-keeper/Makefile b/services/clusters-keeper/Makefile index 1f7f312696c..bfd707eb72e 100644 --- a/services/clusters-keeper/Makefile +++ b/services/clusters-keeper/Makefile @@ -3,3 +3,17 @@ # include ../../scripts/common.Makefile include ../../scripts/common-service.Makefile + +.env: .env-devel ## creates .env file from defaults in .env-devel + $(if $(wildcard $@), \ + @echo "WARNING ##### $< is newer than $@ ####"; diff -uN $@ $<; false;,\ + @echo "WARNING ##### $@ does not exist, cloning $< as $@ ############"; cp $< $@) + + +.PHONY: test-local +up-devel: .env ## starts local test application (running bare metal against AWS) + # setting up dependencies + @docker compose up + +down: .env ## stops local test app dependencies (running bare metal against AWS) + -@docker compose down diff --git a/services/clusters-keeper/docker-compose.yml b/services/clusters-keeper/docker-compose.yml new file mode 100644 index 00000000000..7ab1543d273 --- /dev/null +++ b/services/clusters-keeper/docker-compose.yml @@ -0,0 +1,51 @@ +version: "3.8" +services: + rabbit: + image: itisfoundation/rabbitmq:3.11.2-management + init: true + ports: + - "5672:5672" + - "15672:15672" + - "15692" + environment: + - RABBITMQ_DEFAULT_USER=${RABBIT_USER} + - RABBITMQ_DEFAULT_PASS=${RABBIT_PASSWORD} + healthcheck: + # see https://www.rabbitmq.com/monitoring.html#individual-checks for info about health-checks available in rabbitmq + test: rabbitmq-diagnostics -q 
status + interval: 5s + timeout: 30s + retries: 5 + start_period: 5s + + redis: + image: "redis:6.2.6@sha256:4bed291aa5efb9f0d77b76ff7d4ab71eee410962965d052552db1fb80576431d" + init: true + ports: + - "6379:6379" + healthcheck: + test: [ "CMD", "redis-cli", "ping" ] + interval: 5s + timeout: 30s + retries: 50 + + redis-commander: + image: rediscommander/redis-commander:latest + init: true + ports: + - "18081:8081" + environment: + - REDIS_HOSTS=resources:${REDIS_HOST}:${REDIS_PORT}:0,locks:${REDIS_HOST}:${REDIS_PORT}:1,validation_codes:${REDIS_HOST}:${REDIS_PORT}:2,scheduled_maintenance:${REDIS_HOST}:${REDIS_PORT}:3,user_notifications:${REDIS_HOST}:${REDIS_PORT}:4,announcements:${REDIS_HOST}:${REDIS_PORT}:5 + # If you add/remove a db, do not forget to update the --databases entry in the docker-compose.yml + + clusters-keeper: + image: local/clusters-keeper:development + init: true + ports: + - "8010:8000" + - "3015:3000" + env_file: + - .env + volumes: + - ./:/devel/services/clusters-keeper + - ../../packages:/devel/packages diff --git a/services/clusters-keeper/docker/entrypoint.sh b/services/clusters-keeper/docker/entrypoint.sh index 28ec2326d4e..761dce3b75b 100755 --- a/services/clusters-keeper/docker/entrypoint.sh +++ b/services/clusters-keeper/docker/entrypoint.sh @@ -65,7 +65,7 @@ fi if [ "${SC_BOOT_MODE}" = "debug-ptvsd" ]; then # NOTE: production does NOT pre-installs ptvsd - pip install --no-cache-dir ptvsd + pip install --no-cache-dir debugpy fi # Appends docker group if socket is mounted diff --git a/services/clusters-keeper/requirements/_base.in b/services/clusters-keeper/requirements/_base.in index 7578075b3a5..1a3c2005018 100644 --- a/services/clusters-keeper/requirements/_base.in +++ b/services/clusters-keeper/requirements/_base.in @@ -4,6 +4,7 @@ # NOTE: ALL version constraints MUST be commented --constraint ../../../requirements/constraints.txt --constraint ./constraints.txt +--constraint 
../../../services/dask-sidecar/requirements/_dask-distributed.txt # intra-repo required dependencies --requirement ../../../packages/models-library/requirements/_base.in @@ -13,7 +14,10 @@ --requirement ../../../packages/service-library/requirements/_fastapi.in + aioboto3 +dask[distributed] +dask-gateway fastapi packaging types-aiobotocore[ec2] diff --git a/services/clusters-keeper/requirements/_base.txt b/services/clusters-keeper/requirements/_base.txt index 50e3da9b5ab..0f9ca5b9268 100644 --- a/services/clusters-keeper/requirements/_base.txt +++ b/services/clusters-keeper/requirements/_base.txt @@ -4,22 +4,13 @@ # # pip-compile --output-file=requirements/_base.txt --strip-extras requirements/_base.in # -aio-pika==9.1.2 +aio-pika==9.2.2 # via - # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt - # -c requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt - # -c requirements/../../../packages/service-library/requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt - # -c requirements/../../../packages/service-library/requirements/../../../requirements/constraints.txt - # -c requirements/../../../packages/service-library/requirements/./../../../packages/models-library/requirements/../../../requirements/constraints.txt - # -c requirements/../../../packages/service-library/requirements/./../../../packages/settings-library/requirements/../../../requirements/constraints.txt - # -c requirements/../../../packages/service-library/requirements/./../../../requirements/constraints.txt # -c requirements/../../../packages/service-library/requirements/./_base.in - # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt - # -c requirements/../../../requirements/constraints.txt # -r requirements/../../../packages/service-library/requirements/_base.in 
-aioboto3==11.2.0 +aioboto3==11.3.0 # via -r requirements/_base.in -aiobotocore==2.5.0 +aiobotocore==2.6.0 # via aioboto3 aiodebug==2.3.0 # via @@ -29,7 +20,7 @@ aiodocker==0.21.0 # via # -c requirements/../../../packages/service-library/requirements/./_base.in # -r requirements/../../../packages/service-library/requirements/_base.in -aiofiles==23.1.0 +aiofiles==23.2.1 # via # -c requirements/../../../packages/service-library/requirements/./_base.in # -r requirements/../../../packages/service-library/requirements/_base.in @@ -46,6 +37,7 @@ aiohttp==3.8.5 # -c requirements/../../../requirements/constraints.txt # aiobotocore # aiodocker + # dask-gateway aioitertools==0.11.0 # via aiobotocore aiormq==6.7.7 @@ -63,7 +55,7 @@ arrow==1.2.3 # -r requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/_base.in # -r requirements/../../../packages/service-library/requirements/./../../../packages/models-library/requirements/_base.in # -r requirements/../../../packages/service-library/requirements/_base.in -async-timeout==4.0.2 +async-timeout==4.0.3 # via # aiohttp # redis @@ -72,14 +64,14 @@ attrs==23.1.0 # aiohttp # jsonschema # referencing -boto3==1.26.76 +boto3==1.28.17 # via aiobotocore -botocore==1.29.76 +botocore==1.31.17 # via # aiobotocore # boto3 # s3transfer -botocore-stubs==1.31.22 +botocore-stubs==1.31.36 # via types-aiobotocore certifi==2023.7.22 # via @@ -87,15 +79,37 @@ certifi==2023.7.22 # httpx charset-normalizer==3.2.0 # via aiohttp -click==8.1.6 +click==8.1.7 # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # dask + # dask-gateway + # distributed # typer # uvicorn -dnspython==2.4.1 +cloudpickle==2.2.1 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # dask + # distributed +dask==2023.3.2 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # -r requirements/_base.in + # 
dask-gateway + # distributed +dask-gateway==2023.1.1 + # via -r requirements/_base.in +distributed==2023.3.2 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # dask + # dask-gateway +dnspython==2.4.2 # via email-validator email-validator==2.0.0.post2 # via pydantic -exceptiongroup==1.1.2 +exceptiongroup==1.1.3 # via anyio fastapi==0.99.1 # via @@ -114,6 +128,10 @@ frozenlist==1.4.0 # via # aiohttp # aiosignal +fsspec==2023.6.0 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # dask h11==0.14.0 # via # httpcore @@ -138,6 +156,23 @@ idna==3.4 # email-validator # httpx # yarl +importlib-metadata==6.8.0 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # dask +jinja2==3.1.2 + # via + # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/./../../../packages/models-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/./../../../packages/settings-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../packages/service-library/requirements/./../../../requirements/constraints.txt + # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt + # -c requirements/../../../requirements/constraints.txt + # -c 
requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # distributed jmespath==1.0.1 # via # boto3 @@ -149,18 +184,48 @@ jsonschema==4.19.0 # -r requirements/../../../packages/service-library/requirements/./../../../packages/models-library/requirements/_base.in jsonschema-specifications==2023.7.1 # via jsonschema +locket==1.0.0 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # distributed + # partd markdown-it-py==3.0.0 # via rich +markupsafe==2.1.3 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # jinja2 mdurl==0.1.2 # via markdown-it-py +msgpack==1.0.5 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # distributed multidict==6.0.4 # via # aiohttp # yarl +orjson==3.9.5 + # via + # -r requirements/../../../packages/models-library/requirements/_base.in + # -r requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/_base.in + # -r requirements/../../../packages/service-library/requirements/./../../../packages/models-library/requirements/_base.in packaging==23.1 - # via -r requirements/_base.in + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # -r requirements/_base.in + # dask + # distributed pamqp==3.2.1 # via aiormq +partd==1.4.0 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # dask +psutil==5.9.5 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # distributed pydantic==1.10.12 # via # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt @@ -203,8 +268,12 @@ pyyaml==6.0.1 # -c requirements/../../../packages/service-library/requirements/./_base.in # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt # -c 
requirements/../../../requirements/constraints.txt + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt # -r requirements/../../../packages/service-library/requirements/_base.in -redis==4.6.0 + # dask + # dask-gateway + # distributed +redis==5.0.0 # via # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt @@ -228,11 +297,11 @@ rich==13.5.2 # -r requirements/../../../packages/service-library/requirements/../../../packages/settings-library/requirements/_base.in # -r requirements/../../../packages/service-library/requirements/./../../../packages/settings-library/requirements/_base.in # -r requirements/../../../packages/settings-library/requirements/_base.in -rpds-py==0.9.2 +rpds-py==0.10.0 # via # jsonschema # referencing -s3transfer==0.6.1 +s3transfer==0.6.2 # via boto3 six==1.16.0 # via python-dateutil @@ -241,6 +310,10 @@ sniffio==1.3.0 # anyio # httpcore # httpx +sortedcontainers==2.4.0 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # distributed starlette==0.27.0 # via # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt @@ -253,15 +326,28 @@ starlette==0.27.0 # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../requirements/constraints.txt # fastapi -tenacity==8.2.2 +tblib==2.0.0 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # distributed +tenacity==8.2.3 # via # -c requirements/../../../packages/service-library/requirements/./_base.in # -r requirements/../../../packages/service-library/requirements/_base.in toolz==0.12.0 # via # -c requirements/../../../packages/service-library/requirements/./_base.in + # -c 
requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt # -r requirements/../../../packages/service-library/requirements/_base.in -tqdm==4.65.2 + # dask + # distributed + # partd +tornado==6.3.3 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # dask-gateway + # distributed +tqdm==4.66.1 # via # -c requirements/../../../packages/service-library/requirements/./_base.in # -r requirements/../../../packages/service-library/requirements/_base.in @@ -270,11 +356,11 @@ typer==0.9.0 # -r requirements/../../../packages/service-library/requirements/../../../packages/settings-library/requirements/_base.in # -r requirements/../../../packages/service-library/requirements/./../../../packages/settings-library/requirements/_base.in # -r requirements/../../../packages/settings-library/requirements/_base.in -types-aiobotocore==2.5.4 +types-aiobotocore==2.6.0 # via -r requirements/_base.in -types-aiobotocore-ec2==2.5.4 +types-aiobotocore-ec2==2.6.0 # via types-aiobotocore -types-awscrt==0.17.0 +types-awscrt==0.19.1 # via botocore-stubs typing-extensions==4.7.1 # via @@ -295,7 +381,9 @@ urllib3==1.26.16 # -c requirements/../../../packages/service-library/requirements/./../../../requirements/constraints.txt # -c requirements/../../../packages/settings-library/requirements/../../../requirements/constraints.txt # -c requirements/../../../requirements/constraints.txt + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt # botocore + # distributed uvicorn==0.23.2 # via -r requirements/../../../packages/service-library/requirements/_fastapi.in wrapt==1.15.0 @@ -305,3 +393,11 @@ yarl==1.9.2 # aio-pika # aiohttp # aiormq +zict==3.0.0 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # distributed +zipp==3.16.2 + # via + # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt + # importlib-metadata diff --git 
a/services/clusters-keeper/requirements/_test.in b/services/clusters-keeper/requirements/_test.in index 1a52aedfb56..bc7d5261b17 100644 --- a/services/clusters-keeper/requirements/_test.in +++ b/services/clusters-keeper/requirements/_test.in @@ -14,12 +14,15 @@ aiodocker asgi-lifespan coverage +dask-gateway-server[local] +debugpy deepdiff docker faker fakeredis[lua] httpx moto[server] +parse psutil pytest pytest-asyncio diff --git a/services/clusters-keeper/requirements/_test.txt b/services/clusters-keeper/requirements/_test.txt index 6bf9e9848e0..a94e4abeb81 100644 --- a/services/clusters-keeper/requirements/_test.txt +++ b/services/clusters-keeper/requirements/_test.txt @@ -13,6 +13,7 @@ aiohttp==3.8.5 # -c requirements/../../../requirements/constraints.txt # -c requirements/_base.txt # aiodocker + # dask-gateway-server aiosignal==1.3.1 # via # -c requirements/_base.txt @@ -23,7 +24,7 @@ anyio==3.7.1 # httpcore asgi-lifespan==2.1.0 # via -r requirements/_test.in -async-timeout==4.0.2 +async-timeout==4.0.3 # via # -c requirements/_base.txt # aiohttp @@ -42,12 +43,12 @@ aws-xray-sdk==2.12.0 # via moto blinker==1.6.2 # via flask -boto3==1.26.76 +boto3==1.28.17 # via # -c requirements/_base.txt # aws-sam-translator # moto -botocore==1.29.76 +botocore==1.31.17 # via # -c requirements/_base.txt # aws-xray-sdk @@ -69,10 +70,12 @@ charset-normalizer==3.2.0 # -c requirements/_base.txt # aiohttp # requests -click==8.1.6 +click==8.1.7 # via # -c requirements/_base.txt # flask +colorlog==6.7.0 + # via dask-gateway-server coverage==7.3.0 # via # -r requirements/_test.in @@ -80,9 +83,14 @@ coverage==7.3.0 cryptography==41.0.3 # via # -c requirements/../../../requirements/constraints.txt + # dask-gateway-server # moto # python-jose # sshpubkeys +dask-gateway-server==2023.1.1 + # via -r requirements/_test.in +debugpy==1.6.7.post1 + # via -r requirements/_test.in deepdiff==6.3.1 # via -r requirements/_test.in docker==6.1.3 @@ -94,16 +102,16 @@ ecdsa==0.18.0 # moto # python-jose # 
sshpubkeys -exceptiongroup==1.1.2 +exceptiongroup==1.1.3 # via # -c requirements/_base.txt # anyio # pytest -faker==19.3.0 +faker==19.3.1 # via -r requirements/_test.in fakeredis==2.18.0 # via -r requirements/_test.in -flask==2.3.2 +flask==2.3.3 # via # flask-cors # moto @@ -116,6 +124,8 @@ frozenlist==1.4.0 # aiosignal graphql-core==3.2.3 # via moto +greenlet==2.0.2 + # via sqlalchemy h11==0.14.0 # via # -c requirements/_base.txt @@ -144,6 +154,7 @@ itsdangerous==2.1.2 jinja2==3.1.2 # via # -c requirements/../../../requirements/constraints.txt + # -c requirements/_base.txt # flask # moto jmespath==1.0.1 @@ -168,7 +179,7 @@ jsonschema==4.19.0 # cfn-lint # openapi-schema-validator # openapi-spec-validator -jsonschema-spec==0.2.3 +jsonschema-spec==0.2.4 # via openapi-spec-validator jsonschema-specifications==2023.7.1 # via @@ -183,9 +194,10 @@ lupa==2.0 # via fakeredis markupsafe==2.1.3 # via + # -c requirements/_base.txt # jinja2 # werkzeug -moto==4.1.14 +moto==4.2.0 # via -r requirements/_test.in mpmath==1.3.0 # via sympy @@ -207,16 +219,20 @@ packaging==23.1 # -c requirements/_base.txt # docker # pytest +parse==1.19.1 + # via -r requirements/_test.in pathable==0.4.3 # via jsonschema-spec pbr==5.11.1 # via # jschema-to-python # sarif-om -pluggy==1.2.0 +pluggy==1.3.0 # via pytest psutil==5.9.5 - # via -r requirements/_test.in + # via + # -c requirements/_base.txt + # -r requirements/_test.in py-partiql-parser==0.3.6 # via moto pyasn1==0.5.0 @@ -264,7 +280,7 @@ pyyaml==6.0.1 # jsonschema-spec # moto # responses -redis==4.6.0 +redis==5.0.0 # via # -c requirements/../../../requirements/constraints.txt # -c requirements/_base.txt @@ -289,7 +305,7 @@ respx==0.20.2 # via -r requirements/_test.in rfc3339-validator==0.1.4 # via openapi-schema-validator -rpds-py==0.9.2 +rpds-py==0.10.0 # via # -c requirements/_base.txt # jsonschema @@ -298,7 +314,7 @@ rsa==4.9 # via # -c requirements/../../../requirements/constraints.txt # python-jose -s3transfer==0.6.1 +s3transfer==0.6.2 # 
via # -c requirements/_base.txt # boto3 @@ -319,7 +335,13 @@ sniffio==1.3.0 # httpcore # httpx sortedcontainers==2.4.0 - # via fakeredis + # via + # -c requirements/_base.txt + # fakeredis +sqlalchemy==1.4.49 + # via + # -c requirements/../../../requirements/constraints.txt + # dask-gateway-server sshpubkeys==3.3.1 # via moto sympy==1.12 @@ -328,6 +350,8 @@ tomli==2.0.1 # via # coverage # pytest +traitlets==5.9.0 + # via dask-gateway-server types-pyyaml==6.0.12.11 # via responses typing-extensions==4.7.1 @@ -344,7 +368,7 @@ urllib3==1.26.16 # docker # requests # responses -websocket-client==1.6.1 +websocket-client==1.6.2 # via docker werkzeug==2.3.7 # via diff --git a/services/clusters-keeper/requirements/_tools.txt b/services/clusters-keeper/requirements/_tools.txt index 16c3643865e..9533f735341 100644 --- a/services/clusters-keeper/requirements/_tools.txt +++ b/services/clusters-keeper/requirements/_tools.txt @@ -14,7 +14,7 @@ bump2version==1.0.1 # via -r requirements/../../../requirements/devenv.txt cfgv==3.4.0 # via pre-commit -click==8.1.6 +click==8.1.7 # via # -c requirements/_base.txt # -c requirements/_test.txt @@ -24,9 +24,9 @@ dill==0.3.7 # via pylint distlib==0.3.7 # via virtualenv -filelock==3.12.2 +filelock==3.12.3 # via virtualenv -identify==2.5.26 +identify==2.5.27 # via pre-commit isort==5.12.0 # via @@ -70,7 +70,7 @@ pyyaml==6.0.1 # -c requirements/_test.txt # pre-commit # watchdog -ruff==0.0.284 +ruff==0.0.286 # via -r requirements/../../../requirements/devenv.txt tomli==2.0.1 # via @@ -87,11 +87,12 @@ typing-extensions==4.7.1 # -c requirements/_base.txt # -c requirements/_test.txt # astroid + # filelock virtualenv==20.24.3 # via pre-commit watchdog==3.0.0 # via -r requirements/_tools.in -wheel==0.41.1 +wheel==0.41.2 # via pip-tools wrapt==1.15.0 # via diff --git a/services/clusters-keeper/setup.cfg b/services/clusters-keeper/setup.cfg index c62d029726e..d9529927575 100644 --- a/services/clusters-keeper/setup.cfg +++ 
b/services/clusters-keeper/setup.cfg @@ -9,3 +9,5 @@ commit_args = --no-verify [tool:pytest] asyncio_mode = auto +markers = + testit: "marks test to run during development" diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/_meta.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/_meta.py index 6f8cfa1b8c0..dc34db59f90 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/_meta.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/_meta.py @@ -10,7 +10,7 @@ from pydantic import parse_obj_as _current_distribution = pkg_resources.get_distribution( - "simcore-service-clusters_keeper" + "simcore-service-clusters-keeper" ) __version__: str = _current_distribution.version @@ -20,6 +20,7 @@ API_VERSION: Final[str] = __version__ VERSION: Final[Version] = Version(__version__) API_VTAG: Final[VersionTag] = parse_obj_as(VersionTag, f"v{VERSION.major}") +RPC_VTAG: Final[VersionTag] = parse_obj_as(VersionTag, f"v{VERSION.major}") def get_summary() -> str: @@ -30,7 +31,7 @@ def get_summary() -> str: metadata = _current_distribution.get_metadata_lines("PKG-INFO") return next(x.split(":") for x in metadata if x.startswith("Summary:"))[-1] - return "" + return "" # pragma: no cover SUMMARY: Final[str] = get_summary() diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/api/health.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/api/health.py index 0e02140efd1..ad2882da3c8 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/api/health.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/api/health.py @@ -11,7 +11,7 @@ from fastapi.responses import PlainTextResponse from pydantic import BaseModel -from ..modules.rabbitmq import get_rabbitmq_client +from ..modules.rabbitmq import get_rabbitmq_client, is_rabbitmq_enabled from ..modules.redis import get_redis_client from .dependencies.application import get_app @@ -39,9 +39,9 @@ class 
_StatusGet(BaseModel): async def get_status(app: Annotated[FastAPI, Depends(get_app)]) -> _StatusGet: return _StatusGet( rabbitmq=_ComponentStatus( - is_enabled=bool(app.state.rabbitmq_client), + is_enabled=is_rabbitmq_enabled(app), is_responsive=await get_rabbitmq_client(app).ping() - if app.state.rabbitmq_client + if is_rabbitmq_enabled(app) else False, ), ec2=_ComponentStatus( diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/application.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/application.py index 565fc67f8cf..cb8490cdb6f 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/application.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/application.py @@ -1,6 +1,7 @@ import logging from fastapi import FastAPI +from models_library.basic_types import BootModeEnum from .._meta import ( API_VERSION, @@ -10,9 +11,12 @@ APP_STARTED_BANNER_MSG, ) from ..api.routes import setup_api_routes +from ..modules.clusters_management_task import setup as setup_clusters_management from ..modules.ec2 import setup as setup_ec2 from ..modules.rabbitmq import setup as setup_rabbitmq from ..modules.redis import setup as setup_redis +from ..modules.remote_debug import setup_remote_debugging +from ..rpc.rpc_routes import setup_rpc_routes from .settings import ApplicationSettings logger = logging.getLogger(__name__) @@ -35,10 +39,14 @@ def create_app(settings: ApplicationSettings) -> FastAPI: assert app.state.settings.API_VERSION == API_VERSION # nosec # PLUGINS SETUP + if settings.SC_BOOT_MODE == BootModeEnum.DEBUG: + setup_remote_debugging(app) setup_api_routes(app) setup_rabbitmq(app) + setup_rpc_routes(app) setup_ec2(app) setup_redis(app) + setup_clusters_management(app) # ERROR HANDLERS diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/errors.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/errors.py index 
da629420cbb..ece008e3f21 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/errors.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/errors.py @@ -2,7 +2,7 @@ class ClustersKeeperRuntimeError(PydanticErrorMixin, RuntimeError): - msg_template: str = "clusters_keeper unexpected error" + msg_template: str = "clusters-keeper unexpected error" class ConfigurationError(ClustersKeeperRuntimeError): diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py index 1cb9e025b22..877a171bde8 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py @@ -9,7 +9,14 @@ LogLevel, VersionTag, ) -from pydantic import Field, NonNegativeInt, PositiveInt, parse_obj_as, validator +from pydantic import ( + Field, + NonNegativeInt, + PositiveInt, + SecretStr, + parse_obj_as, + validator, +) from settings_library.base import BaseCustomSettings from settings_library.docker_registry import RegistrySettings from settings_library.rabbit import RabbitSettings @@ -67,17 +74,6 @@ class EC2InstancesSettings(BaseCustomSettings): "this is required to start a new EC2 instance", ) - EC2_INSTANCES_TIME_BEFORE_TERMINATION: datetime.timedelta = Field( - default=datetime.timedelta(minutes=1), - description="Time after which an EC2 instance may be terminated (repeat every hour, min 0, max 59 minutes)", - ) - - EC2_INSTANCES_MACHINES_BUFFER: NonNegativeInt = Field( - default=0, - description="Constant reserve of drained ready machines for fast(er) usage," - "disabled when set to 0. 
Uses 1st machine defined in EC2_INSTANCES_ALLOWED_TYPES", - ) - EC2_INSTANCES_MAX_START_TIME: datetime.timedelta = Field( default=datetime.timedelta(minutes=3), description="Usual time taken an EC2 instance with the given AMI takes to be in 'running' mode", @@ -88,15 +84,6 @@ class EC2InstancesSettings(BaseCustomSettings): description="script(s) to run on EC2 instance startup (be careful!), each entry is run one after the other using '&&' operator", ) - @validator("EC2_INSTANCES_TIME_BEFORE_TERMINATION") - @classmethod - def ensure_time_is_in_range(cls, value): - if value < datetime.timedelta(minutes=0): - value = datetime.timedelta(minutes=0) - elif value > datetime.timedelta(minutes=59): - value = datetime.timedelta(minutes=59) - return value - @validator("EC2_INSTANCES_ALLOWED_TYPES") @classmethod def check_valid_intance_names(cls, value): @@ -133,16 +120,15 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): # RUNTIME ----------------------------------------------------------- CLUSTERS_KEEPER_DEBUG: bool = Field( - default=False, description="Debug mode", env=["clusters_keeper_DEBUG", "DEBUG"] + default=False, description="Debug mode", env=["CLUSTERS_KEEPER_DEBUG", "DEBUG"] ) - CLUSTERS_KEEPER_LOGLEVEL: LogLevel = Field( - LogLevel.INFO, env=["clusters_keeper_LOGLEVEL", "LOG_LEVEL", "LOGLEVEL"] + LogLevel.INFO, env=["CLUSTERS_KEEPER_LOGLEVEL", "LOG_LEVEL", "LOGLEVEL"] ) CLUSTERS_KEEPER_LOG_FORMAT_LOCAL_DEV_ENABLED: bool = Field( default=False, env=[ - "clusters_keeper_LOG_FORMAT_LOCAL_DEV_ENABLED", + "CLUSTERS_KEEPER_LOG_FORMAT_LOCAL_DEV_ENABLED", "LOG_FORMAT_LOCAL_DEV_ENABLED", ], description="Enables local development log format. 
WARNING: make sure it is disabled if you want to have structured logs!", @@ -162,6 +148,31 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): auto_default_from_env=True ) + CLUSTERS_KEEPER_TASK_INTERVAL: datetime.timedelta = Field( + default=datetime.timedelta(seconds=60), + description="interval between each clusters clean check (default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)", + ) + + SERVICE_TRACKING_HEARTBEAT: datetime.timedelta = Field( + default=datetime.timedelta(seconds=60), + description="Service heartbeat interval (everytime a heartbeat is sent into RabbitMQ)", + ) + + CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION: NonNegativeInt = Field( + default=5, + description="Max number of missed heartbeats before a cluster is terminated", + ) + + CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG: str = Field( + default="master-github-latest", + description="defines the image tag to use for the computational backend", + ) + + CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_GATEWAY_PASSWORD: SecretStr = Field( + default=SecretStr("my_secure_P1ssword"), + description="very secure password, should change soon", + ) + @cached_property def LOG_LEVEL(self): # noqa: N802 return self.CLUSTERS_KEEPER_LOGLEVEL diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/models.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/models.py index c989fd14a7b..f3b4b71d3fc 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/models.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/models.py @@ -1,7 +1,13 @@ import datetime from dataclasses import dataclass +from enum import auto +from typing import TypeAlias -from pydantic import ByteSize, PositiveInt +from models_library.clusters import ClusterAuthentication, SimpleAuthentication +from models_library.users import UserID +from models_library.utils.enums import 
StrAutoEnum +from models_library.wallets import WalletID +from pydantic import AnyUrl, BaseModel, ByteSize, PositiveInt, SecretStr, parse_obj_as from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType @@ -13,6 +19,7 @@ class EC2InstanceType: InstancePrivateDNSName = str +EC2Tags: TypeAlias = dict[str, str] @dataclass(frozen=True) @@ -20,10 +27,52 @@ class EC2InstanceData: launch_time: datetime.datetime id: str # noqa: A003 aws_private_dns: InstancePrivateDNSName + aws_public_ip: str | None type: InstanceTypeType # noqa: A003 state: InstanceStateNameType + tags: EC2Tags -@dataclass(frozen=True) -class Cluster: - ... +class ClusterState(StrAutoEnum): + STARTED = auto() + RUNNING = auto() + STOPPED = auto() + + +def _convert_ec2_state_to_cluster_state( + ec2_state: InstanceStateNameType, +) -> ClusterState: + match ec2_state: + case "pending": + return ClusterState.STARTED # type: ignore + case "running": + return ClusterState.RUNNING # type: ignore + case _: + return ClusterState.STOPPED # type: ignore + + +class ClusterGet(BaseModel): + endpoint: AnyUrl + authentication: ClusterAuthentication + state: ClusterState + user_id: UserID + wallet_id: WalletID + gateway_ready: bool = False + + @classmethod + def from_ec2_instance_data( + cls, + instance: EC2InstanceData, + user_id: UserID, + wallet_id: WalletID, + gateway_password: SecretStr, + ) -> "ClusterGet": + return cls( + endpoint=parse_obj_as(AnyUrl, f"http://{instance.aws_public_ip}"), + authentication=SimpleAuthentication( + username=f"{user_id}", password=gateway_password + ), + state=_convert_ec2_state_to_cluster_state(instance.state), + user_id=user_id, + wallet_id=wallet_id, + ) diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters.py new file mode 100644 index 00000000000..176395fa7a1 --- /dev/null +++ 
b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters.py @@ -0,0 +1,93 @@ +import datetime +import logging +from typing import cast + +from fastapi import FastAPI +from models_library.users import UserID +from models_library.wallets import WalletID +from servicelib.logging_utils import log_context +from types_aiobotocore_ec2.literals import InstanceTypeType + +from ..core.errors import Ec2InstanceNotFoundError +from ..core.settings import get_application_settings +from ..models import EC2InstanceData +from ..utils.clusters import create_startup_script +from ..utils.ec2 import ( + HEARTBEAT_TAG_KEY, + all_created_ec2_instances_filter, + creation_ec2_tags, + ec2_instances_for_user_wallet_filter, +) +from .ec2 import get_ec2_client + +_logger = logging.getLogger(__name__) + + +async def create_cluster( + app: FastAPI, *, user_id: UserID, wallet_id: WalletID +) -> list[EC2InstanceData]: + ec2_client = get_ec2_client(app) + app_settings = get_application_settings(app) + assert app_settings.CLUSTERS_KEEPER_EC2_INSTANCES # nosec + return await ec2_client.start_aws_instance( + app_settings.CLUSTERS_KEEPER_EC2_INSTANCES, + instance_type=cast( + InstanceTypeType, + next( + iter( + app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES + ) + ), + ), + tags=creation_ec2_tags(app_settings, user_id=user_id, wallet_id=wallet_id), + startup_script=create_startup_script(app_settings), + number_of_instances=1, + ) + + +async def get_all_clusters(app: FastAPI) -> list[EC2InstanceData]: + app_settings = get_application_settings(app) + assert app_settings.CLUSTERS_KEEPER_EC2_INSTANCES # nosec + return await get_ec2_client(app).get_instances( + app_settings.CLUSTERS_KEEPER_EC2_INSTANCES, + tags=all_created_ec2_instances_filter(), + state_names=["running"], + ) + + +async def get_cluster( + app: FastAPI, *, user_id: UserID, wallet_id: WalletID +) -> EC2InstanceData: + app_settings = get_application_settings(app) + assert 
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES # nosec + if instances := await get_ec2_client(app).get_instances( + app_settings.CLUSTERS_KEEPER_EC2_INSTANCES, + tags=ec2_instances_for_user_wallet_filter(user_id, wallet_id), + ): + assert len(instances) == 1 # nosec + return instances[0] + raise Ec2InstanceNotFoundError + + +async def cluster_heartbeat( + app: FastAPI, *, user_id: UserID, wallet_id: WalletID +) -> None: + app_settings = get_application_settings(app) + assert app_settings.CLUSTERS_KEEPER_EC2_INSTANCES # nosec + instance = await get_cluster(app, user_id=user_id, wallet_id=wallet_id) + await set_instance_heartbeat(app, instance=instance) + + +async def set_instance_heartbeat(app: FastAPI, *, instance: EC2InstanceData) -> None: + with log_context( + _logger, logging.INFO, msg=f"set instance heartbeat for {instance.id}" + ): + ec2_client = get_ec2_client(app) + await ec2_client.set_instances_tags( + [instance], + tags={HEARTBEAT_TAG_KEY: f"{datetime.datetime.now(datetime.timezone.utc)}"}, + ) + + +async def delete_clusters(app: FastAPI, *, instances: list[EC2InstanceData]) -> None: + await get_ec2_client(app).terminate_instances(instances) diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_core.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_core.py new file mode 100644 index 00000000000..99b3d0dd74e --- /dev/null +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_core.py @@ -0,0 +1,79 @@ +import datetime +import logging + +import arrow +from fastapi import FastAPI + +from ..core.settings import get_application_settings +from ..models import EC2InstanceData +from ..modules.clusters import delete_clusters, get_all_clusters, set_instance_heartbeat +from ..utils.dask import get_gateway_authentication, get_gateway_url +from ..utils.ec2 import HEARTBEAT_TAG_KEY, get_user_id_from_tags +from .dask import is_gateway_busy, 
ping_gateway + +_logger = logging.getLogger(__name__) + + +def _get_instance_last_heartbeat(instance: EC2InstanceData) -> datetime.datetime: + if last_heartbeat := instance.tags.get(HEARTBEAT_TAG_KEY, None): + last_heartbeat_time: datetime.datetime = arrow.get(last_heartbeat).datetime + return last_heartbeat_time + return instance.launch_time + + +async def _find_terminateable_instances( + app: FastAPI, instances: list[EC2InstanceData] +) -> list[EC2InstanceData]: + app_settings = get_application_settings(app) + assert app_settings.CLUSTERS_KEEPER_EC2_INSTANCES # nosec + + # get the corresponding ec2 instance data + terminateable_instances: list[EC2InstanceData] = [] + + for instance in instances: + last_heartbeat = _get_instance_last_heartbeat(instance) + + elapsed_time_since_heartbeat = ( + datetime.datetime.now(datetime.timezone.utc) - last_heartbeat + ) + + if elapsed_time_since_heartbeat >= ( + app_settings.CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION + * app_settings.SERVICE_TRACKING_HEARTBEAT + ): + # let's terminate that one + terminateable_instances.append(instance) + + return terminateable_instances + + +async def check_clusters(app: FastAPI) -> None: + instances = await get_all_clusters(app) + app_settings = get_application_settings(app) + connected_intances = [ + instance + for instance in instances + if await ping_gateway( + url=get_gateway_url(instance), + password=app_settings.CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_GATEWAY_PASSWORD, + ) + ] + for instance in connected_intances: + is_busy = await is_gateway_busy( + url=get_gateway_url(instance), + gateway_auth=get_gateway_authentication( + user_id=get_user_id_from_tags(instance.tags), + password=app_settings.CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_GATEWAY_PASSWORD, + ), + ) + _logger.info( + "%s currently %s", + f"{instance.id=} for {instance.tags=}", + f"{'is running tasks' if is_busy else 'not doing anything!'}", + ) + if is_busy: + await set_instance_heartbeat(app, 
instance=instance) + if terminateable_instances := await _find_terminateable_instances( + app, connected_intances + ): + await delete_clusters(app, instances=terminateable_instances) diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py new file mode 100644 index 00000000000..8bff1bf1624 --- /dev/null +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py @@ -0,0 +1,57 @@ +import json +import logging +from collections.abc import Awaitable, Callable + +from fastapi import FastAPI +from servicelib.background_task import start_periodic_task, stop_periodic_task +from servicelib.redis_utils import exclusive + +from ..core.settings import ApplicationSettings +from ..modules.redis import get_redis_client +from .clusters_management_core import check_clusters + +_TASK_NAME = "Clusters-keeper EC2 instances management" + +logger = logging.getLogger(__name__) + + +def on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]: + async def _startup() -> None: + app_settings: ApplicationSettings = app.state.settings + + lock_key = f"{app.title}:clusters-management_lock" + lock_value = json.dumps({}) + app.state.clusters_cleaning_task = start_periodic_task( + exclusive(get_redis_client(app), lock_key=lock_key, lock_value=lock_value)( + check_clusters + ), + interval=app_settings.CLUSTERS_KEEPER_TASK_INTERVAL, + task_name=_TASK_NAME, + app=app, + ) + + return _startup + + +def on_app_shutdown(app: FastAPI) -> Callable[[], Awaitable[None]]: + async def _stop() -> None: + await stop_periodic_task(app.state.clusters_cleaning_task) + + return _stop + + +def setup(app: FastAPI): + app_settings: ApplicationSettings = app.state.settings + if any( + s is None + for s in [ + app_settings.CLUSTERS_KEEPER_EC2_ACCESS, + app_settings.CLUSTERS_KEEPER_EC2_INSTANCES, + ] + ): + logger.warning( 
+ "the clusters management background task is disabled by settings, nothing will happen!" + ) + return + app.add_event_handler("startup", on_app_startup(app)) + app.add_event_handler("shutdown", on_app_shutdown(app)) diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/dask.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/dask.py new file mode 100644 index 00000000000..bcb483b61f3 --- /dev/null +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/dask.py @@ -0,0 +1,74 @@ +import asyncio +import logging +from typing import Any, Coroutine, Final + +import dask_gateway +from aiohttp.client_exceptions import ClientError +from models_library.clusters import SimpleAuthentication +from pydantic import AnyUrl, SecretStr + +_logger = logging.getLogger(__name__) + +_PING_USERNAME: Final[str] = "osparc-cluster" + + +async def ping_gateway(*, url: AnyUrl, password: SecretStr) -> bool: + basic_auth = dask_gateway.BasicAuth( + username=_PING_USERNAME, password=password.get_secret_value() + ) + try: + async with dask_gateway.Gateway( + address=f"{url}", + auth=basic_auth, + asynchronous=True, + ) as gateway: + cluster_reports = await asyncio.wait_for(gateway.list_clusters(), timeout=5) + _logger.info("found %s clusters", len(cluster_reports)) + return True + except asyncio.TimeoutError: + _logger.debug("gateway ping timed-out, it is still starting...") + except ClientError: + # this could happen if the gateway is not properly started, but it should not last + # unless the wrong password is used. + _logger.info("dask-gateway is not reachable", exc_info=True) + + return False + + +async def _wrap_client_async_routine( + client_coroutine: Coroutine[Any, Any, Any] | Any | None +) -> Any: + """Dask async behavior does not go well with Pylance as it returns + a union of types. 
this wrapper makes both mypy and pylance happy""" + assert client_coroutine # nosec + return await client_coroutine + + +async def is_gateway_busy(*, url: AnyUrl, gateway_auth: SimpleAuthentication) -> bool: + basic_auth = dask_gateway.BasicAuth( + username=gateway_auth.username, + password=gateway_auth.password.get_secret_value(), + ) + async with dask_gateway.Gateway( + address=f"{url}", + auth=basic_auth, + asynchronous=True, + ) as gateway: + cluster_reports = await asyncio.wait_for(gateway.list_clusters(), timeout=5) + if not cluster_reports: + _logger.info("no cluster in gateway, nothing going on") + return False + assert len(cluster_reports) == 1 # nosec + async with gateway.connect( + cluster_reports[0].name, shutdown_on_close=False + ) as dask_cluster, dask_cluster.get_client() as client: + datasets_on_scheduler = await _wrap_client_async_routine( + client.list_datasets() + ) + _logger.info( + "cluster currently has %s datasets, it is %s", + len(datasets_on_scheduler), + "BUSY" if len(datasets_on_scheduler) > 0 else "NOT BUSY", + ) + currently_processing = await _wrap_client_async_routine(client.processing()) + return bool(datasets_on_scheduler or currently_processing) diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/ec2.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/ec2.py index 48db03f39ea..3b0ab4c96dc 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/ec2.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/ec2.py @@ -24,7 +24,7 @@ Ec2TooManyInstancesError, ) from ..core.settings import EC2InstancesSettings, EC2Settings, get_application_settings -from ..models import EC2InstanceData, EC2InstanceType +from ..models import EC2InstanceData, EC2InstanceType, EC2Tags from ..utils.ec2 import compose_user_data logger = logging.getLogger(__name__) @@ -89,7 +89,7 @@ async def start_aws_instance( self, instance_settings: EC2InstancesSettings, 
instance_type: InstanceTypeType, - tags: dict[str, str], + tags: EC2Tags, startup_script: str, number_of_instances: int, ) -> list[EC2InstanceData]: @@ -99,7 +99,7 @@ async def start_aws_instance( msg=f"launching {number_of_instances} AWS instance(s) {instance_type} with {tags=}", ): # first check the max amount is not already reached - current_instances = await self.get_instances(instance_settings, tags) + current_instances = await self.get_instances(instance_settings, tags=tags) if ( len(current_instances) + number_of_instances > instance_settings.EC2_INSTANCES_MAX_INSTANCES @@ -115,7 +115,6 @@ async def start_aws_instance( InstanceType=instance_type, InstanceInitiatedShutdownBehavior="terminate", KeyName=instance_settings.EC2_INSTANCES_KEY_NAME, - SubnetId=instance_settings.EC2_INSTANCES_SUBNET_ID, TagSpecifications=[ { "ResourceType": "instance", @@ -126,7 +125,14 @@ async def start_aws_instance( } ], UserData=compose_user_data(startup_script), - SecurityGroupIds=instance_settings.EC2_INSTANCES_SECURITY_GROUP_IDS, + NetworkInterfaces=[ + { + "AssociatePublicIpAddress": True, + "DeviceIndex": 0, + "SubnetId": instance_settings.EC2_INSTANCES_SUBNET_ID, + "Groups": instance_settings.EC2_INSTANCES_SECURITY_GROUP_IDS, + } + ], ) instance_ids = [i["InstanceId"] for i in instances["Instances"]] logger.info( @@ -147,8 +153,12 @@ async def start_aws_instance( launch_time=instance["LaunchTime"], id=instance["InstanceId"], aws_private_dns=instance["PrivateDnsName"], + aws_public_ip=instance["PublicIpAddress"] + if "PublicIpAddress" in instance + else None, type=instance["InstanceType"], state=instance["State"]["Name"], + tags={tag["Key"]: tag["Value"] for tag in instance["Tags"]}, ) for instance in instances["Reservations"][0]["Instances"] ] @@ -161,8 +171,8 @@ async def start_aws_instance( async def get_instances( self, instance_settings: EC2InstancesSettings, - tags: dict[str, str], *, + tags: EC2Tags, state_names: list[InstanceStateNameType] | None = None, ) -> 
list[EC2InstanceData]: # NOTE: be careful: Name=instance-state-name,Values=["pending", "running"] means pending OR running @@ -192,30 +202,62 @@ async def get_instances( assert "InstanceType" in instance # nosec assert "State" in instance # nosec assert "Name" in instance["State"] # nosec + assert "Tags" in instance # nosec all_instances.append( EC2InstanceData( launch_time=instance["LaunchTime"], id=instance["InstanceId"], aws_private_dns=instance["PrivateDnsName"], + aws_public_ip=instance["PublicIpAddress"] + if "PublicIpAddress" in instance + else None, type=instance["InstanceType"], state=instance["State"]["Name"], + tags={ + tag["Key"]: tag["Value"] + for tag in instance["Tags"] + if all(k in tag for k in ["Key", "Value"]) + }, ) ) - logger.debug("received: %s", f"{all_instances=}") + logger.debug( + "received: %s instances with %s", f"{len(all_instances)}", f"{state_names=}" + ) return all_instances async def terminate_instances(self, instance_datas: list[EC2InstanceData]) -> None: try: - await self.client.terminate_instances( - InstanceIds=[i.id for i in instance_datas] - ) + with log_context( + logger, + logging.INFO, + msg=f"terminating instances {[i.id for i in instance_datas]}", + ): + await self.client.terminate_instances( + InstanceIds=[i.id for i in instance_datas] + ) except botocore.exceptions.ClientError as exc: if ( exc.response.get("Error", {}).get("Code", "") == "InvalidInstanceID.NotFound" ): raise Ec2InstanceNotFoundError from exc - raise + raise # pragma: no cover + + async def set_instances_tags( + self, instances: list[EC2InstanceData], *, tags: EC2Tags + ) -> None: + with log_context( + logger, + logging.DEBUG, + msg=f"setting {tags=} on instances '[{[i.id for i in instances]}]'", + ): + await self.client.create_tags( + Resources=[i.id for i in instances], + Tags=[ + {"Key": tag_key, "Value": tag_value} + for tag_key, tag_value in tags.items() + ], + ) def setup(app: FastAPI) -> None: @@ -241,7 +283,7 @@ async def on_startup() -> None: 
with attempt: connected = await client.ping() if not connected: - raise Ec2NotConnectedError + raise Ec2NotConnectedError # pragma: no cover async def on_shutdown() -> None: if app.state.ec2_client: diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/rabbitmq.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/rabbitmq.py index 20b83e5d55d..e1e4feb9b07 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/rabbitmq.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/rabbitmq.py @@ -1,11 +1,17 @@ import contextlib import logging -from typing import cast +from typing import Final, cast from fastapi import FastAPI from models_library.rabbitmq_messages import RabbitMessageBase +from pydantic import parse_obj_as from servicelib.logging_utils import log_catch -from servicelib.rabbitmq import RabbitMQClient, wait_till_rabbitmq_responsive +from servicelib.rabbitmq import ( + RabbitMQClient, + RabbitMQRPCClient, + RPCNamespace, + wait_till_rabbitmq_responsive, +) from settings_library.rabbit import RabbitSettings from ..core.errors import ConfigurationError @@ -13,10 +19,15 @@ logger = logging.getLogger(__name__) +CLUSTERS_KEEPER_RPC_NAMESPACE: Final[RPCNamespace] = parse_obj_as( + RPCNamespace, "clusters-keeper" +) + def setup(app: FastAPI) -> None: async def on_startup() -> None: app.state.rabbitmq_client = None + app.state.rabbitmq_rpc_server = None settings: RabbitSettings | None = get_application_settings( app ).CLUSTERS_KEEPER_RABBITMQ @@ -24,13 +35,19 @@ async def on_startup() -> None: logger.warning("Rabbit MQ client is de-activated in the settings") return await wait_till_rabbitmq_responsive(settings.dsn) + # create the clients app.state.rabbitmq_client = RabbitMQClient( client_name="clusters_keeper", settings=settings ) + app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create( + client_name="clusters_keeper_rpc_server", settings=settings + ) async def 
on_shutdown() -> None: if app.state.rabbitmq_client: await app.state.rabbitmq_client.close() + if app.state.rabbitmq_rpc_server: + await app.state.rabbitmq_rpc_server.close() app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) @@ -44,6 +61,18 @@ def get_rabbitmq_client(app: FastAPI) -> RabbitMQClient: return cast(RabbitMQClient, app.state.rabbitmq_client) +def is_rabbitmq_enabled(app: FastAPI) -> bool: + return app.state.rabbitmq_client is not None + + +def get_rabbitmq_rpc_client(app: FastAPI) -> RabbitMQRPCClient: + if not app.state.rabbitmq_rpc_server: + raise ConfigurationError( + msg="RabbitMQ client for RPC is not available. Please check the configuration." + ) + return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_server) + + async def post_message(app: FastAPI, message: RabbitMessageBase) -> None: with log_catch(logger, reraise=False), contextlib.suppress(ConfigurationError): # NOTE: if rabbitmq was not initialized the error does not need to flood the logs diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/remote_debug.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/remote_debug.py new file mode 100644 index 00000000000..318f7a11a02 --- /dev/null +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/remote_debug.py @@ -0,0 +1,26 @@ +""" Setup remote debugger with debugpy - a debugger for Python + https://github.com/microsoft/debugpy + +""" +import logging + +from fastapi import FastAPI + +_logger = logging.getLogger(__name__) +_REMOTE_DEBUGGING_PORT = 3000 + + +def setup_remote_debugging(app: FastAPI) -> None: + def on_startup() -> None: + try: + _logger.info("Attaching debugpy on %s...", _REMOTE_DEBUGGING_PORT) + + import debugpy + + debugpy.listen(("0.0.0.0", _REMOTE_DEBUGGING_PORT)) # nosec # noqa: S104 + + except ImportError as err: # pragma: no cover + msg = "Cannot enable remote debugging. 
Please install debugpy first" + raise RuntimeError(msg) from err + + app.add_event_handler("startup", on_startup) diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/__init__.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/clusters.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/clusters.py new file mode 100644 index 00000000000..5220d50a4ef --- /dev/null +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/clusters.py @@ -0,0 +1,53 @@ +from fastapi import FastAPI +from models_library.users import UserID +from models_library.wallets import WalletID +from servicelib.rabbitmq import RPCRouter + +from ..core.errors import Ec2InstanceNotFoundError +from ..core.settings import get_application_settings +from ..models import ClusterGet, EC2InstanceData +from ..modules import clusters +from ..modules.dask import ping_gateway +from ..utils.dask import get_gateway_url + +router = RPCRouter() + + +@router.expose() +async def get_or_create_cluster( + app: FastAPI, *, user_id: UserID, wallet_id: WalletID +) -> ClusterGet: + """Get or create cluster for user_id and wallet_id + This function will create a new instance on AWS if needed or return the already running one. + It will also check that the underlying computational backend is up and running. + Calling several time will always return the same cluster. 
+ """ + ec2_instance: EC2InstanceData | None = None + try: + ec2_instance = await clusters.get_cluster( + app, user_id=user_id, wallet_id=wallet_id + ) + await clusters.cluster_heartbeat(app, user_id=user_id, wallet_id=wallet_id) + except Ec2InstanceNotFoundError: + new_ec2_instances = await clusters.create_cluster( + app, user_id=user_id, wallet_id=wallet_id + ) + assert new_ec2_instances # nosec + assert len(new_ec2_instances) == 1 # nosec + ec2_instance = new_ec2_instances[0] + assert ec2_instance is not None # nosec + app_settings = get_application_settings(app) + cluster_get = ClusterGet.from_ec2_instance_data( + ec2_instance, + user_id, + wallet_id, + app_settings.CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_GATEWAY_PASSWORD, + ) + + if ec2_instance.state == "running": + cluster_get.gateway_ready = await ping_gateway( + url=get_gateway_url(ec2_instance), + password=app_settings.CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_GATEWAY_PASSWORD, + ) + + return cluster_get diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/rpc_routes.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/rpc_routes.py new file mode 100644 index 00000000000..2b20838ba0e --- /dev/null +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/rpc/rpc_routes.py @@ -0,0 +1,33 @@ +from collections.abc import Awaitable, Callable + +from fastapi import FastAPI + +from ..modules.rabbitmq import ( + CLUSTERS_KEEPER_RPC_NAMESPACE, + get_rabbitmq_rpc_client, + is_rabbitmq_enabled, +) +from .clusters import router as clusters_router + + +def on_app_startup(app: FastAPI) -> Callable[[], Awaitable[None]]: + async def _start() -> None: + if is_rabbitmq_enabled(app): + rpc_client = get_rabbitmq_rpc_client(app) + await rpc_client.register_router( + clusters_router, CLUSTERS_KEEPER_RPC_NAMESPACE, app + ) + + return _start + + +def on_app_shutdown(app: FastAPI) -> Callable[[], Awaitable[None]]: + async def _stop() -> None: + assert app # nosec + + return _stop + + 
+def setup_rpc_routes(app: FastAPI) -> None: + app.add_event_handler("startup", on_app_startup(app)) + app.add_event_handler("shutdown", on_app_shutdown(app)) diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py new file mode 100644 index 00000000000..856f8574377 --- /dev/null +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py @@ -0,0 +1,13 @@ +from ..core.settings import ApplicationSettings + + +def create_startup_script(app_settings: ApplicationSettings) -> str: + return "\n".join( + [ + "git clone --depth=1 https://github.com/ITISFoundation/osparc-simcore.git", + "cd osparc-simcore/services/osparc-gateway-server", + "make config", + f"echo 'c.Authenticator.password = \"{app_settings.CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_GATEWAY_PASSWORD.get_secret_value()}\"' >> .osparc-dask-gateway-config.py", + f"DOCKER_IMAGE_TAG={app_settings.CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG} make up", + ] + ) diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/dask.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/dask.py new file mode 100644 index 00000000000..41ebc081fd3 --- /dev/null +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/dask.py @@ -0,0 +1,16 @@ +from models_library.clusters import SimpleAuthentication +from models_library.users import UserID +from pydantic import AnyUrl, SecretStr, parse_obj_as + +from ..models import EC2InstanceData + + +def get_gateway_url(ec2_instance: EC2InstanceData) -> AnyUrl: + url: AnyUrl = parse_obj_as(AnyUrl, f"http://{ec2_instance.aws_public_ip}:8000") + return url + + +def get_gateway_authentication( + user_id: UserID, password: SecretStr +) -> SimpleAuthentication: + return SimpleAuthentication(username=f"{user_id}", password=password) diff --git 
a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/ec2.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/ec2.py index 2514e1d0cc1..54b7beba7d9 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/ec2.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/ec2.py @@ -1,10 +1,58 @@ from textwrap import dedent +from typing import Final + +from models_library.users import UserID +from models_library.wallets import WalletID + +from .._meta import VERSION +from ..core.settings import ApplicationSettings +from ..models import EC2Tags + +_APPLICATION_TAG_KEY_NAME: Final[str] = "io.simcore.clusters-keeper.version" +_DEFAULT_CLUSTERS_KEEPER_TAGS: Final[dict[str, str]] = { + _APPLICATION_TAG_KEY_NAME: f"{VERSION}" +} + +HEARTBEAT_TAG_KEY: Final[str] = "last_heartbeat" + + +def creation_ec2_tags( + app_settings: ApplicationSettings, *, user_id: UserID, wallet_id: WalletID +) -> EC2Tags: + assert app_settings.CLUSTERS_KEEPER_EC2_INSTANCES # nosec + return _DEFAULT_CLUSTERS_KEEPER_TAGS | { + # NOTE: this one gets special treatment in AWS GUI and is applied to the name of the instance + "Name": f"osparc-gateway-server-{app_settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME}-user_id:{user_id}-wallet_id:{wallet_id}", + "user_id": f"{user_id}", + "wallet_id": f"{wallet_id}", + } + + +def all_created_ec2_instances_filter() -> EC2Tags: + return _DEFAULT_CLUSTERS_KEEPER_TAGS + + +def get_user_id_from_tags(tags: EC2Tags) -> UserID: + assert "user_id" in tags # nosec + return UserID(tags["user_id"]) + + +def ec2_instances_for_user_wallet_filter( + user_id: UserID, wallet_id: WalletID +) -> EC2Tags: + return ( + _DEFAULT_CLUSTERS_KEEPER_TAGS + | {"user_id": f"{user_id}"} + | {"wallet_id": f"{wallet_id}"} + ) def compose_user_data(bash_command: str) -> str: return dedent( f"""\ #!/bin/bash +echo "started user data bash script" {bash_command} +echo "completed user data bash script" """ ) diff 
--git a/services/clusters-keeper/tests/unit/conftest.py b/services/clusters-keeper/tests/unit/conftest.py index 4c599918188..d59dd53d689 100644 --- a/services/clusters-keeper/tests/unit/conftest.py +++ b/services/clusters-keeper/tests/unit/conftest.py @@ -10,7 +10,6 @@ import aiodocker import httpx -import psutil import pytest import requests import simcore_service_clusters_keeper @@ -20,7 +19,6 @@ from fakeredis.aioredis import FakeRedis from fastapi import FastAPI from moto.server import ThreadedMotoServer -from pydantic import ByteSize from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.utils_docker import get_localhost_ip from pytest_simcore.helpers.utils_envs import EnvVarsDict, setenvs_from_dict @@ -38,6 +36,7 @@ from types_aiobotocore_ec2.literals import InstanceTypeType pytest_plugins = [ + "pytest_simcore.dask_gateway", "pytest_simcore.docker_compose", "pytest_simcore.docker_swarm", "pytest_simcore.environment_configs", @@ -94,6 +93,26 @@ def app_environment( return mock_env_devel_environment | envs +@pytest.fixture +def disable_clusters_management_background_task( + mocker: MockerFixture, +) -> Iterator[None]: + start_background_task = mocker.patch( + "simcore_service_clusters_keeper.modules.clusters_management_task.start_periodic_task", + autospec=True, + ) + + stop_background_task = mocker.patch( + "simcore_service_clusters_keeper.modules.clusters_management_task.stop_periodic_task", + autospec=True, + ) + + yield + + start_background_task.assert_called_once() + stop_background_task.assert_called_once() + + @pytest.fixture def disabled_rabbitmq(app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch): monkeypatch.delenv("RABBIT_HOST") @@ -323,21 +342,6 @@ async def ec2_client( return clusters_keeper_ec2.client -@pytest.fixture -def host_cpu_count() -> int: - return psutil.cpu_count() - - -@pytest.fixture -def host_memory_total() -> ByteSize: - return ByteSize(psutil.virtual_memory().total) - - -@pytest.fixture -def 
aws_instance_private_dns() -> str: - return "ip-10-23-40-12.ec2.internal" - - @pytest.fixture def fake_ec2_instance_data(faker: Faker) -> Callable[..., EC2InstanceData]: def _creator(**overrides) -> EC2InstanceData: @@ -347,8 +351,10 @@ def _creator(**overrides) -> EC2InstanceData: "launch_time": faker.date_time(tzinfo=timezone.utc), "id": faker.uuid4(), "aws_private_dns": faker.name(), + "aws_public_ip": faker.ipv4_public(), "type": faker.pystr(), "state": faker.pystr(), + "tags": faker.pydict(allowed_types=(str,)), } | overrides ) diff --git a/services/clusters-keeper/tests/unit/test_core_settings.py b/services/clusters-keeper/tests/unit/test_core_settings.py index 380d5a04d16..b5283b1ad7d 100644 --- a/services/clusters-keeper/tests/unit/test_core_settings.py +++ b/services/clusters-keeper/tests/unit/test_core_settings.py @@ -2,9 +2,7 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable -import datetime -import pytest from pytest_simcore.helpers.utils_envs import EnvVarsDict from simcore_service_clusters_keeper.core.settings import ApplicationSettings @@ -15,24 +13,3 @@ def test_settings(app_environment: EnvVarsDict): assert settings.CLUSTERS_KEEPER_EC2_INSTANCES assert settings.CLUSTERS_KEEPER_RABBITMQ assert settings.CLUSTERS_KEEPER_REDIS - - -def test_invalid_EC2_INSTANCES_TIME_BEFORE_TERMINATION( # noqa: N802 - app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch -): - monkeypatch.setenv("EC2_INSTANCES_TIME_BEFORE_TERMINATION", "1:05:00") - settings = ApplicationSettings.create_from_envs() - assert settings.CLUSTERS_KEEPER_EC2_INSTANCES - assert settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION - assert ( - settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION # noqa: SIM300 - == datetime.timedelta(minutes=59) - ) - - monkeypatch.setenv("EC2_INSTANCES_TIME_BEFORE_TERMINATION", "-1:05:00") - settings = ApplicationSettings.create_from_envs() - assert 
settings.CLUSTERS_KEEPER_EC2_INSTANCES - assert ( - settings.CLUSTERS_KEEPER_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION # noqa: SIM300 - == datetime.timedelta(minutes=0) - ) diff --git a/services/clusters-keeper/tests/unit/test_models.py b/services/clusters-keeper/tests/unit/test_models.py index adbdebd0f8a..2a2f833a884 100644 --- a/services/clusters-keeper/tests/unit/test_models.py +++ b/services/clusters-keeper/tests/unit/test_models.py @@ -1,3 +1,43 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument # pylint: disable=unused-variable + +from typing import Callable + +import pytest +from faker import Faker +from pydantic import SecretStr +from simcore_service_clusters_keeper.models import ( + ClusterGet, + ClusterState, + EC2InstanceData, + _convert_ec2_state_to_cluster_state, +) +from types_aiobotocore_ec2.literals import InstanceStateNameType + + +@pytest.mark.parametrize( + "ec2_state, expected_cluster_state", + [ + ("pending", ClusterState.STARTED), + ("running", ClusterState.RUNNING), + ("shutting-down", ClusterState.STOPPED), + ("stopped", ClusterState.STOPPED), + ("stopping", ClusterState.STOPPED), + ("terminated", ClusterState.STOPPED), + ], +) +def test__convert_ec2_state_to_cluster_state( + ec2_state: InstanceStateNameType, expected_cluster_state: ClusterState +): + assert _convert_ec2_state_to_cluster_state(ec2_state) is expected_cluster_state + + +def test_from_ec2_instance_data( + fake_ec2_instance_data: Callable[..., EC2InstanceData], faker: Faker +): + instance_data = fake_ec2_instance_data() + cluster_get_instance = ClusterGet.from_ec2_instance_data( + instance_data, faker.pyint(), faker.pyint(), SecretStr(faker.password()) + ) + assert cluster_get_instance diff --git a/services/clusters-keeper/tests/unit/test_modules_clusters.py b/services/clusters-keeper/tests/unit/test_modules_clusters.py new file mode 100644 index 00000000000..659100db13b --- /dev/null +++ 
b/services/clusters-keeper/tests/unit/test_modules_clusters.py @@ -0,0 +1,186 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable + + +import asyncio +import datetime + +import arrow +import pytest +from faker import Faker +from fastapi import FastAPI +from models_library.users import UserID +from models_library.wallets import WalletID +from parse import Result, search +from simcore_service_clusters_keeper.core.errors import Ec2InstanceNotFoundError +from simcore_service_clusters_keeper.models import EC2InstanceData +from simcore_service_clusters_keeper.modules.clusters import ( + cluster_heartbeat, + create_cluster, + delete_clusters, +) +from simcore_service_clusters_keeper.utils.ec2 import HEARTBEAT_TAG_KEY +from types_aiobotocore_ec2 import EC2Client + + +@pytest.fixture +def user_id(faker: Faker) -> UserID: + return faker.pyint(min_value=1) + + +@pytest.fixture +def wallet_id(faker: Faker) -> WalletID: + return faker.pyint(min_value=1) + + +@pytest.fixture +def _base_configuration( + docker_swarm: None, + disabled_rabbitmq: None, + aws_subnet_id: str, + aws_security_group_id: str, + aws_ami_id: str, + aws_allowed_ec2_instance_type_names: list[str], + mocked_redis_server: None, +) -> None: + ... 
+ + +async def _assert_cluster_instance_created( + ec2_client: EC2Client, + user_id: UserID, + wallet_id: WalletID, +) -> None: + instances = await ec2_client.describe_instances() + assert len(instances["Reservations"]) == 1 + assert "Instances" in instances["Reservations"][0] + assert len(instances["Reservations"][0]["Instances"]) == 1 + assert "Tags" in instances["Reservations"][0]["Instances"][0] + instance_ec2_tags = instances["Reservations"][0]["Instances"][0]["Tags"] + assert len(instance_ec2_tags) == 4 + assert all("Key" in x for x in instance_ec2_tags) + assert all("Value" in x for x in instance_ec2_tags) + + assert "Key" in instances["Reservations"][0]["Instances"][0]["Tags"][0] + assert ( + instances["Reservations"][0]["Instances"][0]["Tags"][0]["Key"] + == "io.simcore.clusters-keeper.version" + ) + assert "Key" in instances["Reservations"][0]["Instances"][0]["Tags"][1] + assert instances["Reservations"][0]["Instances"][0]["Tags"][1]["Key"] == "Name" + assert "Value" in instances["Reservations"][0]["Instances"][0]["Tags"][1] + instance_name = instances["Reservations"][0]["Instances"][0]["Tags"][1]["Value"] + + parse_result = search("user_id:{user_id:d}-wallet_id:{wallet_id:d}", instance_name) + assert isinstance(parse_result, Result) + assert parse_result["user_id"] == user_id + assert parse_result["wallet_id"] == wallet_id + + +async def _create_cluster( + app: FastAPI, + ec2_client: EC2Client, + user_id: UserID, + wallet_id: WalletID, +) -> list[EC2InstanceData]: + created_clusters = await create_cluster(app, user_id=user_id, wallet_id=wallet_id) + assert len(created_clusters) == 1 + # check we do have a new machine in AWS + await _assert_cluster_instance_created(ec2_client, user_id, wallet_id) + return created_clusters + + +async def test_create_cluster( + _base_configuration: None, + ec2_client: EC2Client, + user_id: UserID, + wallet_id: WalletID, + initialized_app: FastAPI, +): + await _create_cluster(initialized_app, ec2_client, user_id, wallet_id) 
+ + +async def _assert_cluster_heartbeat_on_instance( + ec2_client: EC2Client, +) -> datetime.datetime: + instances = await ec2_client.describe_instances() + assert len(instances["Reservations"]) == 1 + assert "Instances" in instances["Reservations"][0] + assert len(instances["Reservations"][0]["Instances"]) == 1 + assert "Tags" in instances["Reservations"][0]["Instances"][0] + instance_tags = instances["Reservations"][0]["Instances"][0]["Tags"] + assert len(instance_tags) == 5 + assert all("Key" in x for x in instance_tags) + list_of_heartbeats = list( + filter(lambda x: x["Key"] == HEARTBEAT_TAG_KEY, instance_tags) # type:ignore + ) + assert len(list_of_heartbeats) == 1 + assert "Value" in list_of_heartbeats[0] + this_heartbeat_time = arrow.get(list_of_heartbeats[0]["Value"]).datetime + assert this_heartbeat_time + return this_heartbeat_time + + +async def test_cluster_heartbeat( + _base_configuration: None, + ec2_client: EC2Client, + user_id: UserID, + wallet_id: WalletID, + initialized_app: FastAPI, +): + await _create_cluster(initialized_app, ec2_client, user_id, wallet_id) + + await cluster_heartbeat(initialized_app, user_id=user_id, wallet_id=wallet_id) + first_heartbeat_time = await _assert_cluster_heartbeat_on_instance(ec2_client) + + await asyncio.sleep(1) + + await cluster_heartbeat(initialized_app, user_id=user_id, wallet_id=wallet_id) + next_heartbeat_time = await _assert_cluster_heartbeat_on_instance(ec2_client) + + assert next_heartbeat_time > first_heartbeat_time + + +async def test_cluster_heartbeat_on_non_existing_cluster_raises( + _base_configuration: None, + ec2_client: EC2Client, + user_id: UserID, + wallet_id: WalletID, + initialized_app: FastAPI, +): + with pytest.raises(Ec2InstanceNotFoundError): + await cluster_heartbeat(initialized_app, user_id=user_id, wallet_id=wallet_id) + + +async def _assert_all_clusters_terminated( + ec2_client: EC2Client, +) -> None: + described_instances = await ec2_client.describe_instances() + if "Reservations" 
 not in described_instances:
+        print("no reservations on AWS. ok.")
+        return
+
+    for reservation in described_instances["Reservations"]:
+        if "Instances" not in reservation:
+            print("no instance in reservation on AWS, weird but ok.")
+            continue
+
+        for instance in reservation["Instances"]:
+            assert "State" in instance
+            assert "Name" in instance["State"]
+            assert instance["State"]["Name"] == "terminated"
+
+
+async def test_delete_cluster(
+    _base_configuration: None,
+    ec2_client: EC2Client,
+    user_id: UserID,
+    wallet_id: WalletID,
+    initialized_app: FastAPI,
+):
+    created_instances = await _create_cluster(
+        initialized_app, ec2_client, user_id, wallet_id
+    )
+    await delete_clusters(initialized_app, instances=created_instances)
+    await _assert_all_clusters_terminated(ec2_client)
diff --git a/services/clusters-keeper/tests/unit/test_modules_clusters_management_core.py b/services/clusters-keeper/tests/unit/test_modules_clusters_management_core.py
new file mode 100644
index 00000000000..d8ca55f9438
--- /dev/null
+++ b/services/clusters-keeper/tests/unit/test_modules_clusters_management_core.py
@@ -0,0 +1,156 @@
+# pylint: disable=redefined-outer-name
+# pylint: disable=unused-argument
+# pylint: disable=unused-variable
+
+import asyncio
+from dataclasses import dataclass
+from typing import Final
+from unittest.mock import MagicMock
+
+import pytest
+from faker import Faker
+from fastapi import FastAPI
+from models_library.users import UserID
+from models_library.wallets import WalletID
+from pytest_mock import MockerFixture
+from pytest_simcore.helpers.typing_env import EnvVarsDict
+from pytest_simcore.helpers.utils_envs import setenvs_from_dict
+from simcore_service_clusters_keeper.models import EC2InstanceData
+from simcore_service_clusters_keeper.modules.clusters import (
+    cluster_heartbeat,
+    create_cluster,
+)
+from simcore_service_clusters_keeper.modules.clusters_management_core import (
+    check_clusters,
+)
+from types_aiobotocore_ec2 import EC2Client
+from 
types_aiobotocore_ec2.literals import InstanceStateNameType + + +@pytest.fixture +def user_id(faker: Faker) -> UserID: + return faker.pyint(min_value=1) + + +@pytest.fixture +def wallet_id(faker: Faker) -> WalletID: + return faker.pyint(min_value=1) + + +_FAST_TIME_BEFORE_TERMINATION_SECONDS: Final[int] = 10 + + +@pytest.fixture +def app_environment( + app_environment: EnvVarsDict, + monkeypatch: pytest.MonkeyPatch, +) -> EnvVarsDict: + # fast interval + return app_environment | setenvs_from_dict( + monkeypatch, + envs={ + "CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION": "1", + "SERVICE_TRACKING_HEARTBEAT": f"{_FAST_TIME_BEFORE_TERMINATION_SECONDS}", + }, + ) + + +@pytest.fixture +def _base_configuration( + aws_subnet_id: str, + aws_security_group_id: str, + aws_ami_id: str, + aws_allowed_ec2_instance_type_names: list[str], + disabled_rabbitmq: None, + mocked_redis_server: None, +) -> None: + ... + + +async def _assert_cluster_exist_and_state( + ec2_client: EC2Client, + *, + instances: list[EC2InstanceData], + state: InstanceStateNameType, +) -> None: + described_instances = await ec2_client.describe_instances( + InstanceIds=[i.id for i in instances] + ) + assert described_instances + assert "Reservations" in described_instances + + for reservation in described_instances["Reservations"]: + assert "Instances" in reservation + + for instance in reservation["Instances"]: + assert "State" in instance + assert "Name" in instance["State"] + assert instance["State"]["Name"] == state + + +@dataclass +class MockedDaskModule: + ping_gateway: MagicMock + is_gateway_busy: MagicMock + + +@pytest.fixture +def mocked_dask_ping_gateway(mocker: MockerFixture) -> MockedDaskModule: + return MockedDaskModule( + ping_gateway=mocker.patch( + "simcore_service_clusters_keeper.modules.clusters_management_core.ping_gateway", + autospec=True, + return_value=True, + ), + is_gateway_busy=mocker.patch( + 
"simcore_service_clusters_keeper.modules.clusters_management_core.is_gateway_busy", + autospec=True, + return_value=True, + ), + ) + + +async def test_cluster_management_core_properly_unused_instances( + disable_clusters_management_background_task: None, + _base_configuration: None, + ec2_client: EC2Client, + user_id: UserID, + wallet_id: WalletID, + initialized_app: FastAPI, + mocked_dask_ping_gateway: MockedDaskModule, +): + created_clusters = await create_cluster( + initialized_app, user_id=user_id, wallet_id=wallet_id + ) + assert len(created_clusters) == 1 + + # running the cluster management task shall not remove anything + await check_clusters(initialized_app) + await _assert_cluster_exist_and_state( + ec2_client, instances=created_clusters, state="running" + ) + mocked_dask_ping_gateway.ping_gateway.assert_called_once() + mocked_dask_ping_gateway.ping_gateway.reset_mock() + mocked_dask_ping_gateway.is_gateway_busy.assert_called_once() + mocked_dask_ping_gateway.is_gateway_busy.reset_mock() + + # running the cluster management task after the heartbeat came in shall not remove anything + await asyncio.sleep(_FAST_TIME_BEFORE_TERMINATION_SECONDS + 1) + await cluster_heartbeat(initialized_app, user_id=user_id, wallet_id=wallet_id) + await check_clusters(initialized_app) + await _assert_cluster_exist_and_state( + ec2_client, instances=created_clusters, state="running" + ) + mocked_dask_ping_gateway.ping_gateway.assert_called_once() + mocked_dask_ping_gateway.ping_gateway.reset_mock() + mocked_dask_ping_gateway.is_gateway_busy.assert_called_once() + mocked_dask_ping_gateway.is_gateway_busy.reset_mock() + + # after waiting the termination time, running the task shall remove the cluster + await asyncio.sleep(_FAST_TIME_BEFORE_TERMINATION_SECONDS + 1) + await check_clusters(initialized_app) + await _assert_cluster_exist_and_state( + ec2_client, instances=created_clusters, state="terminated" + ) + mocked_dask_ping_gateway.ping_gateway.assert_called_once() + 
mocked_dask_ping_gateway.is_gateway_busy.assert_called_once() diff --git a/services/clusters-keeper/tests/unit/test_modules_clusters_management_task.py b/services/clusters-keeper/tests/unit/test_modules_clusters_management_task.py new file mode 100644 index 00000000000..18d440aa541 --- /dev/null +++ b/services/clusters-keeper/tests/unit/test_modules_clusters_management_task.py @@ -0,0 +1,51 @@ +# pylint: disable=no-value-for-parameter +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable + + +import asyncio +from unittest import mock + +import pytest +from fastapi import FastAPI +from pytest_mock.plugin import MockerFixture +from pytest_simcore.helpers.utils_envs import EnvVarsDict, setenvs_from_dict +from simcore_service_clusters_keeper.core.settings import ApplicationSettings + +_FAST_POLL_INTERVAL = 1 + + +@pytest.fixture +def app_environment( + app_environment: EnvVarsDict, + monkeypatch: pytest.MonkeyPatch, +) -> EnvVarsDict: + return app_environment | setenvs_from_dict( + monkeypatch, {"CLUSTERS_KEEPER_TASK_INTERVAL": f"{_FAST_POLL_INTERVAL}"} + ) + + +@pytest.fixture +def mock_background_task(mocker: MockerFixture) -> mock.Mock: + return mocker.patch( + "simcore_service_clusters_keeper.modules.clusters_management_task.check_clusters", + autospec=True, + ) + + +async def test_clusters_management_task_created_and_deleted( + disabled_rabbitmq: None, + mocked_aws_server_envs: None, + mocked_redis_server: None, + mock_background_task: mock.Mock, + initialized_app: FastAPI, + app_settings: ApplicationSettings, +): + assert ( + app_settings.CLUSTERS_KEEPER_TASK_INTERVAL.total_seconds() + == _FAST_POLL_INTERVAL + ) + assert hasattr(initialized_app.state, "clusters_cleaning_task") + await asyncio.sleep(5 * _FAST_POLL_INTERVAL) + mock_background_task.assert_called() diff --git a/services/clusters-keeper/tests/unit/test_modules_dask.py b/services/clusters-keeper/tests/unit/test_modules_dask.py new file mode 100644 
index 00000000000..72052786ce4 --- /dev/null +++ b/services/clusters-keeper/tests/unit/test_modules_dask.py @@ -0,0 +1,121 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable +import time + +from dask_gateway import GatewayCluster, auth +from distributed import Client +from faker import Faker +from models_library.clusters import SimpleAuthentication +from pydantic import AnyUrl, SecretStr, parse_obj_as +from pytest_simcore.dask_gateway import DaskGatewayServer +from simcore_service_clusters_keeper.modules.dask import is_gateway_busy, ping_gateway +from tenacity import retry +from tenacity.retry import retry_if_exception_type +from tenacity.stop import stop_after_delay +from tenacity.wait import wait_fixed + + +async def test_ping_non_existing_gateway(faker: Faker): + assert ( + await ping_gateway( + url=parse_obj_as(AnyUrl, faker.url()), + password=SecretStr(faker.password()), + ) + is False + ) + + +async def test_ping_gateway(local_dask_gateway_server: DaskGatewayServer): + assert ( + await ping_gateway( + url=parse_obj_as(AnyUrl, local_dask_gateway_server.address), + password=SecretStr(local_dask_gateway_server.password), + ) + is True + ) + + +@retry( + wait=wait_fixed(1), + stop=stop_after_delay(30), + retry=retry_if_exception_type(AssertionError), +) +async def _assert_gateway_is_busy( + url: AnyUrl, user: str, password: SecretStr, *, busy: bool +): + print(f"--> waiting for gateway to become {busy=}") + assert ( + await is_gateway_busy( + url=url, + gateway_auth=SimpleAuthentication( + username=user, + password=password, + ), + ) + is busy + ) + print(f"gateway is now {busy=}") + + +async def test_is_gateway_busy_with_no_cluster( + local_dask_gateway_server: DaskGatewayServer, faker: Faker +): + assert ( + await is_gateway_busy( + url=parse_obj_as(AnyUrl, local_dask_gateway_server.address), + gateway_auth=SimpleAuthentication( + username=faker.user_name(), + 
password=SecretStr(local_dask_gateway_server.password), + ), + ) + is False + ) + + +async def test_is_gateway_busy( + local_dask_gateway_server: DaskGatewayServer, + dask_gateway_cluster: GatewayCluster, + dask_gateway_cluster_client: Client, +): + # nothing runs right now + assert dask_gateway_cluster.gateway.auth + assert isinstance(dask_gateway_cluster.gateway.auth, auth.BasicAuth) + assert ( + await is_gateway_busy( + url=parse_obj_as(AnyUrl, local_dask_gateway_server.address), + gateway_auth=SimpleAuthentication( + username=dask_gateway_cluster.gateway.auth.username, + password=SecretStr(local_dask_gateway_server.password), + ), + ) + is False + ) + await dask_gateway_cluster.scale(1) + assert ( + await is_gateway_busy( + url=parse_obj_as(AnyUrl, local_dask_gateway_server.address), + gateway_auth=SimpleAuthentication( + username=dask_gateway_cluster.gateway.auth.username, + password=SecretStr(local_dask_gateway_server.password), + ), + ) + is False + ) + _SLEEP_TIME = 5 + + def _some_long_running_fct(sleep_time: int) -> str: + time.sleep(sleep_time) + return f"I slept for {sleep_time} seconds" + + future = dask_gateway_cluster_client.submit(_some_long_running_fct, _SLEEP_TIME) + + await _assert_gateway_is_busy( + url=parse_obj_as(AnyUrl, local_dask_gateway_server.address), + user=f"{dask_gateway_cluster.gateway.auth.username}", + password=SecretStr(local_dask_gateway_server.password), + busy=True, + ) + + result = await future.result(timeout=2 * _SLEEP_TIME) # type: ignore + assert "seconds" in result diff --git a/services/clusters-keeper/tests/unit/test_modules_ec2.py b/services/clusters-keeper/tests/unit/test_modules_ec2.py index ab701b154d3..4e1b02b0860 100644 --- a/services/clusters-keeper/tests/unit/test_modules_ec2.py +++ b/services/clusters-keeper/tests/unit/test_modules_ec2.py @@ -246,7 +246,7 @@ async def test_get_instances( assert not all_instances["Reservations"] assert ( await clusters_keeper_ec2.get_instances( - 
app_settings.CLUSTERS_KEEPER_EC2_INSTANCES, {} + app_settings.CLUSTERS_KEEPER_EC2_INSTANCES, tags={} ) == [] ) diff --git a/services/clusters-keeper/tests/unit/test_modules_rabbitmq.py b/services/clusters-keeper/tests/unit/test_modules_rabbitmq.py index ea838a94184..2dec9f7d707 100644 --- a/services/clusters-keeper/tests/unit/test_modules_rabbitmq.py +++ b/services/clusters-keeper/tests/unit/test_modules_rabbitmq.py @@ -3,8 +3,8 @@ # pylint:disable=redefined-outer-name import asyncio -from collections.abc import Callable, Mapping -from typing import Any +import contextlib +from collections.abc import AsyncIterator, Callable import aiodocker import pytest @@ -17,9 +17,10 @@ from simcore_service_clusters_keeper.core.errors import ConfigurationError from simcore_service_clusters_keeper.modules.rabbitmq import ( get_rabbitmq_client, + get_rabbitmq_rpc_client, + is_rabbitmq_enabled, post_message, ) -from tenacity import retry from tenacity._asyncio import AsyncRetrying from tenacity.retry import retry_if_exception_type from tenacity.stop import stop_after_delay @@ -68,8 +69,12 @@ def test_rabbitmq_does_not_initialize_if_deactivated( ): assert hasattr(initialized_app.state, "rabbitmq_client") assert initialized_app.state.rabbitmq_client is None + assert initialized_app.state.rabbitmq_rpc_server is None with pytest.raises(ConfigurationError): get_rabbitmq_client(initialized_app) + with pytest.raises(ConfigurationError): + get_rabbitmq_rpc_client(initialized_app) + assert is_rabbitmq_enabled(initialized_app) is False def test_rabbitmq_initializes( @@ -80,7 +85,13 @@ def test_rabbitmq_initializes( ): assert hasattr(initialized_app.state, "rabbitmq_client") assert initialized_app.state.rabbitmq_client is not None + assert initialized_app.state.rabbitmq_rpc_server is not None assert get_rabbitmq_client(initialized_app) == initialized_app.state.rabbitmq_client + assert ( + get_rabbitmq_rpc_client(initialized_app) + == initialized_app.state.rabbitmq_rpc_server + ) + assert 
is_rabbitmq_enabled(initialized_app) is True async def test_post_message( @@ -122,35 +133,29 @@ async def test_post_message_with_disabled_rabbit_does_not_raise( await post_message(initialized_app, message=rabbit_message) -async def _switch_off_rabbit_mq_instance(async_docker_client: aiodocker.Docker) -> None: - # remove the rabbit MQ instance - rabbit_services = [ - s - for s in await async_docker_client.services.list() - if "rabbit" in s["Spec"]["Name"] - ] - await asyncio.gather( - *(async_docker_client.services.delete(s["ID"]) for s in rabbit_services) +@contextlib.asynccontextmanager +async def paused_container( + async_docker_client: aiodocker.Docker, container_name: str +) -> AsyncIterator[None]: + containers = await async_docker_client.containers.list( + filters={"name": [container_name]} ) + await asyncio.gather(*(c.pause() for c in containers)) + # refresh + container_attrs = await asyncio.gather(*(c.show() for c in containers)) + for container_status in container_attrs: + assert container_status["State"]["Status"] == "paused" - @retry(**_TENACITY_RETRY_PARAMS) - async def _check_service_task_gone(service: Mapping[str, Any]) -> None: - print( - f"--> checking if service {service['ID']}:{service['Spec']['Name']} is really gone..." 
- ) - list_of_tasks = await async_docker_client.containers.list( - all=True, - filters={ - "label": [f"com.docker.swarm.service.id={service['ID']}"], - }, - ) - assert not list_of_tasks - print(f"<-- service {service['ID']}:{service['Spec']['Name']} is gone.") + yield - await asyncio.gather(*(_check_service_task_gone(s) for s in rabbit_services)) + await asyncio.gather(*(c.unpause() for c in containers)) + # refresh + container_attrs = await asyncio.gather(*(c.show() for c in containers)) + for container_status in container_attrs: + assert container_status["State"]["Status"] == "running" -async def test_post_message_when_rabbit_disconnected( +async def test_post_message_when_rabbit_disconnected_does_not_raise( enabled_rabbitmq: RabbitSettings, disabled_ec2: None, mocked_redis_server: None, @@ -158,7 +163,9 @@ async def test_post_message_when_rabbit_disconnected( rabbit_log_message: LoggerRabbitMessage, async_docker_client: aiodocker.Docker, ): - await _switch_off_rabbit_mq_instance(async_docker_client) - - # now posting should not raise out + # NOTE: if the connection is not initialized before pausing the container, then + # this test hangs forever!!! This needs investigations! 
await post_message(initialized_app, message=rabbit_log_message) + async with paused_container(async_docker_client, "rabbit"): + # now posting should not raise out + await post_message(initialized_app, message=rabbit_log_message) diff --git a/services/clusters-keeper/tests/unit/test_modules_remote_debug.py b/services/clusters-keeper/tests/unit/test_modules_remote_debug.py new file mode 100644 index 00000000000..5b1f167f9f4 --- /dev/null +++ b/services/clusters-keeper/tests/unit/test_modules_remote_debug.py @@ -0,0 +1,29 @@ +# pylint:disable=unused-variable +# pylint:disable=unused-argument +# pylint:disable=redefined-outer-name + +import pytest +from fastapi import FastAPI +from pytest_simcore.helpers.typing_env import EnvVarsDict +from pytest_simcore.helpers.utils_envs import setenvs_from_dict + + +@pytest.fixture +def app_environment( + app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch +) -> EnvVarsDict: + return app_environment | setenvs_from_dict( + monkeypatch, + envs={ + "SC_BOOT_MODE": "debug-ptvsd", + }, + ) + + +def test_application_with_debug_enabled( + disabled_rabbitmq: None, + disabled_ec2: None, + mocked_redis_server: None, + initialized_app: FastAPI, +): + ... 
diff --git a/services/clusters-keeper/tests/unit/test_rpc_clusters.py b/services/clusters-keeper/tests/unit/test_rpc_clusters.py new file mode 100644 index 00000000000..365d8d2e099 --- /dev/null +++ b/services/clusters-keeper/tests/unit/test_rpc_clusters.py @@ -0,0 +1,175 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable + + +import datetime +from collections.abc import Awaitable, Callable +from dataclasses import dataclass +from typing import Final +from unittest.mock import MagicMock + +import arrow +import pytest +from faker import Faker +from fastapi import FastAPI +from models_library.users import UserID +from models_library.wallets import WalletID +from parse import Result, search +from pydantic import parse_obj_as +from pytest_mock.plugin import MockerFixture +from servicelib.rabbitmq import RabbitMQRPCClient, RPCMethodName, RPCNamespace +from simcore_service_clusters_keeper.models import ClusterGet +from simcore_service_clusters_keeper.utils.ec2 import HEARTBEAT_TAG_KEY +from types_aiobotocore_ec2 import EC2Client + +pytest_simcore_core_services_selection = [ + "rabbit", +] + +pytest_simcore_ops_services_selection = [] + + +@pytest.fixture +async def clusters_keeper_rabbitmq_rpc_client( + rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]] +) -> RabbitMQRPCClient: + rpc_client = await rabbitmq_rpc_client("pytest_clusters_keeper_rpc_client") + assert rpc_client + return rpc_client + + +CLUSTERS_KEEPER_NAMESPACE: Final[RPCNamespace] = parse_obj_as( + RPCNamespace, "clusters-keeper" +) + + +@pytest.fixture +def user_id(faker: Faker) -> UserID: + return faker.pyint(min_value=1) + + +@pytest.fixture +def wallet_id(faker: Faker) -> WalletID: + return faker.pyint(min_value=1) + + +@pytest.fixture +def _base_configuration( + docker_swarm: None, + enabled_rabbitmq: None, + aws_subnet_id: str, + aws_security_group_id: str, + aws_ami_id: str, + aws_allowed_ec2_instance_type_names: list[str], 
+ mocked_redis_server: None, + initialized_app: FastAPI, +) -> None: + ... + + +async def _assert_cluster_instance_created( + ec2_client: EC2Client, + user_id: UserID, + wallet_id: WalletID, +) -> None: + instances = await ec2_client.describe_instances() + assert len(instances["Reservations"]) == 1 + assert "Instances" in instances["Reservations"][0] + assert len(instances["Reservations"][0]["Instances"]) == 1 + assert "Tags" in instances["Reservations"][0]["Instances"][0] + instance_ec2_tags = instances["Reservations"][0]["Instances"][0]["Tags"] + assert len(instance_ec2_tags) == 4 + assert all("Key" in x for x in instance_ec2_tags) + assert all("Value" in x for x in instance_ec2_tags) + + assert "Key" in instances["Reservations"][0]["Instances"][0]["Tags"][0] + assert ( + instances["Reservations"][0]["Instances"][0]["Tags"][0]["Key"] + == "io.simcore.clusters-keeper.version" + ) + assert "Key" in instances["Reservations"][0]["Instances"][0]["Tags"][1] + assert instances["Reservations"][0]["Instances"][0]["Tags"][1]["Key"] == "Name" + assert "Value" in instances["Reservations"][0]["Instances"][0]["Tags"][1] + instance_name = instances["Reservations"][0]["Instances"][0]["Tags"][1]["Value"] + + parse_result = search("user_id:{user_id:d}-wallet_id:{wallet_id:d}", instance_name) + assert isinstance(parse_result, Result) + assert parse_result["user_id"] == user_id + assert parse_result["wallet_id"] == wallet_id + + +async def _assert_cluster_heartbeat_on_instance( + ec2_client: EC2Client, +) -> datetime.datetime: + instances = await ec2_client.describe_instances() + assert len(instances["Reservations"]) == 1 + assert "Instances" in instances["Reservations"][0] + assert len(instances["Reservations"][0]["Instances"]) == 1 + assert "Tags" in instances["Reservations"][0]["Instances"][0] + instance_tags = instances["Reservations"][0]["Instances"][0]["Tags"] + assert len(instance_tags) == 5 + assert all("Key" in x for x in instance_tags) + list_of_heartbeats = list( + 
filter(lambda x: x["Key"] == HEARTBEAT_TAG_KEY, instance_tags) # type:ignore + ) + assert len(list_of_heartbeats) == 1 + assert "Value" in list_of_heartbeats[0] + this_heartbeat_time = arrow.get(list_of_heartbeats[0]["Value"]).datetime + assert this_heartbeat_time + return this_heartbeat_time + + +@dataclass +class MockedDaskModule: + ping_gateway: MagicMock + + +@pytest.fixture +def mocked_dask_ping_gateway(mocker: MockerFixture) -> MockedDaskModule: + return MockedDaskModule( + ping_gateway=mocker.patch( + "simcore_service_clusters_keeper.rpc.clusters.ping_gateway", + autospec=True, + return_value=True, + ), + ) + + +async def test_get_or_create_cluster( + _base_configuration: None, + clusters_keeper_rabbitmq_rpc_client: RabbitMQRPCClient, + ec2_client: EC2Client, + user_id: UserID, + wallet_id: WalletID, + mocked_dask_ping_gateway: MockedDaskModule, +): + # send rabbitmq rpc to create_cluster + rpc_response = await clusters_keeper_rabbitmq_rpc_client.request( + CLUSTERS_KEEPER_NAMESPACE, + RPCMethodName("get_or_create_cluster"), + user_id=user_id, + wallet_id=wallet_id, + ) + assert rpc_response + created_cluster = ClusterGet.parse_raw(rpc_response) + # check we do have a new machine in AWS + await _assert_cluster_instance_created(ec2_client, user_id, wallet_id) + # it is called once as moto server creates instances instantly + mocked_dask_ping_gateway.ping_gateway.assert_called_once() + mocked_dask_ping_gateway.ping_gateway.reset_mock() + + # calling it again returns the existing cluster + rpc_response = await clusters_keeper_rabbitmq_rpc_client.request( + CLUSTERS_KEEPER_NAMESPACE, + RPCMethodName("get_or_create_cluster"), + user_id=user_id, + wallet_id=wallet_id, + ) + assert rpc_response + returned_cluster = ClusterGet.parse_raw(rpc_response) + # check we still have only 1 instance + await _assert_cluster_heartbeat_on_instance(ec2_client) + mocked_dask_ping_gateway.ping_gateway.assert_called_once() + + assert created_cluster == returned_cluster diff --git 
a/services/clusters-keeper/tests/unit/test_utils_ec2.py b/services/clusters-keeper/tests/unit/test_utils_ec2.py new file mode 100644 index 00000000000..2ae89c4f268 --- /dev/null +++ b/services/clusters-keeper/tests/unit/test_utils_ec2.py @@ -0,0 +1,64 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable + +import pytest +from faker import Faker +from models_library.users import UserID +from models_library.wallets import WalletID +from simcore_service_clusters_keeper.core.settings import ApplicationSettings +from simcore_service_clusters_keeper.utils.ec2 import ( + _APPLICATION_TAG_KEY_NAME, + all_created_ec2_instances_filter, + compose_user_data, + creation_ec2_tags, +) + + +@pytest.fixture +def user_id(faker: Faker) -> UserID: + return faker.pyint(min_value=1) + + +@pytest.fixture +def wallet_id(faker: Faker) -> WalletID: + return faker.pyint(min_value=1) + + +def test_creation_ec2_tags( + mocked_aws_server_envs: None, + disabled_rabbitmq: None, + mocked_redis_server: None, + app_settings: ApplicationSettings, + user_id: UserID, + wallet_id: WalletID, +): + received_tags = creation_ec2_tags( + app_settings, user_id=user_id, wallet_id=wallet_id + ) + assert received_tags + EXPECTED_TAG_KEY_NAMES = [_APPLICATION_TAG_KEY_NAME, "Name", "user_id", "wallet_id"] + assert all( + tag_key_name in received_tags for tag_key_name in EXPECTED_TAG_KEY_NAMES + ), f"missing tag key names in {received_tags.keys()}, expected {EXPECTED_TAG_KEY_NAMES}" + assert all( + tag_key_name in EXPECTED_TAG_KEY_NAMES for tag_key_name in received_tags + ), f"non expected tag key names in {received_tags.keys()}, expected {EXPECTED_TAG_KEY_NAMES}" + + +def test_all_created_ec2_instances_filter(): + received_filter = all_created_ec2_instances_filter() + assert len(received_filter) == 1 + assert _APPLICATION_TAG_KEY_NAME in received_filter + + +@pytest.fixture +def bash_command(faker: Faker) -> str: + return faker.pystr() + + +def 
test_compose_user_data(bash_command: str): + received_user_data = compose_user_data(bash_command) + assert received_user_data + assert received_user_data.startswith("#!/bin/bash\n") + assert bash_command in received_user_data diff --git a/services/director-v2/tests/conftest.py b/services/director-v2/tests/conftest.py index 27469857522..0c5e341035a 100644 --- a/services/director-v2/tests/conftest.py +++ b/services/director-v2/tests/conftest.py @@ -26,6 +26,7 @@ pytest_plugins = [ "pytest_simcore.db_entries_mocks", + "pytest_simcore.dask_gateway", "pytest_simcore.docker_compose", "pytest_simcore.docker_registry", "pytest_simcore.docker_swarm", diff --git a/services/director-v2/tests/unit/conftest.py b/services/director-v2/tests/unit/conftest.py index 0f03835a093..261b90a0373 100644 --- a/services/director-v2/tests/unit/conftest.py +++ b/services/director-v2/tests/unit/conftest.py @@ -5,25 +5,14 @@ import logging import random import urllib.parse -from collections.abc import ( - AsyncIterable, - AsyncIterator, - Callable, - Iterable, - Iterator, - Mapping, -) +from collections.abc import AsyncIterable, Callable, Iterable, Iterator, Mapping from typing import Any from unittest import mock import aiodocker import pytest import respx -import traitlets.config -from _dask_helpers import DaskGatewayServer from dask.distributed import Scheduler, Worker -from dask_gateway_server.app import DaskGateway -from dask_gateway_server.backends.local import UnsafeLocalBackend from distributed.deploy.spec import SpecCluster from faker import Faker from fastapi import FastAPI @@ -40,7 +29,6 @@ from models_library.services import RunID, ServiceKey, ServiceKeyVersion, ServiceVersion from models_library.services_enums import ServiceState from pydantic import parse_obj_as -from pytest import LogCaptureFixture, MonkeyPatch from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.typing_env import EnvVarsDict from settings_library.s3 import S3Settings @@ -181,7 +169,7 @@ def 
cluster_id() -> ClusterID: @pytest.fixture async def dask_spec_local_cluster( - monkeypatch: MonkeyPatch, + monkeypatch: pytest.MonkeyPatch, unused_tcp_port_factory: Callable, ) -> AsyncIterable[SpecCluster]: # in this mode we can precisely create a specific cluster @@ -234,57 +222,6 @@ async def dask_spec_local_cluster( yield cluster -@pytest.fixture -def local_dask_gateway_server_config( - unused_tcp_port_factory: Callable, -) -> traitlets.config.Config: - c = traitlets.config.Config() - assert isinstance(c.DaskGateway, traitlets.config.Config) - assert isinstance(c.ClusterConfig, traitlets.config.Config) - assert isinstance(c.Proxy, traitlets.config.Config) - assert isinstance(c.SimpleAuthenticator, traitlets.config.Config) - c.DaskGateway.backend_class = UnsafeLocalBackend - c.DaskGateway.address = f"127.0.0.1:{unused_tcp_port_factory()}" - c.Proxy.address = f"127.0.0.1:{unused_tcp_port_factory()}" - c.DaskGateway.authenticator_class = "dask_gateway_server.auth.SimpleAuthenticator" - c.SimpleAuthenticator.password = "qweqwe" - c.ClusterConfig.worker_cmd = [ - "dask-worker", - "--resources", - f"CPU=12,GPU=1,RAM={16e9}", - ] - # NOTE: This must be set such that the local unsafe backend creates a worker with enough cores/memory - c.ClusterConfig.worker_cores = 12 - c.ClusterConfig.worker_memory = "16G" - c.ClusterConfig.cluster_max_workers = 3 - - c.DaskGateway.log_level = "DEBUG" - return c - - -@pytest.fixture -async def local_dask_gateway_server( - local_dask_gateway_server_config: traitlets.config.Config, -) -> AsyncIterator[DaskGatewayServer]: - print("--> creating local dask gateway server") - dask_gateway_server = DaskGateway(config=local_dask_gateway_server_config) - dask_gateway_server.initialize([]) # that is a shitty one! 
- print("--> local dask gateway server initialized") - await dask_gateway_server.setup() - await dask_gateway_server.backend.proxy._proxy_contacted # pylint: disable=protected-access - - print("--> local dask gateway server setup completed") - yield DaskGatewayServer( - f"http://{dask_gateway_server.backend.proxy.address}", - f"gateway://{dask_gateway_server.backend.proxy.tcp_address}", - local_dask_gateway_server_config.SimpleAuthenticator.password, # type: ignore - dask_gateway_server, - ) - print("--> local dask gateway server switching off...") - await dask_gateway_server.cleanup() - print("...done") - - @pytest.fixture(params=list(FileLinkType)) def tasks_file_link_type(request) -> FileLinkType: """parametrized fixture on all FileLinkType enum variants""" @@ -421,13 +358,17 @@ def mocked_catalog_service_api( @pytest.fixture() -def caplog_info_level(caplog: LogCaptureFixture) -> Iterable[LogCaptureFixture]: +def caplog_info_level( + caplog: pytest.LogCaptureFixture, +) -> Iterable[pytest.LogCaptureFixture]: with caplog.at_level(logging.INFO): yield caplog @pytest.fixture() -def caplog_debug_level(caplog: LogCaptureFixture) -> Iterable[LogCaptureFixture]: +def caplog_debug_level( + caplog: pytest.LogCaptureFixture, +) -> Iterable[pytest.LogCaptureFixture]: with caplog.at_level(logging.DEBUG): yield caplog diff --git a/services/director-v2/tests/unit/with_dbs/test_api_route_clusters_details.py b/services/director-v2/tests/unit/with_dbs/test_api_route_clusters_details.py index 70c750ab541..10288762265 100644 --- a/services/director-v2/tests/unit/with_dbs/test_api_route_clusters_details.py +++ b/services/director-v2/tests/unit/with_dbs/test_api_route_clusters_details.py @@ -3,7 +3,7 @@ # pylint:disable=redefined-outer-name import json -from collections.abc import AsyncIterator, Callable +from collections.abc import Callable from typing import Any import httpx @@ -13,7 +13,6 @@ from dask_gateway import Gateway, GatewayCluster, auth from distributed import Client as 
DaskClient from distributed.deploy.spec import SpecCluster -from faker import Faker from models_library.api_schemas_directorv2.clusters import ClusterDetailsGet from models_library.clusters import Cluster, ClusterID, SimpleAuthentication from models_library.users import UserID @@ -49,54 +48,6 @@ def clusters_config( monkeypatch.setenv("S3_SECURE", "false") -@pytest.fixture -async def dask_gateway( - local_dask_gateway_server: DaskGatewayServer, -) -> Gateway: - async with Gateway( - local_dask_gateway_server.address, - local_dask_gateway_server.proxy_address, - asynchronous=True, - auth=auth.BasicAuth("pytest_user", local_dask_gateway_server.password), - ) as gateway: - print(f"--> {gateway=} created") - cluster_options = await gateway.cluster_options() - gateway_versions = await gateway.get_versions() - clusters_list = await gateway.list_clusters() - print(f"--> {gateway_versions=}, {cluster_options=}, {clusters_list=}") - for option in cluster_options.items(): - print(f"--> {option=}") - return gateway - - -@pytest.fixture -async def dask_gateway_cluster(dask_gateway: Gateway) -> AsyncIterator[GatewayCluster]: - async with dask_gateway.new_cluster() as cluster: - yield cluster - - -@pytest.fixture -async def dask_gateway_cluster_client( - dask_gateway_cluster: GatewayCluster, -) -> AsyncIterator[DaskClient]: - async with dask_gateway_cluster.get_client() as client: - yield client - - -@pytest.fixture -def cluster_simple_authentication(faker: Faker) -> Callable[[], dict[str, Any]]: - def creator() -> dict[str, Any]: - simple_auth = { - "type": "simple", - "username": faker.user_name(), - "password": faker.password(), - } - assert SimpleAuthentication.parse_obj(simple_auth) - return simple_auth - - return creator - - @pytest.mark.skip( reason="test for helping developers understand how to use dask gateways" ) diff --git a/services/osparc-gateway-server/Makefile b/services/osparc-gateway-server/Makefile index 92d7d7712b8..93397bbba18 100644 --- 
a/services/osparc-gateway-server/Makefile +++ b/services/osparc-gateway-server/Makefile @@ -135,7 +135,7 @@ up up-version: .stack-$(SWARM_STACK_NAME)-version.yml .init-swarm config ## Depl @$(_show_endpoints) up-latest: - @export DOCKER_IMAGE_TAG=latest && \ + @export DOCKER_IMAGE_TAG=release-github-latest && \ $(MAKE) up-version .PHONY: down