
🐛🎨 Computational autoscaling: allow multi-machining/processing #5203

Merged
Changes from all commits (86 commits)
47194b9
instance ready is depending on mode
sanderegg Dec 21, 2023
ea6f013
hmm not yet totally there
sanderegg Dec 21, 2023
96114d5
missing manual testing parts
sanderegg Jan 9, 2024
e231f20
added remote debugging facilities
sanderegg Jan 9, 2024
8758ee9
added is_worker_connected and refcator
sanderegg Jan 9, 2024
0e48b85
added pending_nodes
sanderegg Jan 9, 2024
1b15221
use pending nodes and refactor
sanderegg Jan 9, 2024
7c62890
ensure negative values for cpus are capped to 0
sanderegg Jan 9, 2024
084e753
misconfigured in redis
sanderegg Jan 9, 2024
c53119b
renaming + logs
sanderegg Jan 9, 2024
b08beda
ensure not available scheduler does not break autoscaling
sanderegg Jan 9, 2024
fb21a6b
improving feedback
sanderegg Jan 9, 2024
885694a
do not expose redis
sanderegg Jan 9, 2024
a6de940
minor
sanderegg Jan 9, 2024
e876707
added function to get all processing tasks per worker
sanderegg Jan 9, 2024
47d7036
allow to max out the workers
sanderegg Jan 9, 2024
ad6c735
added worker retierement
sanderegg Jan 9, 2024
c7bcabb
use debugpy
sanderegg Jan 9, 2024
d1e4803
use debugpy
sanderegg Jan 9, 2024
7536eba
more robust
sanderegg Jan 9, 2024
fd8be0e
ensure stuff goes in the right location
sanderegg Jan 9, 2024
7e9d183
fix regex
sanderegg Jan 11, 2024
dbe2b2d
helper function
sanderegg Jan 11, 2024
c3e1d9b
improve logs
sanderegg Jan 11, 2024
3102058
fix dask fun issues
sanderegg Jan 11, 2024
c8acad8
refactor
sanderegg Jan 11, 2024
83f2e4e
typo
sanderegg Jan 11, 2024
5e64c0d
manual tester
sanderegg Jan 11, 2024
997aaab
mypy
sanderegg Jan 11, 2024
17f9720
docs
sanderegg Jan 11, 2024
6d59735
use Resources model
sanderegg Jan 12, 2024
92953cd
use resources, and make computing much faster by keeping available re…
sanderegg Jan 12, 2024
5df6fc2
type
sanderegg Jan 12, 2024
71867f4
simplify
sanderegg Jan 12, 2024
b82c7f1
refactor
sanderegg Jan 12, 2024
726d07c
typo
sanderegg Jan 12, 2024
06867f0
added delay for removing node
sanderegg Jan 12, 2024
15c904a
ruff
sanderegg Jan 13, 2024
e003438
missing parameter
sanderegg Jan 15, 2024
89415bd
keep task assignment in cluster
sanderegg Jan 15, 2024
9c33fa7
preparing to add assigned tasks in cluster
sanderegg Jan 15, 2024
c09b5e7
fix too many instances in
sanderegg Jan 15, 2024
4f2e729
allow to use replace
sanderegg Jan 15, 2024
939636c
refactor
sanderegg Jan 15, 2024
7341384
analyze with used resources
sanderegg Jan 15, 2024
5dc9183
in memory tasks only take memory
sanderegg Jan 15, 2024
edaf0cc
refactor
sanderegg Jan 15, 2024
ab66063
refactoring notifications
sanderegg Jan 15, 2024
69b6e8c
cleanup
sanderegg Jan 15, 2024
8aaa247
refactor
sanderegg Jan 15, 2024
e0794c5
fix clusters-keeper
sanderegg Jan 15, 2024
f839aed
refactoring
sanderegg Jan 15, 2024
71ba6b0
minor
sanderegg Jan 15, 2024
8ba7457
progress and logs now better
sanderegg Jan 15, 2024
ec899dc
improved notification mechanism
sanderegg Jan 16, 2024
c4b897b
reduce log pollution
sanderegg Jan 16, 2024
84aa9e5
syntax
sanderegg Jan 16, 2024
dd047a6
syntax
sanderegg Jan 16, 2024
80e9f09
fixed tests
sanderegg Jan 16, 2024
523d3c2
fixed comparison
sanderegg Jan 16, 2024
4dcb929
fixed confusing errors while running
sanderegg Jan 16, 2024
e46890b
only upgrade available resources if not set in constructor
sanderegg Jan 16, 2024
dc29dd7
tests are fixed here
sanderegg Jan 16, 2024
81e36cd
fixed tests
sanderegg Jan 16, 2024
61bafbe
fixed tests
sanderegg Jan 16, 2024
eb14d4b
mypy
sanderegg Jan 16, 2024
ebf85f6
removed unused code
sanderegg Jan 16, 2024
0e18cf1
remove catastrophic backtracking
sanderegg Jan 16, 2024
0441def
remove catastrophic backtracking
sanderegg Jan 16, 2024
cd99cd1
also check pending nodes
sanderegg Jan 16, 2024
a73f7b7
necessary for docker >24
sanderegg Jan 16, 2024
4b6f787
remove too many logs
sanderegg Jan 16, 2024
78e2869
only deactivate active nodes
sanderegg Jan 16, 2024
3c8d3ed
fixed regex
sanderegg Jan 16, 2024
e98b0e6
fixed test
sanderegg Jan 16, 2024
d6ef835
clean up
sanderegg Jan 16, 2024
d6d7b18
@pcrespov review: rename fct
sanderegg Jan 17, 2024
019d0bb
@pcrespov review: improve code
sanderegg Jan 17, 2024
c703f6f
@pcrespov review: refactor
sanderegg Jan 17, 2024
d23e109
@pcrespov review: remove unused fct
sanderegg Jan 17, 2024
b337597
@pcrespov review: add asserts to convey message
sanderegg Jan 17, 2024
40a4e25
@pcrespov review: refactor
sanderegg Jan 17, 2024
6787420
some typos
sanderegg Jan 17, 2024
a178756
mypy
sanderegg Jan 17, 2024
a1dc532
adapt to new naming
sanderegg Jan 17, 2024
5eb5c36
fix calls to dask_scheduler stuff
sanderegg Jan 17, 2024
19 changes: 9 additions & 10 deletions packages/aws-library/src/aws_library/ec2/client.py
@@ -90,9 +90,13 @@ async def get_ec2_instance_capabilities(
list_instances.append(
EC2InstanceType(
name=instance["InstanceType"],
cpus=instance["VCpuInfo"]["DefaultVCpus"],
ram=ByteSize(
int(instance["MemoryInfo"]["SizeInMiB"]) * 1024 * 1024
resources=Resources(
cpus=instance["VCpuInfo"]["DefaultVCpus"],
ram=ByteSize(
int(instance["MemoryInfo"]["SizeInMiB"])
* 1024
* 1024
),
),
)
)
@@ -173,9 +177,7 @@ async def start_aws_instance(
tags=parse_obj_as(
EC2Tags, {tag["Key"]: tag["Value"] for tag in instance["Tags"]}
),
resources=Resources(
cpus=instance_config.type.cpus, ram=instance_config.type.ram
),
resources=instance_config.type.resources,
)
for instance in instances["Reservations"][0]["Instances"]
]
@@ -234,10 +236,7 @@ async def get_instances(
else None,
type=instance["InstanceType"],
state=instance["State"]["Name"],
resources=Resources(
cpus=ec2_instance_types[0].cpus,
ram=ec2_instance_types[0].ram,
),
resources=ec2_instance_types[0].resources,
tags=parse_obj_as(
EC2Tags,
{tag["Key"]: tag["Value"] for tag in instance["Tags"]},
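The client-side change above builds the nested `Resources` value once from the EC2 `DescribeInstanceTypes` payload and then reuses it (`instance_config.type.resources`, `ec2_instance_types[0].resources`) instead of re-packing `cpus`/`ram` at every call site. A minimal sketch of that construction, assuming pydantic is installed and using a hypothetical one-entry API payload:

```python
from pydantic import BaseModel, ByteSize, NonNegativeFloat


class Resources(BaseModel, frozen=True):
    cpus: NonNegativeFloat
    ram: ByteSize


# hypothetical excerpt of one DescribeInstanceTypes entry
instance = {
    "InstanceType": "t2.micro",
    "VCpuInfo": {"DefaultVCpus": 1},
    "MemoryInfo": {"SizeInMiB": 1024},
}

resources = Resources(
    cpus=instance["VCpuInfo"]["DefaultVCpus"],
    # EC2 reports memory in MiB; ByteSize stores plain bytes
    ram=ByteSize(int(instance["MemoryInfo"]["SizeInMiB"]) * 1024 * 1024),
)
```

Building the `Resources` once and passing it around is what lets the later hunks drop the duplicated `Resources(cpus=..., ram=...)` constructions.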
13 changes: 8 additions & 5 deletions packages/aws-library/src/aws_library/ec2/models.py
@@ -13,13 +13,12 @@
Extra,
Field,
NonNegativeFloat,
PositiveInt,
validator,
)
from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType


class Resources(BaseModel):
class Resources(BaseModel, frozen=True):
cpus: NonNegativeFloat
ram: ByteSize

@@ -53,12 +52,16 @@ def __sub__(self, other: "Resources") -> "Resources":
}
)

@validator("cpus", pre=True)
@classmethod
def _floor_cpus_to_0(cls, v: float) -> float:
return max(v, 0)

@dataclass(frozen=True)

@dataclass(frozen=True, kw_only=True, slots=True)
class EC2InstanceType:
name: InstanceTypeType
cpus: PositiveInt
ram: ByteSize
resources: Resources


InstancePrivateDNSName: TypeAlias = str
@@ -1,10 +1,10 @@
from dataclasses import dataclass

from pydantic import ByteSize, PositiveInt
from pydantic import ByteSize, NonNegativeFloat


@dataclass(frozen=True)
class EC2InstanceTypeGet:
name: str
cpus: PositiveInt
cpus: NonNegativeFloat
ram: ByteSize
@@ -75,7 +75,7 @@ async def dask_spec_local_cluster(
scheduler_address = URL(cluster.scheduler_address)
monkeypatch.setenv(
"COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL",
f"{scheduler_address}" or "invalid",
f"{scheduler_address or 'invalid'}",
)
yield cluster

@@ -95,7 +95,7 @@ async def dask_local_cluster_without_workers(
scheduler_address = URL(cluster.scheduler_address)
monkeypatch.setenv(
"COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL",
f"{scheduler_address}" or "invalid",
f"{scheduler_address or 'invalid'}",
)
yield cluster

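The fixture fix above is subtle: `f"{scheduler_address}" or "invalid"` can never fall back, because formatting any value — even `None` — produces a non-empty, truthy string. Moving the `or` inside the braces applies the fallback to the value *before* formatting. A minimal demonstration, using `None` as a stand-in for a missing address:

```python
scheduler_address = None

# Broken pattern: the f-string result is the truthy string "None",
# so the `or "invalid"` branch is dead code.
broken = f"{scheduler_address}" or "invalid"
assert broken == "None"

# Fix: apply `or` to the raw value, then format the result.
fixed = f"{scheduler_address or 'invalid'}"
assert fixed == "invalid"
```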
4 changes: 2 additions & 2 deletions services/autoscaling/docker/entrypoint.sh
@@ -71,8 +71,8 @@ if [ "${SC_BUILD_TARGET}" = "development" ]; then
fi

if [ "${SC_BOOT_MODE}" = "debug-ptvsd" ]; then
# NOTE: production does NOT pre-installs ptvsd
pip install --no-cache-dir ptvsd
# NOTE: production does NOT pre-installs debugpy
pip install --no-cache-dir debugpy
fi

# Appends docker group if socket is mounted
@@ -1,6 +1,7 @@
import logging

from fastapi import FastAPI
from models_library.basic_types import BootModeEnum
from servicelib.fastapi.prometheus_instrumentation import (
setup_prometheus_instrumentation,
)
@@ -21,6 +22,7 @@
from ..modules.ec2 import setup as setup_ec2
from ..modules.rabbitmq import setup as setup_rabbitmq
from ..modules.redis import setup as setup_redis
from ..modules.remote_debug import setup_remote_debugging
from .settings import ApplicationSettings

logger = logging.getLogger(__name__)
@@ -46,6 +48,8 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
setup_prometheus_instrumentation(app)

# PLUGINS SETUP
if settings.SC_BOOT_MODE == BootModeEnum.DEBUG:
setup_remote_debugging(app)
setup_api_routes(app)
setup_docker(app)
setup_rabbitmq(app)
@@ -8,6 +8,7 @@
BootModeEnum,
BuildTargetEnum,
LogLevel,
PortInt,
VersionTag,
)
from models_library.docker import DockerLabelKey
@@ -182,6 +183,7 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
AUTOSCALING_DEBUG: bool = Field(
default=False, description="Debug mode", env=["AUTOSCALING_DEBUG", "DEBUG"]
)
AUTOSCALING_REMOTE_DEBUG_PORT: PortInt = PortInt(3000)

AUTOSCALING_LOGLEVEL: LogLevel = Field(
LogLevel.INFO, env=["AUTOSCALING_LOGLEVEL", "LOG_LEVEL", "LOGLEVEL"]
74 changes: 58 additions & 16 deletions services/autoscaling/src/simcore_service_autoscaling/models.py
@@ -6,43 +6,68 @@
from models_library.generated_models.docker_rest_api import Node


@dataclass(frozen=True, kw_only=True)
class AssignedTasksToInstance:
instance: EC2InstanceData
available_resources: Resources
assigned_tasks: list
@dataclass(frozen=True, slots=True, kw_only=True)
class _TaskAssignmentMixin:
assigned_tasks: list = field(default_factory=list)
available_resources: Resources = field(default_factory=Resources.create_as_empty)

def assign_task(self, task, task_resources: Resources) -> None:
self.assigned_tasks.append(task)
object.__setattr__(
self, "available_resources", self.available_resources - task_resources
)

@dataclass(frozen=True, kw_only=True)
class AssignedTasksToInstanceType:
def has_resources_for_task(self, task_resources: Resources) -> bool:
return bool(self.available_resources >= task_resources)


@dataclass(frozen=True, kw_only=True, slots=True)
class AssignedTasksToInstanceType(_TaskAssignmentMixin):
instance_type: EC2InstanceType
assigned_tasks: list


@dataclass(frozen=True)
class AssociatedInstance:
node: Node
@dataclass(frozen=True, kw_only=True, slots=True)
class _BaseInstance(_TaskAssignmentMixin):
ec2_instance: EC2InstanceData

def __post_init__(self) -> None:
if self.available_resources == Resources.create_as_empty():
object.__setattr__(self, "available_resources", self.ec2_instance.resources)


@dataclass(frozen=True, kw_only=True, slots=True)
class AssociatedInstance(_BaseInstance):
node: Node


@dataclass(frozen=True, kw_only=True, slots=True)
class NonAssociatedInstance(_BaseInstance):
...

@dataclass(frozen=True)

@dataclass(frozen=True, kw_only=True, slots=True)
class Cluster:
active_nodes: list[AssociatedInstance] = field(
metadata={
"description": "This is a EC2 backed docker node which is active (with running tasks)"
"description": "This is a EC2-backed docker node which is active and ready to receive tasks (or with running tasks)"
}
)
pending_nodes: list[AssociatedInstance] = field(
metadata={
"description": "This is a EC2-backed docker node which is active and NOT yet ready to receive tasks"
}
)
drained_nodes: list[AssociatedInstance] = field(
metadata={
"description": "This is a EC2 backed docker node which is drained (with no tasks)"
"description": "This is a EC2-backed docker node which is drained (cannot accept tasks)"
}
)
reserve_drained_nodes: list[AssociatedInstance] = field(
metadata={
"description": "This is a EC2 backed docker node which is drained in the reserve if this is enabled (with no tasks)"
"description": "This is a EC2-backed docker node which is drained in the reserve if this is enabled (with no tasks)"
}
)
pending_ec2s: list[EC2InstanceData] = field(
pending_ec2s: list[NonAssociatedInstance] = field(
metadata={
"description": "This is an EC2 instance that is not yet associated to a docker node"
}
@@ -54,6 +79,23 @@ class Cluster:
)
terminated_instances: list[EC2InstanceData]

def can_scale_down(self) -> bool:
return bool(
self.active_nodes
or self.pending_nodes
or self.drained_nodes
or self.pending_ec2s
)

def total_number_of_machines(self) -> int:
return (
len(self.active_nodes)
+ len(self.pending_nodes)
+ len(self.drained_nodes)
+ len(self.reserve_drained_nodes)
+ len(self.pending_ec2s)
)


DaskTaskId: TypeAlias = str
