diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index e23eaed1d61..bf135727ee2 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -45,3 +45,10 @@ repos:
     hooks:
       - id: black
         name: black format code
+  - repo: local
+    hooks:
+      - id: pytest-testit
+        name: pytest-testit
+        language: script
+        types: [file, python]
+        entry: scripts/precommit/pytest-testit.bash
diff --git a/scripts/precommit/pytest-testit.bash b/scripts/precommit/pytest-testit.bash
new file mode 100755
index 00000000000..7ffd95e5e88
--- /dev/null
+++ b/scripts/precommit/pytest-testit.bash
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+# Check for the presence of @pytest.mark.testit in staged files
+git diff --cached --name-only | while IFS= read -r file; do
+    if grep -n '@pytest\.mark\.testit' "$file"; then
+        sed -i '/@pytest\.mark\.testit/d' "$file"
+        echo "Removed @pytest.mark.testit from file '$file'"
+        exit 1
+    fi
+done
diff --git a/services/autoscaling/requirements/_base.in b/services/autoscaling/requirements/_base.in
index 2d19068def7..09555b35987 100644
--- a/services/autoscaling/requirements/_base.in
+++ b/services/autoscaling/requirements/_base.in
@@ -13,7 +13,7 @@
 --requirement ../../../packages/service-library/requirements/_base.in
 --requirement ../../../packages/service-library/requirements/_fastapi.in
 
-
+aiocache
 aiodocker
 aioboto3
 dask[distributed]
diff --git a/services/autoscaling/requirements/_base.txt b/services/autoscaling/requirements/_base.txt
index bd4f35bad55..0c81d2664c3 100644
--- a/services/autoscaling/requirements/_base.txt
+++ b/services/autoscaling/requirements/_base.txt
@@ -14,6 +14,8 @@ aiobotocore==2.7.0
     # via
     #   aioboto3
     #   aiobotocore
+aiocache==0.12.2
+    # via -r requirements/_base.in
 aiodebug==2.3.0
     # via
     #   -c requirements/../../../packages/service-library/requirements/./_base.in
diff --git a/services/autoscaling/src/simcore_service_autoscaling/models.py b/services/autoscaling/src/simcore_service_autoscaling/models.py
index a231fdc5446..8be0f953fe8 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/models.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/models.py
@@ -44,7 +44,7 @@ def __sub__(self, other: "Resources") -> "Resources":
 
 @dataclass(frozen=True)
 class EC2InstanceType:
-    name: str
+    name: InstanceTypeType
     cpus: PositiveInt
     ram: ByteSize
 
@@ -59,6 +59,7 @@ class EC2InstanceData:
     aws_private_dns: InstancePrivateDNSName
     type: InstanceTypeType
     state: InstanceStateNameType
+    resources: Resources
 
 
 @dataclass(frozen=True)
diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
index 81b177a42df..5dcd7de627c 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
@@ -13,7 +13,6 @@
     Node,
     NodeState,
 )
-from pydantic import parse_obj_as
 from servicelib.logging_utils import log_catch
 from types_aiobotocore_ec2.literals import InstanceTypeType
 
@@ -166,7 +165,9 @@ async def sorted_allowed_instance_types(app: FastAPI) -> list[EC2InstanceType]:
     ec2_client = get_ec2_client(app)
 
     # some instances might be able to run several tasks
-    allowed_instance_types = await ec2_client.get_ec2_instance_capabilities(
+    allowed_instance_types: list[
+        EC2InstanceType
+    ] = await ec2_client.get_ec2_instance_capabilities(
         cast(  # type: ignore
             set[InstanceTypeType],
             set(
@@ -178,7 +179,7 @@ async def sorted_allowed_instance_types(app: 
FastAPI) -> list[EC2InstanceType]: def _sort_according_to_allowed_types(instance_type: EC2InstanceType) -> int: assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec return app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES.index( - instance_type.name + f"{instance_type.name}" ) allowed_instance_types.sort(key=_sort_according_to_allowed_types) @@ -282,6 +283,9 @@ async def _find_needed_instances( pending_instances_to_tasks: list[tuple[EC2InstanceData, list]] = [ (i, []) for i in cluster.pending_ec2s ] + drained_instances_to_tasks: list[tuple[EC2InstanceData, list]] = [ + (i.ec2_instance, []) for i in cluster.drained_nodes + ] needed_new_instance_types_for_tasks: list[tuple[EC2InstanceType, list]] = [] for task in pending_tasks: task_defined_ec2_type = await auto_scaling_mode.get_task_defined_instance( @@ -290,15 +294,22 @@ async def _find_needed_instances( ( filtered_active_instance_to_task, filtered_pending_instance_to_task, + filtered_drained_instances_to_task, filtered_needed_new_instance_types_to_task, ) = filter_by_task_defined_instance( task_defined_ec2_type, active_instances_to_tasks, pending_instances_to_tasks, + drained_instances_to_tasks, needed_new_instance_types_for_tasks, ) # try to assign the task to one of the active, pending or net created instances + _logger.debug( + "Try to assign %s to any active/pending/created instance in the %s", + f"{task}", + f"{cluster=}", + ) if ( await auto_scaling_mode.try_assigning_task_to_instances( app, @@ -314,6 +325,13 @@ async def _find_needed_instances( type_to_instance_map, notify_progress=True, ) + or await auto_scaling_mode.try_assigning_task_to_instances( + app, + task, + filtered_drained_instances_to_task, + type_to_instance_map, + notify_progress=False, + ) or auto_scaling_mode.try_assigning_task_to_instance_types( task, filtered_needed_new_instance_types_to_task ) @@ -390,12 +408,12 @@ async def _start_instances( *[ ec2_client.start_aws_instance( app_settings.AUTOSCALING_EC2_INSTANCES, - instance_type=parse_obj_as(InstanceTypeType, instance.name), + instance_type=instance_type, tags=instance_tags, startup_script=instance_startup_script, number_of_instances=instance_num, ) - for instance, instance_num in needed_instances.items() + for instance_type, instance_num in needed_instances.items() ], return_exceptions=True, ) @@ -472,8 +490,8 @@ async def _deactivate_empty_nodes( app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling ) -> Cluster: docker_client = get_docker_client(app) - active_empty_nodes: list[AssociatedInstance] = [] - active_non_empty_nodes: list[AssociatedInstance] = [] + active_empty_instances: list[AssociatedInstance] = [] + active_non_empty_instances: list[AssociatedInstance] = [] for instance in cluster.active_nodes: try: node_used_resources = await auto_scaling_mode.compute_node_used_resources( @@ -481,34 +499,38 @@ async def _deactivate_empty_nodes( instance, ) if node_used_resources == Resources.create_as_empty(): - active_empty_nodes.append(instance) + active_empty_instances.append(instance) else: - active_non_empty_nodes.append(instance) + active_non_empty_instances.append(instance) except DaskWorkerNotFoundError: # noqa: PERF203 _logger.exception( "EC2 node instance is not registered to dask-scheduler! 
TIP: Needs investigation" ) # drain this empty nodes - await asyncio.gather( + updated_nodes: list[Node] = await asyncio.gather( *( utils_docker.set_node_availability( docker_client, node.node, available=False, ) - for node in active_empty_nodes + for node in active_empty_instances ) ) - if active_empty_nodes: + if updated_nodes: _logger.info( "following nodes set to drain: '%s'", - f"{[node.node.Description.Hostname for node in active_empty_nodes if node.node.Description]}", + f"{[node.Description.Hostname for node in updated_nodes if node.Description]}", ) + newly_drained_instances = [ + AssociatedInstance(node, instance.ec2_instance) + for instance, node in zip(active_empty_instances, updated_nodes, strict=True) + ] return dataclasses.replace( cluster, - active_nodes=active_non_empty_nodes, - drained_nodes=cluster.drained_nodes + active_empty_nodes, + active_nodes=active_non_empty_instances, + drained_nodes=cluster.drained_nodes + newly_drained_instances, ) @@ -537,6 +559,12 @@ async def _find_terminateable_instances( ): # let's terminate that one terminateable_nodes.append(instance) + else: + _logger.info( + "%s has still %ss before being terminateable", + f"{instance.ec2_instance.id=}", + f"{(elapsed_time_since_drained - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION).total_seconds()}", + ) if terminateable_nodes: _logger.info( diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py index 159d9ccd9ab..0fe64471056 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_mode_computational.py @@ -79,7 +79,6 @@ async def try_assigning_task_to_instances( app, pending_task, instances_to_tasks, - type_to_instance_map, notify_progress=notify_progress, ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py b/services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py index f55948e9d81..f8c0eb595e1 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/ec2.py @@ -6,6 +6,7 @@ import aioboto3 import botocore.exceptions from aiobotocore.session import ClientCreatorContext +from aiocache import cached from fastapi import FastAPI from pydantic import ByteSize, parse_obj_as from servicelib.logging_utils import log_context @@ -24,7 +25,7 @@ Ec2TooManyInstancesError, ) from ..core.settings import EC2InstancesSettings, EC2Settings -from ..models import EC2InstanceData, EC2InstanceType +from ..models import EC2InstanceData, EC2InstanceType, Resources from ..utils.utils_ec2 import compose_user_data logger = logging.getLogger(__name__) @@ -63,6 +64,7 @@ async def ping(self) -> bool: except Exception: # pylint: disable=broad-except return False + @cached(noself=True) async def get_ec2_instance_capabilities( self, instance_type_names: set[InstanceTypeType], @@ -88,7 +90,7 @@ async def get_ec2_instance_capabilities( async def start_aws_instance( self, instance_settings: EC2InstancesSettings, - instance_type: InstanceTypeType, + instance_type: EC2InstanceType, tags: dict[str, str], startup_script: str, number_of_instances: int, @@ -96,7 +98,7 @@ async def start_aws_instance( with log_context( logger, logging.INFO, - msg=f"launching {number_of_instances} AWS instance(s) 
{instance_type} with {tags=}", + msg=f"launching {number_of_instances} AWS instance(s) {instance_type.name} with {tags=}", ): # first check the max amount is not already reached current_instances = await self.get_instances(instance_settings, tags) @@ -112,7 +114,7 @@ async def start_aws_instance( ImageId=instance_settings.EC2_INSTANCES_AMI_ID, MinCount=number_of_instances, MaxCount=number_of_instances, - InstanceType=instance_type, + InstanceType=instance_type.name, InstanceInitiatedShutdownBehavior="terminate", KeyName=instance_settings.EC2_INSTANCES_KEY_NAME, SubnetId=instance_settings.EC2_INSTANCES_SUBNET_ID, @@ -149,6 +151,7 @@ async def start_aws_instance( aws_private_dns=instance["PrivateDnsName"], type=instance["InstanceType"], state=instance["State"]["Name"], + resources=Resources(cpus=instance_type.cpus, ram=instance_type.ram), ) for instance in instances["Reservations"][0]["Instances"] ] @@ -192,6 +195,10 @@ async def get_instances( assert "InstanceType" in instance # nosec assert "State" in instance # nosec assert "Name" in instance["State"] # nosec + ec2_instance_types = await self.get_ec2_instance_capabilities( + {instance["InstanceType"]} + ) + assert len(ec2_instance_types) == 1 # nosec all_instances.append( EC2InstanceData( launch_time=instance["LaunchTime"], @@ -199,6 +206,10 @@ async def get_instances( aws_private_dns=instance["PrivateDnsName"], type=instance["InstanceType"], state=instance["State"]["Name"], + resources=Resources( + cpus=ec2_instance_types[0].cpus, + ram=ec2_instance_types[0].ram, + ), ) ) logger.debug("received: %s", f"{all_instances=}") diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py index 904501e7d8b..7d9ab4f310d 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/auto_scaling_core.py @@ -57,7 +57,7 @@ def _find_node_with_name(node: Node) -> bool: try: docker_node_name = node_host_name_from_ec2_private_dns(instance_data) except Ec2InvalidDnsNameError: - _logger.exception("Unexcepted EC2 private dns name") + _logger.exception("Unexpected EC2 private dns name") non_associated_instances.append(instance_data) continue @@ -125,6 +125,7 @@ def filter_by_task_defined_instance( instance_type_name: InstanceTypeType | None, active_instances_to_tasks, pending_instances_to_tasks, + drained_instances_to_tasks, needed_new_instance_types_for_tasks, ) -> tuple: return ( @@ -140,6 +141,12 @@ def filter_by_task_defined_instance( ), pending_instances_to_tasks, ), + filter( + functools.partial( + _instance_data_map_by_type_name, type_name=instance_type_name + ), + drained_instances_to_tasks, + ), filter( functools.partial( _instance_type_map_by_type_name, type_name=instance_type_name diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py b/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py index cb9a164aeaf..f5cb43c4acb 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/computational_scaling.py @@ -1,6 +1,7 @@ import datetime import logging -from typing import Final, Iterable +from collections.abc import Iterable +from typing import Final from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY from fastapi import FastAPI @@ -15,7 
+16,6 @@ EC2InstanceType, Resources, ) -from . import utils_docker _logger = logging.getLogger(__name__) @@ -46,7 +46,7 @@ def try_assigning_task_to_node( instance_to_tasks: Iterable[tuple[AssociatedInstance, list[DaskTask]]], ) -> bool: for instance, node_assigned_tasks in instance_to_tasks: - instance_total_resource = utils_docker.get_node_total_resources(instance.node) + instance_total_resource = instance.ec2_instance.resources tasks_needed_resources = _compute_tasks_needed_resources(node_assigned_tasks) if ( instance_total_resource - tasks_needed_resources @@ -60,7 +60,6 @@ async def try_assigning_task_to_instances( app: FastAPI, pending_task: DaskTask, instances_to_tasks: Iterable[tuple[EC2InstanceData, list[DaskTask]]], - type_to_instance_map: dict[str, EC2InstanceType], *, notify_progress: bool, ) -> bool: @@ -70,10 +69,7 @@ async def try_assigning_task_to_instances( app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME ) for instance, instance_assigned_tasks in instances_to_tasks: - instance_type = type_to_instance_map[instance.type] - instance_total_resources = Resources( - cpus=instance_type.cpus, ram=instance_type.ram - ) + instance_total_resources = instance.resources tasks_needed_resources = _compute_tasks_needed_resources( instance_assigned_tasks ) diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index 7688ac3b21b..619ec908545 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -8,9 +8,11 @@ import json import random from collections.abc import AsyncIterator, Awaitable, Callable, Iterator +from copy import deepcopy from datetime import timezone from pathlib import Path from typing import Any, Final, cast +from unittest import mock import aiodocker import distributed @@ -43,7 +45,7 @@ from settings_library.rabbit import RabbitSettings from simcore_service_autoscaling.core.application import create_app from simcore_service_autoscaling.core.settings import ApplicationSettings, EC2Settings -from simcore_service_autoscaling.models import Cluster, DaskTaskResources +from simcore_service_autoscaling.models import Cluster, DaskTaskResources, Resources from simcore_service_autoscaling.modules.docker import AutoscalingDocker from simcore_service_autoscaling.modules.ec2 import AutoscalingEC2, EC2InstanceData from tenacity import retry @@ -511,6 +513,7 @@ def aws_allowed_ec2_instance_type_names() -> list[InstanceTypeType]: "t2.xlarge", "t2.2xlarge", "g3.4xlarge", + "g4dn.2xlarge", "r5n.4xlarge", "r5n.8xlarge", ] @@ -686,6 +689,7 @@ def _creator(**overrides) -> EC2InstanceData: "aws_private_dns": f"ip-{faker.ipv4().replace('.', '-')}.ec2.internal", "type": faker.pystr(), "state": faker.pystr(), + "resources": Resources(cpus=4.0, ram=ByteSize(1024 * 1024)), } | overrides ) @@ -743,3 +747,25 @@ def _creator(required_resources: DaskTaskResources) -> distributed.Future: return future return _creator + + +@pytest.fixture +def mock_set_node_availability(mocker: MockerFixture) -> mock.Mock: + async def _fake_set_node_availability( + docker_client: AutoscalingDocker, node: Node, *, available: bool + ) -> Node: + returned_node = deepcopy(node) + assert returned_node.Spec + returned_node.Spec.Availability = ( + Availability.active if available else Availability.drain + ) + returned_node.UpdatedAt = datetime.datetime.now( + tz=datetime.timezone.utc + ).isoformat() + return returned_node + + return mocker.patch( + 
"simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.set_node_availability", + autospec=True, + side_effect=_fake_set_node_availability, + ) diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py index b187de71aee..0482437bb5d 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py @@ -192,14 +192,6 @@ def mock_find_node_with_name( ) -@pytest.fixture -def mock_set_node_availability(mocker: MockerFixture) -> mock.Mock: - return mocker.patch( - "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.set_node_availability", - autospec=True, - ) - - @pytest.fixture def mock_cluster_used_resources(mocker: MockerFixture) -> mock.Mock: return mocker.patch( @@ -323,16 +315,22 @@ def _do( id="No explicit instance defined", ), pytest.param( - "t2.xlarge", - parse_obj_as(ByteSize, "4Gib"), - "t2.xlarge", - id="Explicitely ask for t2.xlarge", + "g4dn.2xlarge", + None, + "g4dn.2xlarge", + id="Explicitely ask for g4dn.2xlarge and use all the resources", ), pytest.param( "r5n.8xlarge", - parse_obj_as(ByteSize, "128Gib"), + parse_obj_as(ByteSize, "116Gib"), "r5n.8xlarge", - id="Explicitely ask for r5n.8xlarge", + id="Explicitely ask for r5n.8xlarge and set the resources", + ), + pytest.param( + "r5n.8xlarge", + None, + "r5n.8xlarge", + id="Explicitely ask for r5n.8xlarge and use all the resources", ), ], ) @@ -352,7 +350,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 dask_spec_local_cluster: distributed.SpecCluster, create_dask_task_resources: Callable[..., DaskTaskResources], dask_task_imposed_ec2_type: InstanceTypeType | None, - dask_ram: ByteSize, + dask_ram: ByteSize | None, expected_ec2_type: InstanceTypeType, ): # we have nothing running now @@ -360,6 +358,19 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 assert not all_instances["Reservations"] # create a task that needs more power + if dask_task_imposed_ec2_type and not dask_ram: + instance_types = await ec2_client.describe_instance_types( + InstanceTypes=[dask_task_imposed_ec2_type] + ) + assert instance_types + assert "InstanceTypes" in instance_types + assert instance_types["InstanceTypes"] + assert "MemoryInfo" in instance_types["InstanceTypes"][0] + assert "SizeInMiB" in instance_types["InstanceTypes"][0]["MemoryInfo"] + dask_ram = parse_obj_as( + ByteSize, + f"{instance_types['InstanceTypes'][0]['MemoryInfo']['SizeInMiB']}MiB", + ) dask_task_resources = create_dask_task_resources( dask_task_imposed_ec2_type, dask_ram ) @@ -493,6 +504,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 return_value=Resources.create_as_empty(), autospec=True, ) + await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode) mocked_dask_get_worker_still_has_results_in_memory.assert_called() assert mocked_dask_get_worker_still_has_results_in_memory.call_count == 2 @@ -738,7 +750,7 @@ async def test__deactivate_empty_nodes( initialized_app, active_cluster, ComputationalAutoscaling() ) assert not updated_cluster.active_nodes - assert updated_cluster.drained_nodes == active_cluster.active_nodes + assert len(updated_cluster.drained_nodes) == len(active_cluster.active_nodes) mock_set_node_availability.assert_called_once_with( mock.ANY, host_node, available=False ) @@ -773,7 +785,7 @@ async def 
test__deactivate_empty_nodes_with_finished_tasks_should_not_deactivate initialized_app, deepcopy(active_cluster), ComputationalAutoscaling() ) assert not updated_cluster.active_nodes - assert updated_cluster.drained_nodes == active_cluster.active_nodes + assert len(updated_cluster.drained_nodes) == len(active_cluster.active_nodes) mock_set_node_availability.assert_called_once_with( mock.ANY, host_node, available=False ) diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py index 9b1cfc77824..0044c157b0d 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py @@ -106,14 +106,6 @@ async def fake_tag_node(*args, **kwargs) -> Node: ) -@pytest.fixture -def mock_set_node_availability(mocker: MockerFixture) -> mock.Mock: - return mocker.patch( - "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.set_node_availability", - autospec=True, - ) - - @pytest.fixture def mock_remove_nodes(mocker: MockerFixture) -> mock.Mock: return mocker.patch( @@ -424,7 +416,6 @@ async def _assert_ec2_instances( return internal_dns_names -@pytest.mark.testit @pytest.mark.acceptance_test() @pytest.mark.parametrize( "docker_service_imposed_ec2_type, docker_service_ram, expected_ec2_type", @@ -799,7 +790,7 @@ async def test__deactivate_empty_nodes( initialized_app, active_cluster, DynamicAutoscaling() ) assert not updated_cluster.active_nodes - assert updated_cluster.drained_nodes == active_cluster.active_nodes + assert len(updated_cluster.drained_nodes) == len(active_cluster.active_nodes) mock_set_node_availability.assert_called_once_with( mock.ANY, host_node, available=False ) @@ -835,7 +826,7 @@ async def test__deactivate_empty_nodes_to_drain_when_services_running_are_missin initialized_app, active_cluster, DynamicAutoscaling() ) assert not updated_cluster.active_nodes - assert updated_cluster.drained_nodes == active_cluster.active_nodes + assert len(updated_cluster.drained_nodes) == len(active_cluster.active_nodes) mock_set_node_availability.assert_called_once_with( mock.ANY, host_node, available=False ) diff --git a/services/autoscaling/tests/unit/test_modules_ec2.py b/services/autoscaling/tests/unit/test_modules_ec2.py index e186b14b4ac..b64b8517f5c 100644 --- a/services/autoscaling/tests/unit/test_modules_ec2.py +++ b/services/autoscaling/tests/unit/test_modules_ec2.py @@ -2,14 +2,15 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable -from typing import Callable, cast +from collections.abc import Callable +from typing import cast import botocore.exceptions import pytest from faker import Faker from fastapi import FastAPI from moto.server import ThreadedMotoServer -from pytest_mock.plugin import MockerFixture +from pydantic import ByteSize, parse_obj_as from pytest_simcore.helpers.utils_envs import EnvVarsDict from simcore_service_autoscaling.core.errors import ( ConfigurationError, @@ -17,6 +18,7 @@ Ec2TooManyInstancesError, ) from simcore_service_autoscaling.core.settings import ApplicationSettings, EC2Settings +from simcore_service_autoscaling.models import EC2InstanceType from simcore_service_autoscaling.modules.ec2 import ( AutoscalingEC2, EC2InstanceData, @@ -137,6 +139,33 @@ async def test_get_ec2_instance_capabilities( assert any(i.name == instance_type_name for i in instance_types) +@pytest.fixture +async def fake_ec2_instance_type( + mocked_aws_server_envs: None, + 
ec2_client: EC2Client, +) -> EC2InstanceType: + instance_type_name: InstanceTypeType = parse_obj_as(InstanceTypeType, "c3.8xlarge") + instance_types = await ec2_client.describe_instance_types( + InstanceTypes=[instance_type_name] + ) + assert instance_types + assert "InstanceTypes" in instance_types + assert instance_types["InstanceTypes"] + assert "MemoryInfo" in instance_types["InstanceTypes"][0] + assert "SizeInMiB" in instance_types["InstanceTypes"][0]["MemoryInfo"] + assert "VCpuInfo" in instance_types["InstanceTypes"][0] + assert "DefaultVCpus" in instance_types["InstanceTypes"][0]["VCpuInfo"] + + return EC2InstanceType( + name=instance_type_name, + cpus=instance_types["InstanceTypes"][0]["VCpuInfo"]["DefaultVCpus"], + ram=parse_obj_as( + ByteSize, + f"{instance_types['InstanceTypes'][0]['MemoryInfo']['SizeInMiB']}MiB", + ), + ) + + async def test_start_aws_instance( mocked_aws_server_envs: None, aws_vpc_id: str, @@ -147,7 +176,7 @@ async def test_start_aws_instance( autoscaling_ec2: AutoscalingEC2, app_settings: ApplicationSettings, faker: Faker, - mocker: MockerFixture, + fake_ec2_instance_type: EC2InstanceType, ): assert app_settings.AUTOSCALING_EC2_ACCESS assert app_settings.AUTOSCALING_EC2_INSTANCES @@ -155,12 +184,11 @@ async def test_start_aws_instance( all_instances = await ec2_client.describe_instances() assert not all_instances["Reservations"] - instance_type = faker.pystr() tags = faker.pydict(allowed_types=(str,)) startup_script = faker.pystr() await autoscaling_ec2.start_aws_instance( app_settings.AUTOSCALING_EC2_INSTANCES, - instance_type, + fake_ec2_instance_type, tags=tags, startup_script=startup_script, number_of_instances=1, @@ -174,7 +202,7 @@ async def test_start_aws_instance( assert len(running_instance["Instances"]) == 1 running_instance = running_instance["Instances"][0] assert "InstanceType" in running_instance - assert running_instance["InstanceType"] == instance_type + assert running_instance["InstanceType"] == fake_ec2_instance_type.name assert "Tags" in running_instance assert running_instance["Tags"] == [ {"Key": key, "Value": value} for key, value in tags.items() @@ -191,7 +219,7 @@ async def test_start_aws_instance_is_limited_in_number_of_instances( autoscaling_ec2: AutoscalingEC2, app_settings: ApplicationSettings, faker: Faker, - mocker: MockerFixture, + fake_ec2_instance_type: EC2InstanceType, ): assert app_settings.AUTOSCALING_EC2_ACCESS assert app_settings.AUTOSCALING_EC2_INSTANCES @@ -205,7 +233,7 @@ async def test_start_aws_instance_is_limited_in_number_of_instances( for _ in range(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES): await autoscaling_ec2.start_aws_instance( app_settings.AUTOSCALING_EC2_INSTANCES, - faker.pystr(), + fake_ec2_instance_type, tags=tags, startup_script=startup_script, number_of_instances=1, @@ -215,7 +243,7 @@ async def test_start_aws_instance_is_limited_in_number_of_instances( with pytest.raises(Ec2TooManyInstancesError): await autoscaling_ec2.start_aws_instance( app_settings.AUTOSCALING_EC2_INSTANCES, - faker.pystr(), + fake_ec2_instance_type, tags=tags, startup_script=startup_script, number_of_instances=1, @@ -232,7 +260,7 @@ async def test_get_instances( autoscaling_ec2: AutoscalingEC2, app_settings: ApplicationSettings, faker: Faker, - mocker: MockerFixture, + fake_ec2_instance_type: EC2InstanceType, ): assert app_settings.AUTOSCALING_EC2_INSTANCES # we have nothing running now in ec2 @@ -244,12 +272,11 @@ async def test_get_instances( ) # create some instance - instance_type = faker.pystr() tags 
= faker.pydict(allowed_types=(str,)) startup_script = faker.pystr() created_instances = await autoscaling_ec2.start_aws_instance( app_settings.AUTOSCALING_EC2_INSTANCES, - instance_type, + fake_ec2_instance_type, tags=tags, startup_script=startup_script, number_of_instances=1, @@ -273,19 +300,18 @@ async def test_terminate_instance( autoscaling_ec2: AutoscalingEC2, app_settings: ApplicationSettings, faker: Faker, - mocker: MockerFixture, + fake_ec2_instance_type: EC2InstanceType, ): assert app_settings.AUTOSCALING_EC2_INSTANCES # we have nothing running now in ec2 all_instances = await ec2_client.describe_instances() assert not all_instances["Reservations"] # create some instance - instance_type = faker.pystr() tags = faker.pydict(allowed_types=(str,)) startup_script = faker.pystr() created_instances = await autoscaling_ec2.start_aws_instance( app_settings.AUTOSCALING_EC2_INSTANCES, - instance_type, + fake_ec2_instance_type, tags=tags, startup_script=startup_script, number_of_instances=1, diff --git a/services/autoscaling/tests/unit/test_utils_computational_scaling.py b/services/autoscaling/tests/unit/test_utils_computational_scaling.py index 4cd3ae46208..17c3e5f5d9d 100644 --- a/services/autoscaling/tests/unit/test_utils_computational_scaling.py +++ b/services/autoscaling/tests/unit/test_utils_computational_scaling.py @@ -140,9 +140,7 @@ async def test_try_assigning_task_to_pending_instances_with_no_instances( ): task = fake_task() assert ( - await try_assigning_task_to_instances( - fake_app, task, [], {}, notify_progress=True - ) + await try_assigning_task_to_instances(fake_app, task, [], notify_progress=True) is False ) @@ -157,18 +155,13 @@ async def test_try_assigning_task_to_pending_instances( pending_instance_to_tasks: list[tuple[EC2InstanceData, list[DaskTask]]] = [ (ec2_instance, []) ] - type_to_instance_map = { - ec2_instance.type: EC2InstanceType( - name=ec2_instance.type, cpus=4, ram=ByteSize(1024 * 1024) - ) - } + # calling once should allow to add that task to the instance assert ( await try_assigning_task_to_instances( fake_app, task, pending_instance_to_tasks, - type_to_instance_map, notify_progress=True, ) is True @@ -180,7 +173,6 @@ async def test_try_assigning_task_to_pending_instances( fake_app, task, pending_instance_to_tasks, - type_to_instance_map, notify_progress=True, ) is True @@ -192,7 +184,6 @@ async def test_try_assigning_task_to_pending_instances( fake_app, task, pending_instance_to_tasks, - type_to_instance_map, notify_progress=True, ) is False
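

Note on the pytest-testit pre-commit hook added by this patch: it removes stray @pytest.mark.testit debug markers from staged files before they reach a commit (the marker dropped from test_modules_auto_scaling_dynamic.py above is exactly the kind of leftover it targets). A hypothetical Python rendition of the same idea is sketched below, only to spell out the intended behavior; the shipped hook is the bash script in the patch, and its exit 1 stops after the first offending file, whereas this sketch sweeps all staged files before failing.

#!/usr/bin/env python3
# Hypothetical rendition (not the shipped bash hook): strip `@pytest.mark.testit`
# lines from staged files and return non-zero so the commit stops and the
# developer can re-stage the cleaned files.
import pathlib
import subprocess
import sys

MARKER = "@pytest.mark.testit"


def main() -> int:
    staged = subprocess.run(
        ["git", "diff", "--cached", "--name-only"],
        capture_output=True,
        text=True,
        check=True,
    ).stdout.splitlines()
    found_marker = False
    for name in staged:
        path = pathlib.Path(name)
        if not path.is_file():  # skip staged deletions
            continue
        lines = path.read_text().splitlines(keepends=True)
        cleaned = [line for line in lines if MARKER not in line]
        if len(cleaned) != len(lines):
            path.write_text("".join(cleaned))
            print(f"Removed {MARKER} from file '{name}'")
            found_marker = True
    return 1 if found_marker else 0


if __name__ == "__main__":
    sys.exit(main())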
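

Note on the new aiocache dependency: the @cached(noself=True) decorator on get_ec2_instance_capabilities memoizes the EC2 describe-instance-types round-trip, which matters now that get_instances resolves the capabilities of every running instance in order to fill in EC2InstanceData.resources. A minimal sketch of the behavior relied upon, using made-up names (InstanceCatalog and get_capabilities are illustrative, not part of the service): noself=True keeps self out of the cache key, so repeated lookups for the same argument are answered from memory.

import asyncio

from aiocache import cached


class InstanceCatalog:
    def __init__(self) -> None:
        self.backend_calls = 0

    @cached(noself=True)
    async def get_capabilities(self, instance_type: str) -> dict[str, int]:
        # stands in for the real describe_instance_types round-trip to AWS
        self.backend_calls += 1
        await asyncio.sleep(0)
        return {"cpus": 32, "ram_mib": 256 * 1024}


async def _demo() -> None:
    catalog = InstanceCatalog()
    first = await catalog.get_capabilities("r5n.8xlarge")
    second = await catalog.get_capabilities("r5n.8xlarge")  # served from cache
    assert first == second
    assert catalog.backend_calls == 1


asyncio.run(_demo())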
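

Note on carrying Resources directly on EC2InstanceData: this is what lets _find_needed_instances treat active, pending and the newly considered drained instances uniformly, and it is why try_assigning_task_to_instances no longer needs a type_to_instance_map argument. The sketch below is a simplified illustration of that fitting-and-priority logic under assumed, stand-in types (Instance, covers and assign are illustrative names, not the service's API).

from dataclasses import dataclass, field


@dataclass(frozen=True)
class Resources:
    cpus: float
    ram: int

    def __sub__(self, other: "Resources") -> "Resources":
        return Resources(self.cpus - other.cpus, self.ram - other.ram)

    def covers(self, other: "Resources") -> bool:
        return self.cpus >= other.cpus and self.ram >= other.ram


@dataclass
class Instance:
    resources: Resources  # known up-front, as on EC2InstanceData
    assigned: list[Resources] = field(default_factory=list)

    def try_assign(self, task: Resources) -> bool:
        used = Resources(
            sum(t.cpus for t in self.assigned), sum(t.ram for t in self.assigned)
        )
        if (self.resources - used).covers(task):
            self.assigned.append(task)
            return True
        return False


def assign(
    task: Resources,
    active: list[Instance],
    pending: list[Instance],
    drained: list[Instance],
) -> bool:
    # same short-circuit order as in _find_needed_instances: reuse what already
    # exists (including drained-but-running machines) before planning new ones
    for pool in (active, pending, drained):
        if any(instance.try_assign(task) for instance in pool):
            return True
    return False


gpu_node = Instance(Resources(cpus=8, ram=32 * 1024**3))
assert assign(Resources(cpus=4, ram=8 * 1024**3), [], [], [gpu_node]) is True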