Skip to content

Commit

Permalink
🎨Autoscaling: chunk prepulled image AWS EC2 tags (#6232)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg authored Aug 26, 2024
1 parent 3c0909d commit 8cfffaa
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 61 deletions.
8 changes: 6 additions & 2 deletions packages/aws-library/src/aws_library/ec2/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,17 @@ class EC2InstanceType:

class AWSTagKey(ConstrainedStr):
# see [https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/Using_Tags.html#tag-restrictions]
regex = re.compile(r"^(?!(_index|\.{1,2})$)[a-zA-Z0-9\+\-=\._:@]{1,128}$")
regex = re.compile(r"^(?!(_index|\.{1,2})$)[a-zA-Z0-9\+\-=\._:@]+$")
min_length = 1
max_length = 128


class AWSTagValue(ConstrainedStr):
# see [https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/Using_Tags.html#tag-restrictions]
# quotes and []{} were added so that JSON-encoded strings are allowed; AWS seems to accept them in tag values
regex = re.compile(r"^[a-zA-Z0-9\s\+\-=\.,_:/@\"\'\[\]\{\}]{0,256}$")
regex = re.compile(r"^[a-zA-Z0-9\s\+\-=\.,_:/@\"\'\[\]\{\}]*$")
min_length = 0
max_length = 256


EC2Tags: TypeAlias = dict[AWSTagKey, AWSTagValue]
Expand Down
34 changes: 34 additions & 0 deletions services/autoscaling/src/simcore_service_autoscaling/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import re
from typing import Final

from aws_library.ec2._models import AWSTagKey, AWSTagValue, EC2Tags
from pydantic import parse_obj_as

# Tag keys set on buffer instances once the image-pulling SSM command was sent:
# a "pulling" marker and the id of the SSM command to poll for completion.
BUFFER_MACHINE_PULLING_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
    AWSTagKey, "pulling"
)
BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
    AWSTagKey, "ssm-command-id"
)
# Name given to the SSM command that pulls the docker images.
PREPULL_COMMAND_NAME: Final[str] = "docker images pulling"

# Shell command sent through SSM to pull the images.
DOCKER_PULL_COMMAND: Final[
    str
] = "docker compose -f /docker-pull.compose.yml -p buffering pull"

# Tag key holding the JSON-encoded list of pre-pulled images. When the JSON does
# not fit in a single tag value it is spread over keys of the form
# "<key>_(<index>)" (see PRE_PULLED_IMAGES_RE below).
PRE_PULLED_IMAGES_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
    AWSTagKey, "io.simcore.autoscaling.pre_pulled_images"
)

# Tag key marking an instance as a buffer machine, with its two tag sets.
BUFFER_MACHINE_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
    AWSTagKey, "io.simcore.autoscaling.buffer_machine"
)
DEACTIVATED_BUFFER_MACHINE_EC2_TAGS: Final[EC2Tags] = {
    BUFFER_MACHINE_TAG_KEY: parse_obj_as(AWSTagValue, "true")
}
ACTIVATED_BUFFER_MACHINE_EC2_TAGS: Final[EC2Tags] = {
    BUFFER_MACHINE_TAG_KEY: parse_obj_as(AWSTagValue, "false")
}
# Matches a chunked pre-pulled-images tag key and captures the chunk index.
# The key is escaped so that its dots only match literal dots.
PRE_PULLED_IMAGES_RE: Final[re.Pattern] = re.compile(
    rf"{re.escape(PRE_PULLED_IMAGES_EC2_TAG_KEY)}_\((\d+)\)"
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

import logging
from collections import defaultdict
from typing import Final, TypeAlias, cast
from typing import TypeAlias, cast

from aws_library.ec2 import (
AWSTagKey,
AWSTagValue,
EC2InstanceConfig,
EC2InstanceData,
Expand All @@ -31,31 +30,28 @@
SSMCommandExecutionTimeoutError,
)
from fastapi import FastAPI
from models_library.utils.json_serialization import json_dumps, json_loads
from pydantic import NonNegativeInt, parse_obj_as
from pydantic import NonNegativeInt
from servicelib.logging_utils import log_context
from types_aiobotocore_ec2.literals import InstanceTypeType

from ..constants import (
BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY,
BUFFER_MACHINE_PULLING_EC2_TAG_KEY,
DOCKER_PULL_COMMAND,
PREPULL_COMMAND_NAME,
)
from ..core.settings import get_application_settings
from ..models import BufferPool, BufferPoolManager
from ..utils.auto_scaling_core import ec2_buffer_startup_script
from ..utils.buffer_machines_pool_core import get_deactivated_buffer_ec2_tags
from ..utils.buffer_machines_pool_core import (
dump_pre_pulled_images_as_tags,
get_deactivated_buffer_ec2_tags,
load_pre_pulled_images_from_tags,
)
from .auto_scaling_mode_base import BaseAutoscaling
from .ec2 import get_ec2_client
from .ssm import get_ssm_client

_BUFFER_MACHINE_PULLING_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
AWSTagKey, "pulling"
)
_BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
AWSTagKey, "ssm-command-id"
)
_PREPULL_COMMAND_NAME: Final[str] = "docker images pulling"
_PRE_PULLED_IMAGES_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
AWSTagKey, "io.simcore.autoscaling.pre_pulled_images"
)


_logger = logging.getLogger(__name__)


Expand All @@ -64,7 +60,7 @@ async def _analyze_running_instance_state(
):
ssm_client = get_ssm_client(app)

if _BUFFER_MACHINE_PULLING_EC2_TAG_KEY in instance.tags:
if BUFFER_MACHINE_PULLING_EC2_TAG_KEY in instance.tags:
buffer_pool.pulling_instances.add(instance)
elif await ssm_client.is_instance_connected_to_ssm_server(instance.id):
app_settings = get_application_settings(app)
Expand Down Expand Up @@ -176,9 +172,7 @@ async def _terminate_instances_with_invalid_pre_pulled_images(

for instance in all_pre_pulled_instances:
if (
pre_pulled_images := json_loads(
instance.tags.get(_PRE_PULLED_IMAGES_EC2_TAG_KEY, "[]")
)
pre_pulled_images := load_pre_pulled_images_from_tags(instance.tags)
) and pre_pulled_images != ec2_boot_config.pre_pull_images:
_logger.info(
"%s",
Expand Down Expand Up @@ -269,9 +263,6 @@ async def _add_remove_buffer_instances(

InstancesToStop: TypeAlias = set[EC2InstanceData]
InstancesToTerminate: TypeAlias = set[EC2InstanceData]
_DOCKER_PULL_COMMAND: Final[
str
] = "docker compose -f /docker-pull.compose.yml -p buffering pull"


async def _handle_pool_image_pulling(
Expand All @@ -283,14 +274,14 @@ async def _handle_pool_image_pulling(
# trigger the image pulling
ssm_command = await ssm_client.send_command(
[instance.id for instance in pool.waiting_to_pull_instances],
command=_DOCKER_PULL_COMMAND,
command_name=_PREPULL_COMMAND_NAME,
command=DOCKER_PULL_COMMAND,
command_name=PREPULL_COMMAND_NAME,
)
await ec2_client.set_instances_tags(
tuple(pool.waiting_to_pull_instances),
tags={
_BUFFER_MACHINE_PULLING_EC2_TAG_KEY: AWSTagValue("true"),
_BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: AWSTagValue(
BUFFER_MACHINE_PULLING_EC2_TAG_KEY: AWSTagValue("true"),
BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: AWSTagValue(
ssm_command.command_id
),
},
Expand All @@ -301,7 +292,7 @@ async def _handle_pool_image_pulling(
# wait for the image pulling to complete
for instance in pool.pulling_instances:
if ssm_command_id := instance.tags.get(
_BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY
BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY
):
ssm_command = await ssm_client.get_command(
instance.id, command_id=ssm_command_id
Expand All @@ -323,15 +314,11 @@ async def _handle_pool_image_pulling(
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
await ec2_client.set_instances_tags(
tuple(instances_to_stop),
tags={
_PRE_PULLED_IMAGES_EC2_TAG_KEY: AWSTagValue(
json_dumps(
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[
instance_type
].pre_pull_images
)
)
},
tags=dump_pre_pulled_images_as_tags(
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[
instance_type
].pre_pull_images
),
)
return instances_to_stop, broken_instances_to_terminate

Expand All @@ -357,8 +344,8 @@ async def _handle_image_pre_pulling(
"pending buffer instances completed pulling of images, stopping them",
):
tag_keys_to_remove = (
_BUFFER_MACHINE_PULLING_EC2_TAG_KEY,
_BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY,
BUFFER_MACHINE_PULLING_EC2_TAG_KEY,
BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY,
)
await ec2_client.remove_instances_tags(
tuple(instances_to_stop),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
from typing import Final
from collections.abc import Iterable
from operator import itemgetter

from aws_library.ec2 import AWSTagKey, AWSTagValue, EC2Tags
from fastapi import FastAPI
from pydantic import parse_obj_as
from models_library.docker import DockerGenericTag
from models_library.utils.json_serialization import json_dumps
from pydantic import parse_obj_as, parse_raw_as

from ..modules.auto_scaling_mode_base import BaseAutoscaling

_BUFFER_MACHINE_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
AWSTagKey, "io.simcore.autoscaling.buffer_machine"
from ..constants import (
ACTIVATED_BUFFER_MACHINE_EC2_TAGS,
BUFFER_MACHINE_TAG_KEY,
DEACTIVATED_BUFFER_MACHINE_EC2_TAGS,
PRE_PULLED_IMAGES_EC2_TAG_KEY,
PRE_PULLED_IMAGES_RE,
)
_DEACTIVATED_BUFFER_MACHINE_EC2_TAGS: Final[EC2Tags] = {
_BUFFER_MACHINE_TAG_KEY: parse_obj_as(AWSTagValue, "true")
}
_ACTIVATED_BUFFER_MACHINE_EC2_TAGS: Final[EC2Tags] = {
_BUFFER_MACHINE_TAG_KEY: parse_obj_as(AWSTagValue, "false")
}
from ..modules.auto_scaling_mode_base import BaseAutoscaling


def get_activated_buffer_ec2_tags(
app: FastAPI, auto_scaling_mode: BaseAutoscaling
) -> EC2Tags:
return auto_scaling_mode.get_ec2_tags(app) | _ACTIVATED_BUFFER_MACHINE_EC2_TAGS
return auto_scaling_mode.get_ec2_tags(app) | ACTIVATED_BUFFER_MACHINE_EC2_TAGS


def get_deactivated_buffer_ec2_tags(
app: FastAPI, auto_scaling_mode: BaseAutoscaling
) -> EC2Tags:
base_ec2_tags = (
auto_scaling_mode.get_ec2_tags(app) | _DEACTIVATED_BUFFER_MACHINE_EC2_TAGS
auto_scaling_mode.get_ec2_tags(app) | DEACTIVATED_BUFFER_MACHINE_EC2_TAGS
)
base_ec2_tags[AWSTagKey("Name")] = AWSTagValue(
f"{base_ec2_tags[AWSTagKey('Name')]}-buffer"
Expand All @@ -36,4 +36,49 @@ def get_deactivated_buffer_ec2_tags(


def is_buffer_machine(tags: EC2Tags) -> bool:
return bool(_BUFFER_MACHINE_TAG_KEY in tags)
return bool(BUFFER_MACHINE_TAG_KEY in tags)


def dump_pre_pulled_images_as_tags(images: Iterable[DockerGenericTag]) -> EC2Tags:
    """Serialize ``images`` to JSON and return it as EC2 tags.

    AWS tag values are limited to ``AWSTagValue.max_length`` characters. When the
    JSON fits, it is stored under ``PRE_PULLED_IMAGES_EC2_TAG_KEY``. Otherwise it
    is cut into consecutive chunks stored under ``<key>_(0)``, ``<key>_(1)``, ...
    """
    # serialize once and reuse the result on both paths
    jsonized_images = json_dumps(images)
    assert AWSTagValue.max_length  # nosec
    chunk_size = AWSTagValue.max_length

    if len(jsonized_images) <= chunk_size:
        return {
            PRE_PULLED_IMAGES_EC2_TAG_KEY: parse_obj_as(AWSTagValue, jsonized_images)
        }

    # NOTE(review): the chunk keys contain parentheses, which the AWSTagKey regex
    # does not list; they are built by direct construction, which presumably
    # bypasses validation -- confirm AWS accepts these keys.
    return {
        AWSTagKey(f"{PRE_PULLED_IMAGES_EC2_TAG_KEY}_({index})"): AWSTagValue(
            jsonized_images[start : start + chunk_size]
        )
        for index, start in enumerate(range(0, len(jsonized_images), chunk_size))
    }


def load_pre_pulled_images_from_tags(tags: EC2Tags) -> list[DockerGenericTag]:
    """Rebuild the list of pre-pulled images stored in EC2 ``tags``.

    The un-chunked ``PRE_PULLED_IMAGES_EC2_TAG_KEY`` tag is preferred when present.
    Otherwise the values of all ``<key>_(<index>)`` tags are concatenated in index
    order and parsed as one JSON document. Returns an empty list when no matching
    tag is found.
    """
    if PRE_PULLED_IMAGES_EC2_TAG_KEY in tags:
        # the JSON fitted in a single tag value: read it directly
        return parse_raw_as(list[DockerGenericTag], tags[PRE_PULLED_IMAGES_EC2_TAG_KEY])

    # fullmatch (not match): a key that merely starts with "<key>_(N)" must not
    # be spliced into the JSON
    indexed_chunks = sorted(
        (
            (int(m.group(1)), value)
            for key, value in tags.items()
            if (m := PRE_PULLED_IMAGES_RE.fullmatch(key))
        ),
        key=itemgetter(0),
    )
    assembled_json = "".join(chunk for _, chunk in indexed_chunks)
    if not assembled_json:
        return []
    return parse_raw_as(list[DockerGenericTag], assembled_json)
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
)
from pytest_simcore.helpers.logging_tools import log_context
from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict
from simcore_service_autoscaling.constants import PRE_PULLED_IMAGES_EC2_TAG_KEY
from simcore_service_autoscaling.core.settings import ApplicationSettings
from simcore_service_autoscaling.modules.auto_scaling_mode_dynamic import (
DynamicAutoscaling,
)
from simcore_service_autoscaling.modules.buffer_machines_pool_core import (
_PRE_PULLED_IMAGES_EC2_TAG_KEY,
monitor_buffer_machines,
)
from simcore_service_autoscaling.utils.buffer_machines_pool_core import (
Expand Down Expand Up @@ -311,7 +311,7 @@ async def _assert_wait_for_ssm_command_to_finish() -> None:
expected_instance_type=next(iter(ec2_instances_allowed_types)),
expected_instance_state="stopped",
expected_additional_tag_keys=[
_PRE_PULLED_IMAGES_EC2_TAG_KEY,
PRE_PULLED_IMAGES_EC2_TAG_KEY,
*list(ec2_instance_custom_tags),
],
expected_pre_pulled_images=pre_pulled_images,
Expand Down Expand Up @@ -376,7 +376,7 @@ async def _do(
if pre_pull_images is not None and instance_state_name == "stopped":
resource_tags.append(
{
"Key": _PRE_PULLED_IMAGES_EC2_TAG_KEY,
"Key": PRE_PULLED_IMAGES_EC2_TAG_KEY,
"Value": f"{json_dumps(pre_pull_images)}",
}
)
Expand Down
Loading

0 comments on commit 8cfffaa

Please sign in to comment.