Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Prevent autoscaling from creating workers indefinitely #5008

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,10 @@ repos:
hooks:
- id: black
name: black format code
- repo: local
hooks:
- id: pytest-testit
name: pytest-testit
language: script
types: [file, python]
entry: scripts/precommit/pytest-testit.bash
sanderegg marked this conversation as resolved.
Show resolved Hide resolved
10 changes: 10 additions & 0 deletions scripts/precommit/pytest-testit.bash
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash

# Pre-commit hook: block commits whose staged files still contain the
# temporary `@pytest.mark.testit` marker (used locally to focus test runs).
# Any marker found is stripped from the WORKING COPY and the commit is
# aborted so the author can review and re-stage the cleaned file.
#
# NOTE: the loop reads from process substitution instead of a pipe so it
# runs in the current shell — with `git diff ... | while ...` the loop body
# executes in a subshell, `exit 1` there only ends the subshell, and
# scanning stops at the first offending file instead of covering them all.

found=0
while IFS= read -r file; do
    # Skip paths staged for deletion/rename that no longer exist on disk;
    # grep/sed on them would only produce spurious errors.
    [ -f "$file" ] || continue
    if grep -n '@pytest\.mark\.testit' "$file"; then
        # This edits the working copy only — the staged (index) copy still
        # contains the marker, which is why the commit must be aborted.
        sed -i '/@pytest\.mark\.testit/d' "$file"
        echo "Removed @pytest.mark.testit from file '$file'"
        found=1
    fi
done < <(git diff --cached --name-only)

# Non-zero when any marker was found, so pre-commit fails the hook.
exit "$found"
sanderegg marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion services/autoscaling/requirements/_base.in
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
--requirement ../../../packages/service-library/requirements/_base.in
--requirement ../../../packages/service-library/requirements/_fastapi.in


aiocache
aiodocker
aioboto3
dask[distributed]
Expand Down
2 changes: 2 additions & 0 deletions services/autoscaling/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ aiobotocore==2.7.0
# via
# aioboto3
# aiobotocore
aiocache==0.12.2
# via -r requirements/_base.in
aiodebug==2.3.0
# via
# -c requirements/../../../packages/service-library/requirements/./_base.in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __sub__(self, other: "Resources") -> "Resources":

@dataclass(frozen=True)
class EC2InstanceType:
name: str
name: InstanceTypeType
sanderegg marked this conversation as resolved.
Show resolved Hide resolved
cpus: PositiveInt
ram: ByteSize

Expand All @@ -59,6 +59,7 @@ class EC2InstanceData:
aws_private_dns: InstancePrivateDNSName
type: InstanceTypeType
state: InstanceStateNameType
resources: Resources


@dataclass(frozen=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
Node,
NodeState,
)
from pydantic import parse_obj_as
from servicelib.logging_utils import log_catch
from types_aiobotocore_ec2.literals import InstanceTypeType

Expand Down Expand Up @@ -166,7 +165,9 @@ async def sorted_allowed_instance_types(app: FastAPI) -> list[EC2InstanceType]:
ec2_client = get_ec2_client(app)

# some instances might be able to run several tasks
allowed_instance_types = await ec2_client.get_ec2_instance_capabilities(
allowed_instance_types: list[
EC2InstanceType
] = await ec2_client.get_ec2_instance_capabilities(
cast( # type: ignore
set[InstanceTypeType],
set(
Expand All @@ -178,7 +179,7 @@ async def sorted_allowed_instance_types(app: FastAPI) -> list[EC2InstanceType]:
def _sort_according_to_allowed_types(instance_type: EC2InstanceType) -> int:
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
return app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES.index(
instance_type.name
f"{instance_type.name}"
)

allowed_instance_types.sort(key=_sort_according_to_allowed_types)
Expand Down Expand Up @@ -282,6 +283,9 @@ async def _find_needed_instances(
pending_instances_to_tasks: list[tuple[EC2InstanceData, list]] = [
(i, []) for i in cluster.pending_ec2s
]
drained_instances_to_tasks: list[tuple[EC2InstanceData, list]] = [
(i.ec2_instance, []) for i in cluster.drained_nodes
]
needed_new_instance_types_for_tasks: list[tuple[EC2InstanceType, list]] = []
for task in pending_tasks:
task_defined_ec2_type = await auto_scaling_mode.get_task_defined_instance(
Expand All @@ -290,15 +294,22 @@ async def _find_needed_instances(
(
filtered_active_instance_to_task,
filtered_pending_instance_to_task,
filtered_drained_instances_to_task,
filtered_needed_new_instance_types_to_task,
) = filter_by_task_defined_instance(
task_defined_ec2_type,
active_instances_to_tasks,
pending_instances_to_tasks,
drained_instances_to_tasks,
needed_new_instance_types_for_tasks,
)

# try to assign the task to one of the active, pending or net created instances
_logger.debug(
"Try to assign %s to any active/pending/created instance in the %s",
f"{task}",
f"{cluster=}",
)
if (
await auto_scaling_mode.try_assigning_task_to_instances(
app,
Expand All @@ -314,6 +325,13 @@ async def _find_needed_instances(
type_to_instance_map,
notify_progress=True,
)
or await auto_scaling_mode.try_assigning_task_to_instances(
app,
task,
filtered_drained_instances_to_task,
type_to_instance_map,
notify_progress=False,
)
or auto_scaling_mode.try_assigning_task_to_instance_types(
task, filtered_needed_new_instance_types_to_task
)
Expand Down Expand Up @@ -390,12 +408,12 @@ async def _start_instances(
*[
ec2_client.start_aws_instance(
app_settings.AUTOSCALING_EC2_INSTANCES,
instance_type=parse_obj_as(InstanceTypeType, instance.name),
instance_type=instance_type,
tags=instance_tags,
startup_script=instance_startup_script,
number_of_instances=instance_num,
)
for instance, instance_num in needed_instances.items()
for instance_type, instance_num in needed_instances.items()
],
return_exceptions=True,
)
Expand Down Expand Up @@ -472,43 +490,47 @@ async def _deactivate_empty_nodes(
app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling
) -> Cluster:
docker_client = get_docker_client(app)
active_empty_nodes: list[AssociatedInstance] = []
active_non_empty_nodes: list[AssociatedInstance] = []
active_empty_instances: list[AssociatedInstance] = []
active_non_empty_instances: list[AssociatedInstance] = []
for instance in cluster.active_nodes:
try:
node_used_resources = await auto_scaling_mode.compute_node_used_resources(
app,
instance,
)
if node_used_resources == Resources.create_as_empty():
active_empty_nodes.append(instance)
active_empty_instances.append(instance)
else:
active_non_empty_nodes.append(instance)
active_non_empty_instances.append(instance)
except DaskWorkerNotFoundError: # noqa: PERF203
_logger.exception(
"EC2 node instance is not registered to dask-scheduler! TIP: Needs investigation"
)

# drain this empty nodes
await asyncio.gather(
updated_nodes: list[Node] = await asyncio.gather(
*(
utils_docker.set_node_availability(
docker_client,
node.node,
available=False,
)
for node in active_empty_nodes
for node in active_empty_instances
)
)
if active_empty_nodes:
if updated_nodes:
_logger.info(
"following nodes set to drain: '%s'",
f"{[node.node.Description.Hostname for node in active_empty_nodes if node.node.Description]}",
f"{[node.Description.Hostname for node in updated_nodes if node.Description]}",
)
newly_drained_instances = [
AssociatedInstance(node, instance.ec2_instance)
for instance, node in zip(active_empty_instances, updated_nodes, strict=True)
]
return dataclasses.replace(
cluster,
active_nodes=active_non_empty_nodes,
drained_nodes=cluster.drained_nodes + active_empty_nodes,
active_nodes=active_non_empty_instances,
drained_nodes=cluster.drained_nodes + newly_drained_instances,
)


Expand Down Expand Up @@ -537,6 +559,12 @@ async def _find_terminateable_instances(
):
# let's terminate that one
terminateable_nodes.append(instance)
else:
_logger.info(
"%s has still %ss before being terminateable",
f"{instance.ec2_instance.id=}",
f"{(elapsed_time_since_drained - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION).total_seconds()}",
)

if terminateable_nodes:
_logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ async def try_assigning_task_to_instances(
app,
pending_task,
instances_to_tasks,
type_to_instance_map,
notify_progress=notify_progress,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import aioboto3
import botocore.exceptions
from aiobotocore.session import ClientCreatorContext
from aiocache import cached
from fastapi import FastAPI
from pydantic import ByteSize, parse_obj_as
from servicelib.logging_utils import log_context
Expand All @@ -24,7 +25,7 @@
Ec2TooManyInstancesError,
)
from ..core.settings import EC2InstancesSettings, EC2Settings
from ..models import EC2InstanceData, EC2InstanceType
from ..models import EC2InstanceData, EC2InstanceType, Resources
from ..utils.utils_ec2 import compose_user_data

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -63,6 +64,7 @@ async def ping(self) -> bool:
except Exception: # pylint: disable=broad-except
return False

@cached(noself=True)
async def get_ec2_instance_capabilities(
self,
instance_type_names: set[InstanceTypeType],
Expand All @@ -88,15 +90,15 @@ async def get_ec2_instance_capabilities(
async def start_aws_instance(
self,
instance_settings: EC2InstancesSettings,
instance_type: InstanceTypeType,
instance_type: EC2InstanceType,
tags: dict[str, str],
startup_script: str,
number_of_instances: int,
) -> list[EC2InstanceData]:
with log_context(
logger,
logging.INFO,
msg=f"launching {number_of_instances} AWS instance(s) {instance_type} with {tags=}",
msg=f"launching {number_of_instances} AWS instance(s) {instance_type.name} with {tags=}",
):
# first check the max amount is not already reached
current_instances = await self.get_instances(instance_settings, tags)
Expand All @@ -112,7 +114,7 @@ async def start_aws_instance(
ImageId=instance_settings.EC2_INSTANCES_AMI_ID,
MinCount=number_of_instances,
MaxCount=number_of_instances,
InstanceType=instance_type,
InstanceType=instance_type.name,
InstanceInitiatedShutdownBehavior="terminate",
KeyName=instance_settings.EC2_INSTANCES_KEY_NAME,
SubnetId=instance_settings.EC2_INSTANCES_SUBNET_ID,
Expand Down Expand Up @@ -149,6 +151,7 @@ async def start_aws_instance(
aws_private_dns=instance["PrivateDnsName"],
type=instance["InstanceType"],
state=instance["State"]["Name"],
resources=Resources(cpus=instance_type.cpus, ram=instance_type.ram),
)
for instance in instances["Reservations"][0]["Instances"]
]
Expand Down Expand Up @@ -192,13 +195,21 @@ async def get_instances(
assert "InstanceType" in instance # nosec
assert "State" in instance # nosec
assert "Name" in instance["State"] # nosec
ec2_instance_types = await self.get_ec2_instance_capabilities(
{instance["InstanceType"]}
)
assert len(ec2_instance_types) == 1 # nosec
all_instances.append(
EC2InstanceData(
launch_time=instance["LaunchTime"],
id=instance["InstanceId"],
aws_private_dns=instance["PrivateDnsName"],
type=instance["InstanceType"],
state=instance["State"]["Name"],
resources=Resources(
cpus=ec2_instance_types[0].cpus,
ram=ec2_instance_types[0].ram,
),
)
)
logger.debug("received: %s", f"{all_instances=}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def _find_node_with_name(node: Node) -> bool:
try:
docker_node_name = node_host_name_from_ec2_private_dns(instance_data)
except Ec2InvalidDnsNameError:
_logger.exception("Unexcepted EC2 private dns name")
_logger.exception("Unexpected EC2 private dns name")
non_associated_instances.append(instance_data)
continue

Expand Down Expand Up @@ -125,6 +125,7 @@ def filter_by_task_defined_instance(
instance_type_name: InstanceTypeType | None,
active_instances_to_tasks,
pending_instances_to_tasks,
drained_instances_to_tasks,
needed_new_instance_types_for_tasks,
) -> tuple:
return (
Expand All @@ -140,6 +141,12 @@ def filter_by_task_defined_instance(
),
pending_instances_to_tasks,
),
filter(
functools.partial(
_instance_data_map_by_type_name, type_name=instance_type_name
),
drained_instances_to_tasks,
),
filter(
functools.partial(
_instance_type_map_by_type_name, type_name=instance_type_name
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import logging
from typing import Final, Iterable
from collections.abc import Iterable
from typing import Final

from dask_task_models_library.constants import DASK_TASK_EC2_RESOURCE_RESTRICTION_KEY
from fastapi import FastAPI
Expand All @@ -15,7 +16,6 @@
EC2InstanceType,
Resources,
)
from . import utils_docker

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -46,7 +46,7 @@ def try_assigning_task_to_node(
instance_to_tasks: Iterable[tuple[AssociatedInstance, list[DaskTask]]],
) -> bool:
for instance, node_assigned_tasks in instance_to_tasks:
instance_total_resource = utils_docker.get_node_total_resources(instance.node)
instance_total_resource = instance.ec2_instance.resources
tasks_needed_resources = _compute_tasks_needed_resources(node_assigned_tasks)
if (
instance_total_resource - tasks_needed_resources
Expand All @@ -60,7 +60,6 @@ async def try_assigning_task_to_instances(
app: FastAPI,
pending_task: DaskTask,
instances_to_tasks: Iterable[tuple[EC2InstanceData, list[DaskTask]]],
type_to_instance_map: dict[str, EC2InstanceType],
*,
notify_progress: bool,
) -> bool:
Expand All @@ -70,10 +69,7 @@ async def try_assigning_task_to_instances(
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME
)
for instance, instance_assigned_tasks in instances_to_tasks:
instance_type = type_to_instance_map[instance.type]
instance_total_resources = Resources(
cpus=instance_type.cpus, ram=instance_type.ram
)
instance_total_resources = instance.resources
tasks_needed_resources = _compute_tasks_needed_resources(
instance_assigned_tasks
)
Expand Down
Loading