✨Autoscaling: add buffer metrics #6260

Merged
1 change: 1 addition & 0 deletions packages/aws-library/requirements/_base.in
@@ -8,6 +8,7 @@

 aioboto3
 aiocache
+arrow
 pydantic[email]
 types-aiobotocore[ec2,s3,ssm]
 sh
8 changes: 1 addition & 7 deletions packages/aws-library/requirements/_base.txt
@@ -41,10 +41,7 @@ arrow==1.3.0
     #   -r requirements/../../../packages/models-library/requirements/_base.in
     #   -r requirements/../../../packages/service-library/requirements/../../../packages/models-library/requirements/_base.in
     #   -r requirements/../../../packages/service-library/requirements/_base.in
-async-timeout==4.0.3
-    # via
-    #   aiohttp
-    #   redis
+    #   -r requirements/_base.in
 attrs==24.2.0
     # via
     #   aiohttp
@@ -65,8 +62,6 @@ dnspython==2.6.1
     # via email-validator
 email-validator==2.2.0
     # via pydantic
-exceptiongroup==1.2.2
-    # via anyio
 fast-depends==2.4.8
     # via faststream
 faststream==0.5.18
@@ -201,7 +196,6 @@ types-python-dateutil==2.9.0.20240821
 typing-extensions==4.12.2
     # via
     #   aiodebug
-    #   anyio
     #   faststream
     #   pydantic
     #   typer
8 changes: 0 additions & 8 deletions packages/aws-library/requirements/_test.txt
@@ -58,10 +58,6 @@ cryptography==43.0.0
     #   moto
 docker==7.1.0
     # via moto
-exceptiongroup==1.2.2
-    # via
-    #   -c requirements/_base.txt
-    #   pytest
 faker==27.0.0
     # via -r requirements/_test.in
 flask==3.0.3
@@ -246,10 +242,6 @@ sympy==1.13.2
     # via cfn-lint
 termcolor==2.4.0
     # via pytest-sugar
-tomli==2.0.1
-    # via
-    #   coverage
-    #   pytest
 types-aioboto3==13.1.1
     # via -r requirements/_test.in
 types-aiobotocore==2.13.2
10 changes: 0 additions & 10 deletions packages/aws-library/requirements/_tools.txt
@@ -72,22 +72,12 @@ setuptools==73.0.1
     # via
     #   -c requirements/_test.txt
     #   pip-tools
-tomli==2.0.1
-    # via
-    #   -c requirements/_test.txt
-    #   black
-    #   build
-    #   mypy
-    #   pip-tools
-    #   pylint
 tomlkit==0.13.2
     # via pylint
 typing-extensions==4.12.2
     # via
     #   -c requirements/_base.txt
     #   -c requirements/_test.txt
-    #   astroid
-    #   black
     #   mypy
 virtualenv==20.26.3
     # via pre-commit
9 changes: 9 additions & 0 deletions packages/aws-library/src/aws_library/ssm/_client.py
@@ -1,10 +1,12 @@
 import contextlib
+import datetime
 import logging
 from collections.abc import Sequence
 from dataclasses import dataclass
 from typing import Final, cast

 import aioboto3
+import arrow
 import botocore
 import botocore.exceptions
 from aiobotocore.session import ClientCreatorContext
@@ -31,6 +33,8 @@ class SSMCommand:
     command_id: str
     instance_ids: Sequence[str]
     status: CommandStatusType
+    start_time: datetime.datetime | None
+    finish_time: datetime.datetime | None
     message: str | None = None
@@ -89,12 +93,15 @@ async def send_command(
         assert "Comment" in response["Command"]  # nosec
         assert "CommandId" in response["Command"]  # nosec
         assert "Status" in response["Command"]  # nosec
+        assert "RequestedDateTime" in response["Command"]  # nosec

         return SSMCommand(
             name=response["Command"]["Comment"],
             command_id=response["Command"]["CommandId"],
             status=response["Command"]["Status"],
             instance_ids=instance_ids,
+            start_time=None,
+            finish_time=None,
         )
@@ -111,6 +118,8 @@ async def get_command(self, instance_id: str, *, command_id: str) -> SSMCommand:
             instance_ids=[response["InstanceId"]],
             status=response["Status"] if response["Status"] != "Delayed" else "Pending",
             message=response["StatusDetails"],
+            start_time=arrow.get(response["ExecutionStartDateTime"]).datetime,
+            finish_time=arrow.get(response["ExecutionEndDateTime"]).datetime,
         )
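With both timestamps on SSMCommand, the duration of a finished command is just finish_time - start_time. A minimal sketch of how such a sample could feed a Prometheus histogram (the metric name and helper below are illustrative; the PR's real histograms live in the autoscaling service's instrumentation module, not in aws-library):

import datetime

from prometheus_client import Histogram

# Hypothetical metric for illustration only.
SSM_COMMAND_DURATION = Histogram(
    "ssm_command_duration_seconds",
    "time between SSM command execution start and end",
    labelnames=("instance_type",),
)


def observe_command_duration(
    start_time: datetime.datetime | None,
    finish_time: datetime.datetime | None,
    *,
    instance_type: str,
) -> None:
    # send_command returns both fields as None; only get_command on a
    # finished command yields real timestamps, so guard before observing.
    if start_time is None or finish_time is None:
        return
    SSM_COMMAND_DURATION.labels(instance_type=instance_type).observe(
        (finish_time - start_time).total_seconds()
    )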
2 changes: 2 additions & 0 deletions packages/aws-library/tests/test_ssm_client.py
@@ -112,6 +112,8 @@ async def test_send_command(
     assert dataclasses.asdict(got) == {
         **dataclasses.asdict(sent_command),
         "message": "Success",
+        "start_time": got.start_time,
+        "finish_time": got.finish_time,
     }
     with pytest.raises(SSMInvalidCommandError):
         await simcore_ssm_api.get_command(
@@ -68,7 +68,7 @@ class Cluster:  # pylint: disable=too-many-instance-attributes
             "description": "This is a EC2-backed docker node which is drained (cannot accept tasks)"
         }
     )
-    reserve_drained_nodes: list[AssociatedInstance] = field(
+    buffer_drained_nodes: list[AssociatedInstance] = field(
         metadata={
             "description": "This is a EC2-backed docker node which is drained in the reserve if this is enabled (with no tasks)"
         }
@@ -115,7 +115,7 @@ def total_number_of_machines(self) -> int:
             len(self.active_nodes)
             + len(self.pending_nodes)
             + len(self.drained_nodes)
-            + len(self.reserve_drained_nodes)
+            + len(self.buffer_drained_nodes)
             + len(self.pending_ec2s)
             + len(self.broken_ec2s)
             + len(self.terminating_nodes)
@@ -131,7 +131,7 @@ def _get_instance_ids(
             f"Cluster(active-nodes: count={len(self.active_nodes)} {_get_instance_ids(self.active_nodes)}, "
             f"pending-nodes: count={len(self.pending_nodes)} {_get_instance_ids(self.pending_nodes)}, "
             f"drained-nodes: count={len(self.drained_nodes)} {_get_instance_ids(self.drained_nodes)}, "
-            f"reserve-drained-nodes: count={len(self.reserve_drained_nodes)} {_get_instance_ids(self.reserve_drained_nodes)}, "
+            f"reserve-drained-nodes: count={len(self.buffer_drained_nodes)} {_get_instance_ids(self.buffer_drained_nodes)}, "
             f"pending-ec2s: count={len(self.pending_ec2s)} {_get_instance_ids(self.pending_ec2s)}, "
             f"broken-ec2s: count={len(self.broken_ec2s)} {_get_instance_ids(self.broken_ec2s)}, "
             f"buffer-ec2s: count={len(self.buffer_ec2s)} {_get_instance_ids(self.buffer_ec2s)}, "
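Throughout the core module the Cluster dataclass is never mutated in place; updated buckets are produced with dataclasses.replace, as the hunks below show. A toy illustration of that pattern with the renamed field (MiniCluster is a two-field stand-in for the real Cluster, simplified for the example):

import dataclasses


@dataclasses.dataclass(frozen=True, kw_only=True)
class MiniCluster:
    # trimmed stand-in for the real Cluster dataclass
    drained_nodes: list[str]
    buffer_drained_nodes: list[str]


cluster = MiniCluster(drained_nodes=["i-1"], buffer_drained_nodes=["i-2", "i-3"])
# promote every buffer node into the regular drained bucket
updated = dataclasses.replace(
    cluster,
    drained_nodes=cluster.drained_nodes + cluster.buffer_drained_nodes,
    buffer_drained_nodes=[],
)
assert updated.buffer_drained_nodes == []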
@@ -139,14 +139,14 @@ async def _analyze_current_cluster(
         else:
             pending_nodes.append(instance)

-    drained_nodes, reserve_drained_nodes, terminating_nodes = sort_drained_nodes(
+    drained_nodes, buffer_drained_nodes, terminating_nodes = sort_drained_nodes(
         app_settings, all_drained_nodes, allowed_instance_types
     )
     cluster = Cluster(
         active_nodes=active_nodes,
         pending_nodes=pending_nodes,
         drained_nodes=drained_nodes,
-        reserve_drained_nodes=reserve_drained_nodes,
+        buffer_drained_nodes=buffer_drained_nodes,
         pending_ec2s=[NonAssociatedInstance(ec2_instance=i) for i in pending_ec2s],
         broken_ec2s=[NonAssociatedInstance(ec2_instance=i) for i in broken_ec2s],
         buffer_ec2s=[
@@ -285,19 +285,19 @@ async def _try_attach_pending_ec2s(
                 )
             else:
                 still_pending_ec2s.append(instance_data)
-        except Ec2InvalidDnsNameError:  # noqa: PERF203
+        except Ec2InvalidDnsNameError:
             _logger.exception("Unexpected EC2 private dns")
     # NOTE: first provision the reserve drained nodes if possible
     all_drained_nodes = (
-        cluster.drained_nodes + cluster.reserve_drained_nodes + new_found_instances
+        cluster.drained_nodes + cluster.buffer_drained_nodes + new_found_instances
     )
-    drained_nodes, reserve_drained_nodes, _ = sort_drained_nodes(
+    drained_nodes, buffer_drained_nodes, _ = sort_drained_nodes(
         app_settings, all_drained_nodes, allowed_instance_types
     )
     return dataclasses.replace(
         cluster,
         drained_nodes=drained_nodes,
-        reserve_drained_nodes=reserve_drained_nodes,
+        buffer_drained_nodes=buffer_drained_nodes,
         pending_ec2s=still_pending_ec2s,
     )
@@ -359,9 +359,7 @@ async def _activate_drained_nodes(
 ) -> Cluster:
     nodes_to_activate = [
         node
-        for node in itertools.chain(
-            cluster.drained_nodes, cluster.reserve_drained_nodes
-        )
+        for node in itertools.chain(cluster.drained_nodes, cluster.buffer_drained_nodes)
         if node.assigned_tasks
     ]
@@ -380,14 +378,14 @@
     ]
     remaining_reserved_drained_nodes = [
         node
-        for node in cluster.reserve_drained_nodes
+        for node in cluster.buffer_drained_nodes
         if node.ec2_instance.id not in new_active_node_ids
     ]
     return dataclasses.replace(
         cluster,
         active_nodes=cluster.active_nodes + nodes_to_activate,
         drained_nodes=remaining_drained_nodes,
-        reserve_drained_nodes=remaining_reserved_drained_nodes,
+        buffer_drained_nodes=remaining_reserved_drained_nodes,
     )
@@ -490,7 +488,7 @@ async def _assign_tasks_to_current_cluster(
         ),
         lambda task, required_ec2, required_resources: _try_assign_task_to_ec2_instance(
             task,
-            instances=cluster.drained_nodes + cluster.reserve_drained_nodes,
+            instances=cluster.drained_nodes + cluster.buffer_drained_nodes,
             task_required_ec2_instance=required_ec2,
             task_required_resources=required_resources,
         ),
@@ -620,7 +618,7 @@ async def _find_needed_instances(
     if (
         num_missing_nodes := (
             app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
-            - len(cluster.reserve_drained_nodes)
+            - len(cluster.buffer_drained_nodes)
         )
     ) > 0:
         # check if some are already pending
@@ -629,7 +627,7 @@
         ] + [i.ec2_instance for i in cluster.pending_nodes if not i.assigned_tasks]
         if len(remaining_pending_instances) < (
             app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
-            - len(cluster.reserve_drained_nodes)
+            - len(cluster.buffer_drained_nodes)
         ):
             default_instance_type = get_machine_buffer_type(available_ec2_types)
             num_instances_per_type[default_instance_type] += num_missing_nodes
@@ -1085,7 +1083,7 @@ async def _autoscale_cluster(
     app_settings = get_application_settings(app)
     assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
     if queued_or_missing_instance_tasks or (
-        len(cluster.reserve_drained_nodes)
+        len(cluster.buffer_drained_nodes)
         < app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER
     ):
         if (
@@ -1126,7 +1124,7 @@ async def _notify_autoscaling_status(

     monitored_instances = list(
         itertools.chain(
-            cluster.active_nodes, cluster.drained_nodes, cluster.reserve_drained_nodes
+            cluster.active_nodes, cluster.drained_nodes, cluster.buffer_drained_nodes
         )
     )
@@ -1147,7 +1145,7 @@
     )
     # prometheus instrumentation
     if has_instrumentation(app):
-        get_instrumentation(app).update_from_cluster(cluster)
+        get_instrumentation(app).cluster_metrics.update_from_cluster(cluster)


 async def auto_scale_cluster(
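The last hunk moves update_from_cluster onto a cluster_metrics sub-object, consistent with the new buffer_machines_pools_metrics group in the next file. A minimal sketch of what such a collector could look like (everything except the method name and the Cluster field names is an assumption):

from prometheus_client import Gauge


class ClusterMetrics:
    """Hypothetical gauge collector mirroring the Cluster buckets."""

    _TRACKED_FIELDS = (
        "active_nodes",
        "pending_nodes",
        "drained_nodes",
        "buffer_drained_nodes",  # the renamed reserve pool
        "buffer_ec2s",
    )

    def __init__(self) -> None:
        self._gauges = {
            name: Gauge(f"autoscaling_{name}", f"number of {name.replace('_', ' ')}")
            for name in self._TRACKED_FIELDS
        }

    def update_from_cluster(self, cluster) -> None:
        # sample the size of each bucket on every autoscaling iteration
        for name, gauge in self._gauges.items():
            gauge.set(len(getattr(cluster, name)))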
@@ -33,6 +33,10 @@
 from fastapi import FastAPI
 from pydantic import NonNegativeInt
 from servicelib.logging_utils import log_context
+from simcore_service_autoscaling.modules.instrumentation import (
+    get_instrumentation,
+    has_instrumentation,
+)
 from types_aiobotocore_ec2.literals import InstanceTypeType

 from ..constants import (
@@ -68,6 +72,14 @@ async def _analyze_running_instance_state(
     elif await ssm_client.is_instance_connected_to_ssm_server(instance.id):
         try:
             if await ssm_client.wait_for_has_instance_completed_cloud_init(instance.id):
+                if has_instrumentation(app):
+                    get_instrumentation(
+                        app
+                    ).buffer_machines_pools_metrics.instances_ready_to_pull_seconds.labels(
+                        instance_type=instance.type
+                    ).observe(
+                        (arrow.utcnow().datetime - instance.launch_time).total_seconds()
+                    )
                 if app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[
                     instance.type
                 ].pre_pull_images:
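Only the attribute names instances_ready_to_pull_seconds and instances_completed_pulling_seconds appear in the diff; a plausible shape for the histograms behind them (metric names and buckets are assumptions):

from dataclasses import dataclass, field

from prometheus_client import Histogram


@dataclass(kw_only=True)
class BufferPoolsMetrics:
    # time from EC2 launch until cloud-init finished (instance can start pulling)
    instances_ready_to_pull_seconds: Histogram = field(
        default_factory=lambda: Histogram(
            "autoscaling_instances_ready_to_pull_seconds",
            "time from instance launch to completed cloud-init",
            labelnames=("instance_type",),
            buckets=(10, 30, 60, 120, 300, 600),
        )
    )
    # wall-clock duration of the SSM pre-pull command, from SSMCommand timestamps
    instances_completed_pulling_seconds: Histogram = field(
        default_factory=lambda: Histogram(
            "autoscaling_instances_completed_pulling_seconds",
            "time taken by the docker image pre-pull command",
            labelnames=("instance_type",),
            buckets=(30, 60, 300, 600, 1800, 3600),
        )
    )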
@@ -314,6 +326,18 @@ async def _handle_pool_image_pulling(
     )
     match ssm_command.status:
         case "Success":
+            if has_instrumentation(app):
+                assert ssm_command.start_time is not None  # nosec
+                assert ssm_command.finish_time is not None  # nosec
+                get_instrumentation(
+                    app
+                ).buffer_machines_pools_metrics.instances_completed_pulling_seconds.labels(
+                    instance_type=instance.type
+                ).observe(
+                    (
+                        ssm_command.finish_time - ssm_command.start_time
+                    ).total_seconds()
+                )
             instances_to_stop.add(instance)
         case "InProgress" | "Pending":
             # do nothing we pass

# 4. pull docker images if needed
await _handle_image_pre_pulling(app, buffers_manager)

# 5. instrumentation
if has_instrumentation(app):
get_instrumentation(
app
).buffer_machines_pools_metrics.update_from_buffer_pool_manager(buffers_manager)
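update_from_buffer_pool_manager presumably samples the warm-pool sizes into gauges once per monitoring cycle; a sketch under the assumption that the manager exposes ready instances per EC2 instance type (only the method name comes from the diff):

from prometheus_client import Gauge

# assumed metric name and layout
WARM_POOL_SIZE = Gauge(
    "autoscaling_buffer_pool_ready_instances",
    "number of ready machines in each warm buffer pool",
    labelnames=("instance_type",),
)


def update_from_buffer_pool_manager(ready_instances_by_type: dict[str, set[str]]) -> None:
    # ready_instances_by_type maps an EC2 instance type to the ids of its
    # stopped-and-ready instances (a simplification of the real manager state)
    for instance_type, instance_ids in ready_instances_by_type.items():
        WARM_POOL_SIZE.labels(instance_type=instance_type).set(len(instance_ids))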
@@ -42,7 +42,6 @@ async def _startup() -> None:

def on_app_shutdown(app: FastAPI) -> Callable[[], Awaitable[None]]:
    async def _stop() -> None:
        await stop_periodic_task(app.state.autoscaler_task)
        if hasattr(app.state, "buffers_pool_task"):
            await stop_periodic_task(app.state.buffers_pool_task)