Merge branch 'master' into sort-studies-templates-services
jsaq007 authored Mar 7, 2024
2 parents f44370b + 45ae0eb commit 651e9f7
Showing 1 changed file with 36 additions and 19 deletions.
55 changes: 36 additions & 19 deletions scripts/maintenance/computational-clusters/osparc_clusters.py
@@ -30,7 +30,7 @@
_SSH_USER_NAME: Final[str] = "ubuntu"


-@dataclass(frozen=True, slots=True, kw_only=True)
+@dataclass(slots=True, kw_only=True)
class AutoscaledInstance:
name: str
ec2_instance: Instance
@@ -42,7 +42,7 @@ class InstanceRole(str, Enum):
worker = "worker"


-@dataclass(frozen=True, slots=True, kw_only=True)
+@dataclass(slots=True, kw_only=True)
class ComputationalInstance(AutoscaledInstance):
role: InstanceRole
user_id: int
@@ -62,7 +62,7 @@ class DynamicService:
containers: list[str]


-@dataclass(frozen=True, slots=True, kw_only=True)
+@dataclass(slots=True, kw_only=True)
class DynamicInstance(AutoscaledInstance):
running_services: list[DynamicService]

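A note on the three hunks above: dropping frozen=True makes these dataclasses mutable, which the reworked analysis code later in this diff relies on when it assigns instance.disk_space directly instead of building a copy with dataclasses.replace. A minimal sketch of the difference, with illustrative class and field names rather than the repository's own:

from dataclasses import FrozenInstanceError, dataclass, replace

@dataclass(frozen=True, slots=True, kw_only=True)
class FrozenNode:
    name: str
    disk_space: int

@dataclass(slots=True, kw_only=True)
class MutableNode:
    name: str
    disk_space: int

frozen = FrozenNode(name="primary", disk_space=0)
try:
    frozen.disk_space = 42  # frozen dataclasses reject attribute assignment
except FrozenInstanceError:
    frozen = replace(frozen, disk_space=42)  # a new object is the only way to "update"

mutable = MutableNode(name="primary", disk_space=0)
mutable.disk_space = 42  # fine once frozen=True is gone; no copy needed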
@@ -402,7 +402,6 @@ def _print_computational_clusters(
) -> None:
time_now = arrow.utcnow()
table = Table(
Column(""),
Column("Instance", justify="left", overflow="fold"),
Column("Links", justify="left", overflow="fold"),
Column("Computational details"),
@@ -416,9 +415,9 @@
):
# first print primary machine info
table.add_row(
f"[bold]{_color_encode_with_state('Primary', cluster.primary.ec2_instance)}",
"\n".join(
[
f"[bold]{_color_encode_with_state('Primary', cluster.primary.ec2_instance)}",
f"Name: {cluster.primary.name}",
f"ID: {cluster.primary.ec2_instance.id}",
f"AMI: {cluster.primary.ec2_instance.image_id}({cluster.primary.ec2_instance.image.name})",
@@ -448,19 +447,20 @@
)

# now add the workers
-for worker in cluster.workers:
+for index, worker in enumerate(cluster.workers):
table.add_row(
f"[italic]{_color_encode_with_state('Worker', worker.ec2_instance)}[/italic]",
"\n".join(
[
f"[italic]{_color_encode_with_state(f'Worker {index+1}', worker.ec2_instance)}[/italic]",
f"Name: {worker.name}",
f"ID: {worker.ec2_instance.id}",
f"AMI: {worker.ec2_instance.image_id}({worker.ec2_instance.image.name})",
f"Type: {worker.ec2_instance.instance_type}",
f"Up: {_timedelta_formatting(time_now - worker.ec2_instance.launch_time, color_code=True)}",
f"ExtIP: {worker.ec2_instance.public_ip_address}",
f"IntIP: {worker.ec2_instance.private_ip_address}",
f"Name: {worker.name}",
f"/mnt/docker(free): {_color_encode_with_threshold(worker.disk_space.human_readable(), worker.disk_space, TypeAdapter(ByteSize).validate_python('15Gib'))}",
"",
]
),
"\n".join(
@@ -526,8 +526,6 @@ def _dask_list_tasks(dask_client: distributed.Client) -> dict[TaskState, list[Ta
def _list_tasks(
dask_scheduler: distributed.Scheduler,
) -> dict[TaskId, TaskState]:
-from collections import defaultdict
-
task_state_to_tasks = defaultdict(list)
for task in dask_scheduler.tasks.values():
task_state_to_tasks[task.state].append(task.key)
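The hunk above only drops the function-local from collections import defaultdict, presumably in favour of a module-level import; the grouping idiom itself is unchanged. A standalone sketch of that idiom, with invented task states and keys:

from collections import defaultdict

# invented (state, key) pairs standing in for dask_scheduler.tasks.values()
tasks = [("memory", "task-a"), ("processing", "task-b"), ("memory", "task-c")]

task_state_to_tasks: defaultdict[str, list[str]] = defaultdict(list)
for state, key in tasks:
    task_state_to_tasks[state].append(key)  # missing states appear as empty lists on demand

assert dict(task_state_to_tasks) == {"memory": ["task-a", "task-c"], "processing": ["task-b"]}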
@@ -556,16 +554,34 @@ def _dask_client(ip_address: str) -> distributed.Client:

def _analyze_computational_instances(
computational_instances: list[ComputationalInstance],
-ssh_key_path: Path,
+ssh_key_path: Path | None,
) -> list[ComputationalCluster]:
+
+all_disk_spaces = [UNDEFINED_BYTESIZE] * len(computational_instances)
+if ssh_key_path is not None:
+all_disk_spaces = asyncio.get_event_loop().run_until_complete(
+asyncio.gather(
+*(
+asyncio.get_event_loop().run_in_executor(
+None,
+_ssh_and_get_available_disk_space,
+instance.ec2_instance,
+_SSH_USER_NAME,
+ssh_key_path,
+)
+for instance in computational_instances
+),
+return_exceptions=True,
+)
+)
+
computational_clusters = []
-for instance in track(
-computational_instances, description="Collecting computational clusters data..."
+for instance, disk_space in track(
+zip(computational_instances, all_disk_spaces, strict=True),
+description="Collecting computational clusters data...",
):
-docker_disk_space = _ssh_and_get_available_disk_space(
-instance.ec2_instance, _SSH_USER_NAME, ssh_key_path
-)
-upgraded_instance = replace(instance, disk_space=docker_disk_space)
+if isinstance(disk_space, ByteSize):
+instance.disk_space = disk_space
if instance.role is InstanceRole.manager:
scheduler_info = {}
datasets_on_cluster = ()
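The new block above is the heart of this commit: the per-instance SSH probes, previously run one by one inside the loop, are now fanned out over the default thread-pool executor and gathered concurrently, with return_exceptions=True so a single unreachable instance no longer aborts the scan. Failed probes leave the UNDEFINED_BYTESIZE placeholder in place, which is why the loop checks isinstance(disk_space, ByteSize) before assigning. A minimal sketch of the same pattern, with a stand-in blocking_disk_probe instead of the script's _ssh_and_get_available_disk_space:

import asyncio
import random

def blocking_disk_probe(host: str) -> int:
    # stand-in for a blocking SSH call; may raise when a host is unreachable
    if random.random() < 0.3:
        raise ConnectionError(f"could not reach {host}")
    return 15 * 1024**3  # bytes free

async def probe_all(hosts: list[str]) -> list[int | BaseException]:
    loop = asyncio.get_running_loop()
    return await asyncio.gather(
        *(loop.run_in_executor(None, blocking_disk_probe, host) for host in hosts),
        return_exceptions=True,  # failures come back as values, not raised exceptions
    )

results = asyncio.run(probe_all(["10.0.1.12", "10.0.1.13"]))
# keep successful probes; unreachable hosts fall back to a placeholder of 0 bytes
disk_spaces = [r if isinstance(r, int) else 0 for r in results]
print(disk_spaces)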
@@ -581,9 +597,10 @@ def _analyze_computational_instances(

assert isinstance(datasets_on_cluster, tuple)
assert isinstance(processing_jobs, dict)
+
computational_clusters.append(
ComputationalCluster(
-primary=upgraded_instance,
+primary=instance,
workers=[],
scheduler_info=scheduler_info,
datasets=datasets_on_cluster,
@@ -600,7 +617,7 @@ def _analyze_computational_instances(
cluster.primary.user_id == instance.user_id
and cluster.primary.wallet_id == instance.wallet_id
):
-cluster.workers.append(upgraded_instance)
+cluster.workers.append(instance)

return computational_clusters
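For context on the tail of this function: each worker is attached to the cluster whose primary shares its user_id and wallet_id, and with the now-mutable dataclasses the plain instance can be appended directly. A simplified sketch of that grouping, where Node and Cluster are illustrative stand-ins for the script's own types:

from dataclasses import dataclass, field

@dataclass
class Node:
    role: str
    user_id: int
    wallet_id: int

@dataclass
class Cluster:
    primary: Node
    workers: list[Node] = field(default_factory=list)

nodes = [  # invented scan results
    Node("manager", user_id=1, wallet_id=7),
    Node("worker", user_id=1, wallet_id=7),
    Node("manager", user_id=2, wallet_id=9),
    Node("worker", user_id=2, wallet_id=9),
]

clusters = [Cluster(primary=n) for n in nodes if n.role == "manager"]
for node in nodes:
    if node.role != "worker":
        continue
    for cluster in clusters:
        # a worker joins the cluster whose primary shares its user and wallet
        if (cluster.primary.user_id, cluster.primary.wallet_id) == (node.user_id, node.wallet_id):
            cluster.workers.append(node)

assert [len(c.workers) for c in clusters] == [1, 1]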

