šŸ›Autoscaling/Comp backend: drain retired nodes so that they can be reā€¦
Browse files Browse the repository at this point in the history
ā€¦-used (#6345)
  • Loading branch information
sanderegg authored Sep 10, 2024
1 parent 1320b69 commit 5b35cfe
Showing 11 changed files with 124 additions and 46 deletions.
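At a glance, the commit introduces a new bucket of "retired" nodes: EC2-backed workers that the dask scheduler has already retired but that are still marked ready in docker. The auto-scaling loop now force-drains them so they can be terminated or re-used. A hedged sketch of where the new step sits in the loop (the function names and their order come from the diff below; the wrapper and the exact signatures are illustrative only):

```python
# Illustrative wiring only: the real auto_scale_cluster does more (buffers,
# notifications, metrics) and may pass different arguments to these helpers.
async def auto_scale_once(app, auto_scaling_mode, allowed_instance_types):
    cluster = await _analyze_current_cluster(app, auto_scaling_mode, allowed_instance_types)
    cluster = await _try_attach_pending_ec2s(app, cluster, auto_scaling_mode, allowed_instance_types)
    # new step from this commit: retired-but-not-drained nodes are force-drained
    # so that they can later be re-used or terminated like any other drained node
    cluster = await _drain_retired_nodes(app, cluster)
    return await _autoscale_cluster(app, cluster, auto_scaling_mode, allowed_instance_types)
```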
@@ -4,7 +4,6 @@

import distributed
import rich
import typer
from mypy_boto3_ec2.service_resource import Instance
from pydantic import AnyUrl

@@ -64,25 +63,6 @@ async def dask_client(
f"{url}", security=security, timeout="5", asynchronous=True
)
)
versions = await _wrap_dask_async_call(client.get_versions())
if versions["client"]["python"] != versions["scheduler"]["python"]:
rich.print(
f"[red]python versions do not match! TIP: install the correct version {versions['scheduler']['python']}[/red]"
)
raise typer.Exit(1)
if (
versions["client"]["distributed"]
!= versions["scheduler"]["distributed"]
):
rich.print(
f"[red]distributed versions do not match! TIP: install the correct version {versions['scheduler']['distributed']}[/red]"
)
raise typer.Exit(1)
if versions["client"]["dask"] != versions["scheduler"]["dask"]:
rich.print(
f"[red]dask versions do not match! TIP: install the correct version {versions['scheduler']['dask']}[/red]"
)
raise typer.Exit(1)
yield client

finally:
@@ -98,6 +98,11 @@ class Cluster: # pylint: disable=too-many-instance-attributes
"description": "This is a EC2-backed docker node which is docker drained and waiting for termination"
}
)
retired_nodes: list[AssociatedInstance] = field(
metadata={
"description": "This is a EC2-backed docker node which was retired and waiting to be drained and eventually terminated or re-used"
}
)
terminated_instances: list[NonAssociatedInstance]

def can_scale_down(self) -> bool:
@@ -107,6 +112,7 @@ def can_scale_down(self) -> bool:
or self.drained_nodes
or self.pending_ec2s
or self.terminating_nodes
or self.retired_nodes
)

def total_number_of_machines(self) -> int:
@@ -119,6 +125,7 @@ def total_number_of_machines(self) -> int:
+ len(self.pending_ec2s)
+ len(self.broken_ec2s)
+ len(self.terminating_nodes)
+ len(self.retired_nodes)
)

def __repr__(self) -> str:
@@ -137,6 +144,7 @@ def _get_instance_ids(
f"buffer-ec2s: count={len(self.buffer_ec2s)} {_get_instance_ids(self.buffer_ec2s)}, "
f"disconnected-nodes: count={len(self.disconnected_nodes)}, "
f"terminating-nodes: count={len(self.terminating_nodes)} {_get_instance_ids(self.terminating_nodes)}, "
f"retired-nodes: count={len(self.retired_nodes)} {_get_instance_ids(self.retired_nodes)}, "
f"terminated-ec2s: count={len(self.terminated_instances)} {_get_instance_ids(self.terminated_instances)})"
)
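For orientation, a minimal runnable sketch of how the new `retired_nodes` bucket affects the scale-down decision; the field and method names mirror the diff above, while the trimmed dataclass and the string stand-ins for instances are illustrative only:

```python
import dataclasses

@dataclasses.dataclass(frozen=True)
class MiniCluster:
    # trimmed stand-in for the real Cluster dataclass
    active_nodes: tuple = ()
    drained_nodes: tuple = ()
    pending_ec2s: tuple = ()
    terminating_nodes: tuple = ()
    retired_nodes: tuple = ()

    def can_scale_down(self) -> bool:
        # retired_nodes now participates in the decision, as in the diff
        return bool(
            self.active_nodes
            or self.drained_nodes
            or self.pending_ec2s
            or self.terminating_nodes
            or self.retired_nodes
        )

# a cluster whose only machines are retired nodes still enters the scale-down logic
assert MiniCluster(retired_nodes=("i-0abc",)).can_scale_down()
assert not MiniCluster().can_scale_down()
```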

@@ -21,9 +21,9 @@
from servicelib.logging_utils import log_catch, log_context
from servicelib.utils import limited_gather
from servicelib.utils_formatting import timedelta_as_minute_second
from ..constants import DOCKER_JOIN_COMMAND_EC2_TAG_KEY, DOCKER_JOIN_COMMAND_NAME
from types_aiobotocore_ec2.literals import InstanceTypeType

from ..constants import DOCKER_JOIN_COMMAND_EC2_TAG_KEY, DOCKER_JOIN_COMMAND_NAME
from ..core.errors import (
Ec2InvalidDnsNameError,
TaskBestFittingInstanceNotFoundError,
@@ -123,7 +123,7 @@ async def _analyze_current_cluster(
]

# analyse attached ec2s
active_nodes, pending_nodes, all_drained_nodes = [], [], []
active_nodes, pending_nodes, all_drained_nodes, retired_nodes = [], [], [], []
for instance in attached_ec2s:
if await auto_scaling_mode.is_instance_active(app, instance):
node_used_resources = await auto_scaling_mode.compute_node_used_resources(
@@ -138,6 +138,9 @@
)
elif auto_scaling_mode.is_instance_drained(instance):
all_drained_nodes.append(instance)
elif await auto_scaling_mode.is_instance_retired(app, instance):
# it should be drained, but it is not, so we force-drain it so that it can be re-used if needed
retired_nodes.append(instance)
else:
pending_nodes.append(instance)

@@ -159,6 +162,7 @@
NonAssociatedInstance(ec2_instance=i) for i in terminated_ec2_instances
],
disconnected_nodes=[n for n in docker_nodes if _node_not_ready(n)],
retired_nodes=retired_nodes,
)
_logger.info("current state: %s", f"{cluster!r}")
return cluster
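The order of the checks above matters: a node is only classified as retired after the active and drained checks have both failed, so an already-drained node never lands in the new bucket. A small runnable sketch of that precedence using a hypothetical stub mode (the `is_instance_*` names come from the diff; the stub itself is invented for illustration):

```python
import asyncio

class StubMode:
    # hypothetical stand-in for an auto-scaling mode
    def __init__(self, *, active=False, drained=False, retired=False):
        self.active, self.drained, self.retired = active, drained, retired

    async def is_instance_active(self, app, instance):
        return self.active

    def is_instance_drained(self, instance):
        return self.drained

    async def is_instance_retired(self, app, instance):
        return self.retired

async def classify(app, instance, mode) -> str:
    # mirrors the elif chain in _analyze_current_cluster
    if await mode.is_instance_active(app, instance):
        return "active"
    if mode.is_instance_drained(instance):
        return "drained"
    if await mode.is_instance_retired(app, instance):
        return "retired"
    return "pending"

# a drained node is never re-classified as retired; only retired-but-not-yet-drained
# nodes are force-drained later by _drain_retired_nodes
assert asyncio.run(classify(None, None, StubMode(drained=True, retired=True))) == "drained"
assert asyncio.run(classify(None, None, StubMode(retired=True))) == "retired"
```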
@@ -329,14 +333,14 @@ async def sorted_allowed_instance_types(app: FastAPI) -> list[EC2InstanceType]:
ec2_client = get_ec2_client(app)

# some instances might be able to run several tasks
allowed_instance_types: list[EC2InstanceType] = (
await ec2_client.get_ec2_instance_capabilities(
cast(
set[InstanceTypeType],
set(
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES,
),
)
allowed_instance_types: list[
EC2InstanceType
] = await ec2_client.get_ec2_instance_capabilities(
cast(
set[InstanceTypeType],
set(
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES,
),
)
)

@@ -1081,6 +1085,43 @@ async def _notify_machine_creation_progress(
)


async def _drain_retired_nodes(
app: FastAPI,
cluster: Cluster,
) -> Cluster:
if not cluster.retired_nodes:
return cluster

app_settings = get_application_settings(app)
docker_client = get_docker_client(app)
# drain these empty nodes
updated_nodes: list[Node] = await asyncio.gather(
*(
utils_docker.set_node_osparc_ready(
app_settings,
docker_client,
node.node,
ready=False,
)
for node in cluster.retired_nodes
)
)
if updated_nodes:
_logger.info(
"following nodes were set to drain: '%s'",
f"{[node.Description.Hostname for node in updated_nodes if node.Description]}",
)
newly_drained_instances = [
AssociatedInstance(node=node, ec2_instance=instance.ec2_instance)
for instance, node in zip(cluster.retired_nodes, updated_nodes, strict=True)
]
return dataclasses.replace(
cluster,
retired_nodes=[],
drained_nodes=cluster.drained_nodes + newly_drained_instances,
)


async def _autoscale_cluster(
app: FastAPI,
cluster: Cluster,
@@ -1187,6 +1228,7 @@ async def auto_scale_cluster(
cluster = await _try_attach_pending_ec2s(
app, cluster, auto_scaling_mode, allowed_instance_types
)
cluster = await _drain_retired_nodes(app, cluster)

cluster = await _autoscale_cluster(
app, cluster, auto_scaling_mode, allowed_instance_types
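`_drain_retired_nodes` returns a copy of the cluster in which the freshly drained machines have moved from `retired_nodes` into `drained_nodes`, so the rest of the loop treats them like any other drained node. A tiny runnable illustration of that `dataclasses.replace` reshuffle (the field names come from the diff, the toy dataclass is illustrative):

```python
import dataclasses

@dataclasses.dataclass(frozen=True)
class ToyCluster:
    drained_nodes: tuple = ()
    retired_nodes: tuple = ()

before = ToyCluster(drained_nodes=("node-a",), retired_nodes=("node-b",))
after = dataclasses.replace(
    before,
    retired_nodes=(),  # the retired bucket is emptied...
    drained_nodes=before.drained_nodes + before.retired_nodes,  # ...and its content re-filed as drained
)
assert after.retired_nodes == ()
assert after.drained_nodes == ("node-a", "node-b")
```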
@@ -86,6 +86,11 @@ async def compute_cluster_total_resources(
async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool:
...

@staticmethod
@abstractmethod
async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool:
...

@staticmethod
def is_instance_drained(instance: AssociatedInstance) -> bool:
return not utils_docker.is_node_osparc_ready(instance.node)
@@ -177,6 +177,14 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool
_scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
)

@staticmethod
async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool:
if not utils_docker.is_node_osparc_ready(instance.node):
return False
return await dask.is_worker_retired(
_scheduler_url(app), _scheduler_auth(app), instance.ec2_instance
)

@staticmethod
async def try_retire_nodes(app: FastAPI) -> None:
await dask.try_retire_nodes(_scheduler_url(app), _scheduler_auth(app))
@@ -102,6 +102,13 @@ async def is_instance_active(app: FastAPI, instance: AssociatedInstance) -> bool
assert app # nosec
return utils_docker.is_node_osparc_ready(instance.node)

@staticmethod
async def is_instance_retired(app: FastAPI, instance: AssociatedInstance) -> bool:
assert app # nosec
assert instance # nosec
# nothing to do here
return False

@staticmethod
async def try_retire_nodes(app: FastAPI) -> None:
assert app # nosec
@@ -11,6 +11,7 @@
import distributed.scheduler
from aws_library.ec2 import EC2InstanceData, Resources
from dask_task_models_library.resource_constraints import DaskTaskResources
from distributed.core import Status
from models_library.clusters import InternalClusterAuthentication, TLSAuthentication
from pydantic import AnyUrl, ByteSize, parse_obj_as

@@ -120,8 +121,28 @@ async def is_worker_connected(
) -> bool:
with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError):
async with _scheduler_client(scheduler_url, authentication) as client:
_dask_worker_from_ec2_instance(client, worker_ec2_instance)
return True
_, worker_details = _dask_worker_from_ec2_instance(
client, worker_ec2_instance
)
return Status(worker_details["status"]) == Status.running
return False


async def is_worker_retired(
scheduler_url: AnyUrl,
authentication: InternalClusterAuthentication,
worker_ec2_instance: EC2InstanceData,
) -> bool:
with contextlib.suppress(DaskNoWorkersError, DaskWorkerNotFoundError):
async with _scheduler_client(scheduler_url, authentication) as client:
_, worker_details = _dask_worker_from_ec2_instance(
client, worker_ec2_instance
)
return Status(worker_details["status"]) in {
Status.closed,
Status.closing,
Status.closing_gracefully,
}
return False
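`is_worker_retired` relies on distributed's `Status` enum: the scheduler reports each worker's status as a plain string, and `Status(<string>)` resolves it back to the enum member, so any of the closing states counts as retired. A short illustration (assuming `distributed` is installed; the helper below is not part of the diff):

```python
from distributed.core import Status

# the scheduler's worker info carries the status as a string such as
# "running" or "closing_gracefully"; the enum constructor maps it back
assert Status("running") is Status.running

RETIRED_STATES = {Status.closed, Status.closing, Status.closing_gracefully}

def looks_retired(worker_details: dict) -> bool:
    # hypothetical helper mimicking the check in is_worker_retired
    return Status(worker_details["status"]) in RETIRED_STATES

assert looks_retired({"status": "closing_gracefully"})
assert not looks_retired({"status": "running"})
```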


@@ -152,9 +173,9 @@ def _list_tasks(
}

async with _scheduler_client(scheduler_url, authentication) as client:
list_of_tasks: dict[
dask.typing.Key, DaskTaskResources
] = await _wrap_client_async_routine(client.run_on_scheduler(_list_tasks))
list_of_tasks: dict[dask.typing.Key, DaskTaskResources] = (
await _wrap_client_async_routine(client.run_on_scheduler(_list_tasks))
)
_logger.debug("found unrunnable tasks: %s", list_of_tasks)
return [
DaskTask(
@@ -186,10 +207,10 @@ def _list_processing_tasks(
return worker_to_processing_tasks

async with _scheduler_client(scheduler_url, authentication) as client:
worker_to_tasks: dict[
str, list[tuple[dask.typing.Key, DaskTaskResources]]
] = await _wrap_client_async_routine(
client.run_on_scheduler(_list_processing_tasks)
worker_to_tasks: dict[str, list[tuple[dask.typing.Key, DaskTaskResources]]] = (
await _wrap_client_async_routine(
client.run_on_scheduler(_list_processing_tasks)
)
)
_logger.debug("found processing tasks: %s", worker_to_tasks)
tasks_per_worker = defaultdict(list)
@@ -255,12 +276,12 @@ def _list_processing_tasks_on_worker(
_logger.debug("looking for processing tasksfor %s", f"{worker_url=}")

# now get the used resources
worker_processing_tasks: list[
tuple[dask.typing.Key, DaskTaskResources]
] = await _wrap_client_async_routine(
client.run_on_scheduler(
_list_processing_tasks_on_worker, worker_url=worker_url
),
worker_processing_tasks: list[tuple[dask.typing.Key, DaskTaskResources]] = (
await _wrap_client_async_routine(
client.run_on_scheduler(
_list_processing_tasks_on_worker, worker_url=worker_url
),
)
)

total_resources_used: collections.Counter[str] = collections.Counter()
@@ -42,6 +42,10 @@
"Number of EC2-backed docker nodes that started the termination process",
EC2_INSTANCE_LABELS,
),
"retired_nodes": (
"Number of EC2-backed docker nodes that were actively retired and waiting for draining and termination or re-use",
EC2_INSTANCE_LABELS,
),
"terminated_instances": (
"Number of EC2 instances that were terminated (they are typically visible 1 hour)",
EC2_INSTANCE_LABELS,
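The new `retired_nodes` entry follows the existing pattern of one labelled gauge per cluster bucket. As a rough sketch of what such a gauge boils down to with plain `prometheus_client` (the metric name and the single `instance_type` label are assumptions; the service actually wraps this in its own `TrackedGauge` helper):

```python
from prometheus_client import CollectorRegistry, Gauge

registry = CollectorRegistry()
retired_nodes = Gauge(
    "cluster_retired_nodes",  # hypothetical metric name
    "Number of EC2-backed docker nodes that were retired and wait to be drained and re-used",
    labelnames=["instance_type"],  # stand-in for EC2_INSTANCE_LABELS
    registry=registry,
)

# one time series per EC2 instance type, updated from cluster.retired_nodes
retired_nodes.labels(instance_type="r5n.8xlarge").set(2)
```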
@@ -29,6 +29,7 @@ class ClusterMetrics(MetricsBase): # pylint: disable=too-many-instance-attribut
buffer_ec2s: TrackedGauge = field(init=False)
disconnected_nodes: TrackedGauge = field(init=False)
terminating_nodes: TrackedGauge = field(init=False)
retired_nodes: TrackedGauge = field(init=False)
terminated_instances: TrackedGauge = field(init=False)

def __post_init__(self) -> None:
Expand Down
@@ -1,8 +1,9 @@
services:
autoscaling:
environment:
- AUTOSCALING_DASK={}
- DASK_MONITORING_URL=tcp://dask-scheduler:8786
- DASK_SCHEDULER_AUTH='{}'
- DASK_SCHEDULER_AUTH={}
dask-sidecar:
image: itisfoundation/dask-sidecar:master-github-latest
init: true
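The docker-compose fix drops the single quotes around the empty JSON object. With the list form of `environment:`, quotes are not shell-stripped but become part of the value, so the service would have received the literal string `'{}'` (quotes included), which is not valid JSON for the `DASK_SCHEDULER_AUTH` setting. A quick illustration of the difference at parse time:

```python
import json

# what the service sees after the fix: a valid, empty JSON object
assert json.loads("{}") == {}

# what it saw before: the quotes travel with the value and parsing fails
try:
    json.loads("'{}'")
except json.JSONDecodeError:
    print("quoted value is not valid JSON")
```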
1 change: 1 addition & 0 deletions services/autoscaling/tests/unit/conftest.py
@@ -799,6 +799,7 @@ def _creator(**cluter_overrides) -> Cluster:
buffer_ec2s=[],
disconnected_nodes=[],
terminating_nodes=[],
retired_nodes=[],
terminated_instances=[],
),
**cluter_overrides,
