From 91e4df77a0b38f0c22541384e4fbc7987a24c406 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 26 May 2021 15:36:47 +0100 Subject: [PATCH 01/13] Add HTML reprs for Client.who_has and Client.has_what --- distributed/client.py | 5 ++-- distributed/objects.py | 67 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 2 deletions(-) create mode 100644 distributed/objects.py diff --git a/distributed/client.py b/distributed/client.py index 919d9181f2..245e53d0f9 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -53,6 +53,7 @@ ) from .diagnostics.plugin import UploadFile, WorkerPlugin, _get_worker_plugin_name from .metrics import time +from .objects import HasWhat, WhoHas from .protocol import to_serialize from .protocol.pickle import dumps, loads from .publish import Datasets @@ -3205,7 +3206,7 @@ def who_has(self, futures=None, **kwargs): keys = list(map(stringify, {f.key for f in futures})) else: keys = None - return self.sync(self.scheduler.who_has, keys=keys, **kwargs) + return WhoHas(self.sync(self.scheduler.who_has, keys=keys, **kwargs)) def has_what(self, workers=None, **kwargs): """Which keys are held by which workers @@ -3239,7 +3240,7 @@ def has_what(self, workers=None, **kwargs): workers = list(workers) if workers is not None and not isinstance(workers, (tuple, list, set)): workers = [workers] - return self.sync(self.scheduler.has_what, workers=workers, **kwargs) + return HasWhat(self.sync(self.scheduler.has_what, workers=workers, **kwargs)) def processing(self, workers=None): """The tasks currently running on each worker diff --git a/distributed/objects.py b/distributed/objects.py new file mode 100644 index 0000000000..d53860514e --- /dev/null +++ b/distributed/objects.py @@ -0,0 +1,67 @@ +"""This file contains custom objects. +These are mostly regular objects with more useful _repr_ and _repr_html_ methods.""" + + +class HasWhat(dict): + """A dictionary of all workers and which keys that worker has.""" + + def _repr_html_(self): + rows = "" + + for worker, keys in sorted(self.items()): + summary = "" + for key in keys: + summary += f"""{key}""" + + rows += f""" + {worker} + {len(keys)} + +
+ Expand + + {summary} +
+
+ + """ + + output = f""" + + + + + + + {rows} +
WorkerKey countKey list
+ """ + + return output + + +class WhoHas(dict): + """A dictionary of all keys and which workers have that key.""" + + def _repr_html_(self): + rows = "" + + for title, keys in sorted(self.items()): + rows += f""" + {title} + {len(keys)} + {", ".join(keys)} + """ + + output = f""" + + + + + + + {rows} +
KeyCopiesWorkers
+ """ + + return output From 7e0dde157b8632a4120e0074660e112086e35fa7 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 27 May 2021 09:52:18 +0100 Subject: [PATCH 02/13] Add scheduler info html repr --- distributed/client.py | 4 +- distributed/deploy/cluster.py | 3 +- distributed/objects.py | 117 ++++++++++++++++++++++++++++++++++ 3 files changed, 121 insertions(+), 3 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 245e53d0f9..3084bd7048 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -53,7 +53,7 @@ ) from .diagnostics.plugin import UploadFile, WorkerPlugin, _get_worker_plugin_name from .metrics import time -from .objects import HasWhat, WhoHas +from .objects import HasWhat, SchedulerInfo, WhoHas from .protocol import to_serialize from .protocol.pickle import dumps, loads from .publish import Datasets @@ -3449,7 +3449,7 @@ def scheduler_info(self, **kwargs): """ if not self.asynchronous: self.sync(self._update_scheduler_info) - return self._scheduler_identity + return SchedulerInfo(self._scheduler_identity) def write_scheduler_file(self, scheduler_file): """Write the scheduler information to a json file. diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 00360c7aa2..a88709af71 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -12,6 +12,7 @@ from dask.utils import format_bytes from ..core import Status +from ..objects import SchedulerInfo from ..utils import ( Log, Logs, @@ -71,7 +72,7 @@ def __init__(self, asynchronous, quiet=False, name=None): async def _start(self): comm = await self.scheduler_comm.live_comm() await comm.write({"op": "subscribe_worker_status"}) - self.scheduler_info = await comm.read() + self.scheduler_info = SchedulerInfo(await comm.read()) self._watch_worker_status_comm = comm self._watch_worker_status_task = asyncio.ensure_future( self._watch_worker_status(comm) diff --git a/distributed/objects.py b/distributed/objects.py index d53860514e..689d163cf1 100644 --- a/distributed/objects.py +++ b/distributed/objects.py @@ -1,5 +1,10 @@ """This file contains custom objects. These are mostly regular objects with more useful _repr_ and _repr_html_ methods.""" +import datetime + +from dask.utils import format_bytes, format_time_ago + +from .utils import clean_dashboard_address class HasWhat(dict): @@ -65,3 +70,115 @@ def _repr_html_(self): """ return output + + +class SchedulerInfo(dict): + """A dictionary of information about the scheduler and workers.""" + + def _repr_html_(self): + dashboard_address = None + if "dashboard" in self["services"]: + dashboard_address = self["address"].split(":") + dashboard_address[0] = "http" + dashboard_address[-1] = str(self["services"]["dashboard"]) + "/status" + dashboard_address = ":".join(dashboard_address) + + scheduler = f""" +
+
 
+
+

{self["type"]}

+

{self["id"]}

+ + + + + + + + + + + + + +
Comm: {self["address"]}Workers: {len(self["workers"])}
Dashboard: {dashboard_address}Total threads: {sum([w["nthreads"] for w in self["workers"].values()])}
Started: {format_time_ago(datetime.datetime.fromtimestamp(self["started"]))}Total memory: {format_bytes(sum([w["memory_limit"] for w in self["workers"].values()]))}
+
+
+ """ + + workers = "" + for worker_name in self["workers"]: + self["workers"][worker_name]["comm"] = worker_name + for worker in sorted(self["workers"].values(), key=lambda k: k["name"]): + dashboard_address = None + if "dashboard" in worker["services"]: + dashboard_address = worker["comm"].split(":") + dashboard_address[0] = "http" + dashboard_address[-1] = str(worker["services"]["dashboard"]) + dashboard_address = ":".join(dashboard_address) + + metrics = "" + + if "metrics" in worker: + metrics = f""" + + Tasks executing: {worker["metrics"]["executing"]} + Tasks in memory: {worker["metrics"]["in_memory"]} + + + Tasks ready: {worker["metrics"]["ready"]} + Tasks in flight: {worker["metrics"]["in_flight"]} + + + CPU usage: {worker["metrics"]["cpu"]}% + Last seen: {format_time_ago(datetime.datetime.fromtimestamp(worker["last_seen"]))} + + + Memory usage: {((worker["metrics"]["memory"] / worker["memory_limit"]) * 100):.1f}% + Spilled bytes: {format_bytes(worker["metrics"]["spilled_nbytes"])} + + + Read bytes: {format_bytes(worker["metrics"]["read_bytes"])} + Write bytes: {format_bytes(worker["metrics"]["write_bytes"])} + + """ + + workers += f""" +
+
 
+
+
+

{worker["type"]}: {worker["name"]}

+ + + + + + + + + + + + + + + + + {metrics} +
Comm: {worker["comm"]}Total threads: {worker["nthreads"]}
Dashboard: {dashboard_address}Memory: {format_bytes(worker["memory_limit"])}
Nanny: {worker["nanny"]}
Local directory: {worker["local_directory"]}
+
+
+
+ """ + + return f""" +
+ {scheduler} +
+

Workers

+ {workers} +
+
+ """ From 5fc90849f58820321412572252bf2bdb9beedf94 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 27 May 2021 11:11:34 +0100 Subject: [PATCH 03/13] use scheduler info repr in client and cluster reprs --- distributed/client.py | 91 ++++++++++++++------------ distributed/deploy/cluster.py | 119 ++++++++++++---------------------- distributed/deploy/local.py | 11 +++- distributed/objects.py | 102 ++++++++++++++++++++++------- 4 files changed, 178 insertions(+), 145 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 3084bd7048..ada38a903d 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -875,7 +875,7 @@ def _get_scheduler_info(self): info = self._scheduler_identity scheduler = self.scheduler - return scheduler, info + return scheduler, SchedulerInfo(info) def __repr__(self): # Note: avoid doing I/O here... @@ -908,51 +908,56 @@ def __repr__(self): def _repr_html_(self): scheduler, info = self._get_scheduler_info() - text = ( - '

Client

\n' - '
    \n' - ) - if scheduler is not None: - text += "
  • Scheduler: %s
  • \n" % scheduler.address + if scheduler is None: + scheduler_repr = """

    No scheduler connected.

    """ else: - text += "
  • Scheduler: not connected
  • \n" + scheduler_repr = info._repr_html_() - if info and "dashboard" in info["services"]: - text += ( - "
  • Dashboard: %(web)s
  • \n" - % {"web": self.dashboard_link} - ) + client_status = "" - text += "
\n" + if not self.cluster and not self.scheduler_file: + client_status += """ + + Connection method: Direct + + + """ - if info: - workers = list(info["workers"].values()) - cores = sum(w["nthreads"] for w in workers) - if all(isinstance(w["memory_limit"], Number) for w in workers): - memory = sum(w["memory_limit"] for w in workers) - memory = format_bytes(memory) - else: - memory = "" - - text2 = ( - '

Cluster

\n' - '
    \n' - "
  • Workers: %d
  • \n" - "
  • Cores: %d
  • \n" - "
  • Memory: %s
  • \n" - "
\n" - ) % (len(workers), cores, memory) - - return ( - '\n' - "\n" - '\n' - '\n' - "\n
\n%s\n%s
" - ) % (text, text2) + if self.cluster: + client_status += f""" + + Connection method: Cluster object + Cluster type: {type(self.cluster).__name__} + + """ - else: - return text + if self.scheduler_file: + client_status += f""" + + Connection method: Scheduler file + Scheduler file: {self.scheduler_file} + + """ + + return f""" +
+
 
+
+

Client

+

{self.id}

+ + {client_status} +
+ {scheduler_repr} +
+
+ """ def start(self, **kwargs): """Start scheduler running in separate thread""" @@ -1166,7 +1171,7 @@ async def _update_scheduler_info(self): if self.status not in ("running", "connecting"): return try: - self._scheduler_identity = await self.scheduler.identity() + self._scheduler_identity = SchedulerInfo(await self.scheduler.identity()) except EnvironmentError: logger.debug("Not able to query scheduler for identity") @@ -3449,7 +3454,7 @@ def scheduler_info(self, **kwargs): """ if not self.asynchronous: self.sync(self._update_scheduler_info) - return SchedulerInfo(self._scheduler_identity) + return self._scheduler_identity def write_scheduler_file(self, scheduler_file): """Write the scheduler information to a json file. diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index a88709af71..b8787bf632 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -260,48 +260,6 @@ def dashboard_link(self): host = self.scheduler_address.split("://")[1].split("/")[0].split(":")[0] return format_dashboard_link(host, port) - def _widget_status(self): - workers = len(self.scheduler_info["workers"]) - if hasattr(self, "worker_spec"): - requested = sum( - 1 if "group" not in each else len(each["group"]) - for each in self.worker_spec.values() - ) - elif hasattr(self, "workers"): - requested = len(self.workers) - else: - requested = workers - cores = sum(v["nthreads"] for v in self.scheduler_info["workers"].values()) - memory = sum(v["memory_limit"] for v in self.scheduler_info["workers"].values()) - memory = format_bytes(memory) - text = """ -
- - - - - -
Workers %s
Cores %d
Memory %s
-
-""" % ( - workers if workers == requested else "%d / %d" % (workers, requested), - cores, - memory, - ) - return text - def _widget(self): """Create IPython widget for display within a notebook""" try: @@ -310,26 +268,14 @@ def _widget(self): pass try: - from ipywidgets import HTML, Accordion, Button, HBox, IntText, Layout, VBox + from ipywidgets import HTML, Accordion, Button, HBox, IntText, Layout, Tab except ImportError: self._cached_widget = None return None layout = Layout(width="150px") - if self.dashboard_link: - link = '

Dashboard: %s

\n' % ( - self.dashboard_link, - self.dashboard_link, - ) - else: - link = "" - - title = "

%s

" % self._cluster_class_name - title = HTML(title) - dashboard = HTML(link) - - status = HTML(self._widget_status(), layout=Layout(min_width="150px")) + status = HTML(self._repr_html_()) if self._supports_scaling: request = IntText(0, description="Workers", layout=layout) @@ -365,12 +311,15 @@ def scale_cb(b): else: accordion = HTML("") - box = VBox([title, HBox([status, accordion]), dashboard]) + tab = Tab() + tab.children = [status, accordion] + tab.set_title(0, "Status") + tab.set_title(1, "Scaling") - self._cached_widget = box + self._cached_widget = tab def update(): - status.value = self._widget_status() + status.value = self._repr_html_() cluster_repr_interval = parse_timedelta( dask.config.get("distributed.deploy.cluster-repr-interval", default="ms") @@ -379,25 +328,41 @@ def update(): self.periodic_callbacks["cluster-repr"] = pc pc.start() - return box + return tab - def _repr_html_(self): - if self.dashboard_link: - dashboard = "{0}".format( - self.dashboard_link - ) - else: - dashboard = "Not Available" - return ( - "
\n" - "

{cls}

\n" - "
    \n" - "
  • Dashboard: {dashboard}\n" - "
\n" - "
\n" - ).format(cls=self._cluster_class_name, dashboard=dashboard) + def _repr_html_(self, cluster_status=None): + + if not cluster_status: + cluster_status = "" + + cluster_status += f""" + + + Dashboard link: {self.dashboard_link} + + + + """ + + return f""" + + """ def _ipython_display_(self, **kwargs): widget = self._widget() diff --git a/distributed/deploy/local.py b/distributed/deploy/local.py index e1f18734b0..f68661fdea 100644 --- a/distributed/deploy/local.py +++ b/distributed/deploy/local.py @@ -118,7 +118,7 @@ def __init__( interface=None, worker_class=None, scheduler_kwargs=None, - **worker_kwargs + **worker_kwargs, ): if ip is not None: # In the future we should warn users about this move @@ -245,6 +245,15 @@ def start_worker(self, *args, **kwargs): "Please see the `cluster.scale` method instead." ) + def _repr_html_(self): + cluster_status = f""" + + Status: {self.status} + Using processes: {self.processes} + + """ + return super()._repr_html_(cluster_status=cluster_status) + clusters_to_close = weakref.WeakSet() diff --git a/distributed/objects.py b/distributed/objects.py index 689d163cf1..62b9505725 100644 --- a/distributed/objects.py +++ b/distributed/objects.py @@ -4,8 +4,6 @@ from dask.utils import format_bytes, format_time_ago -from .utils import clean_dashboard_address - class HasWhat(dict): """A dictionary of all workers and which keys that worker has.""" @@ -85,7 +83,13 @@ def _repr_html_(self): scheduler = f"""
-
 
+
 

{self["type"]}

{self["id"]}

@@ -95,12 +99,23 @@ def _repr_html_(self): Workers: {len(self["workers"])} - Dashboard: {dashboard_address} - Total threads: {sum([w["nthreads"] for w in self["workers"].values()])} + + Dashboard: {dashboard_address} + + + Total threads: + {sum([w["nthreads"] for w in self["workers"].values()])} + - Started: {format_time_ago(datetime.datetime.fromtimestamp(self["started"]))} - Total memory: {format_bytes(sum([w["memory_limit"] for w in self["workers"].values()]))} + + Started: + {format_time_ago(datetime.datetime.fromtimestamp(self["started"]))} + + + Total memory: + {format_bytes(sum([w["memory_limit"] for w in self["workers"].values()]))} +
@@ -123,48 +138,87 @@ def _repr_html_(self): if "metrics" in worker: metrics = f""" - Tasks executing: {worker["metrics"]["executing"]} - Tasks in memory: {worker["metrics"]["in_memory"]} + + Tasks executing: {worker["metrics"]["executing"]} + + + Tasks in memory: {worker["metrics"]["in_memory"]} + - Tasks ready: {worker["metrics"]["ready"]} - Tasks in flight: {worker["metrics"]["in_flight"]} + + Tasks ready: {worker["metrics"]["ready"]} + + + Tasks in flight: {worker["metrics"]["in_flight"]} + CPU usage: {worker["metrics"]["cpu"]}% - Last seen: {format_time_ago(datetime.datetime.fromtimestamp(worker["last_seen"]))} + + Last seen: + {format_time_ago(datetime.datetime.fromtimestamp(worker["last_seen"]))} + - Memory usage: {((worker["metrics"]["memory"] / worker["memory_limit"]) * 100):.1f}% - Spilled bytes: {format_bytes(worker["metrics"]["spilled_nbytes"])} + + Memory usage: + {((worker["metrics"]["memory"] / worker["memory_limit"]) * 100):.1f}% + + + Spilled bytes: + {format_bytes(worker["metrics"]["spilled_nbytes"])} + - Read bytes: {format_bytes(worker["metrics"]["read_bytes"])} - Write bytes: {format_bytes(worker["metrics"]["write_bytes"])} + + Read bytes: + {format_bytes(worker["metrics"]["read_bytes"])} + + + Write bytes: + {format_bytes(worker["metrics"]["write_bytes"])} + """ workers += f"""
-
 
+
 
-

{worker["type"]}: {worker["name"]}

+ +

{worker["type"]}: {worker["name"]}

+
- - + + - - + + - + - + {metrics}
Comm: {worker["comm"]}Total threads: {worker["nthreads"]}Comm: {worker["comm"]}Total threads: {worker["nthreads"]}
Dashboard: {dashboard_address}Memory: {format_bytes(worker["memory_limit"])} + Dashboard: + {dashboard_address} + + Memory: + {format_bytes(worker["memory_limit"])} +
Nanny: {worker["nanny"]}Nanny: {worker["nanny"]}
Local directory: {worker["local_directory"]} + Local directory: + {worker["local_directory"]} +
From 897621366531eb7e2be06213575c68743eef1f47 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 27 May 2021 11:45:27 +0100 Subject: [PATCH 04/13] Fix status name --- distributed/deploy/local.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/deploy/local.py b/distributed/deploy/local.py index c2dbe30152..616e304ad0 100644 --- a/distributed/deploy/local.py +++ b/distributed/deploy/local.py @@ -251,7 +251,7 @@ def start_worker(self, *args, **kwargs): def _repr_html_(self): cluster_status = f""" - Status: {self.status} + Status: {self.status.name} Using processes: {self.processes} """ From 0f39cf4a95d0216346287eff0c002138ecc6eed9 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 27 May 2021 14:54:44 +0100 Subject: [PATCH 05/13] A little refactoring and fix tests --- distributed/client.py | 30 ++++++++-- distributed/deploy/cluster.py | 60 +++++++++++++++++-- distributed/deploy/tests/test_spec_cluster.py | 10 +--- distributed/objects.py | 26 ++++---- distributed/tests/test_client.py | 2 +- 5 files changed, 100 insertions(+), 28 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 77b2fd1dcd..e3551acf7f 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -909,9 +909,21 @@ def _repr_html_(self): scheduler, info = self._get_scheduler_info() if scheduler is None: - scheduler_repr = """

No scheduler connected.

""" + child_repr = """

No scheduler connected.

""" + elif self.cluster: + child_repr = f""" +
+

Cluster Info

+ {self.cluster._repr_html_()} +
+ """ else: - scheduler_repr = info._repr_html_() + child_repr = f""" +
+

Scheduler Info

+ {info._repr_html_()} +
+ """ client_status = "" @@ -939,6 +951,16 @@ def _repr_html_(self): """ + client_status += f""" + + + Dashboard: + {self.dashboard_link} + + + + """ + return f"""
 
+ position: absolute;">

Client

{self.id}

{client_status}
- {scheduler_repr} + {child_repr}
""" diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index b8787bf632..6183d6b337 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -62,6 +62,7 @@ def __init__(self, asynchronous, quiet=False, name=None): self._cluster_manager_logs = [] self.quiet = quiet self.scheduler_comm = None + self._adaptive = None if name is not None: self.name = name @@ -260,6 +261,30 @@ def dashboard_link(self): host = self.scheduler_address.split("://")[1].split("/")[0].split(":")[0] return format_dashboard_link(host, port) + def _scaling_status(self): + if self._adaptive and self._adaptive.periodic_callback: + mode = "Adaptive" + else: + mode = "Manual" + workers = len(self.scheduler_info["workers"]) + if hasattr(self, "worker_spec"): + requested = sum( + 1 if "group" not in each else len(each["group"]) + for each in self.worker_spec.values() + ) + elif hasattr(self, "workers"): + requested = len(self.workers) + else: + requested = workers + + worker_count = workers if workers == requested else f"{workers} / {requested}" + return f""" + + + +
Scaling mode: {mode}
Workers: {worker_count}
+ """ + def _widget(self): """Create IPython widget for display within a notebook""" try: @@ -268,7 +293,16 @@ def _widget(self): pass try: - from ipywidgets import HTML, Accordion, Button, HBox, IntText, Layout, Tab + from ipywidgets import ( + HTML, + Accordion, + Button, + HBox, + IntText, + Layout, + Tab, + VBox, + ) except ImportError: self._cached_widget = None return None @@ -311,8 +345,10 @@ def scale_cb(b): else: accordion = HTML("") + scale_status = HTML(self._scaling_status()) + tab = Tab() - tab.children = [status, accordion] + tab.children = [status, VBox([scale_status, accordion])] tab.set_title(0, "Status") tab.set_title(1, "Scaling") @@ -320,6 +356,7 @@ def scale_cb(b): def update(): status.value = self._repr_html_() + scale_status.value = self._scaling_status() cluster_repr_interval = parse_timedelta( dask.config.get("distributed.deploy.cluster-repr-interval", default="ms") @@ -338,9 +375,19 @@ def _repr_html_(self, cluster_status=None): cluster_status += f""" - Dashboard link: {self.dashboard_link} + Dashboard: {self.dashboard_link} + + Workers: {len(self.scheduler_info["workers"])} + + + + Total threads: + {sum([w["nthreads"] for w in self.scheduler_info["workers"].values()])} + + + Total memory: + {format_bytes(sum([w["memory_limit"] for w in self.scheduler_info["workers"].values()]))} - """ @@ -352,14 +399,17 @@ def _repr_html_(self, cluster_status=None): background-color: #e1e1e1; border: 3px solid #9D9D9D; border-radius: 5px; - position: absolute;"> 
+ position: absolute;">

{type(self).__name__}

{self.name}

{cluster_status}
+
+

Scheduler Info

{self.scheduler_info._repr_html_()} +
""" diff --git a/distributed/deploy/tests/test_spec_cluster.py b/distributed/deploy/tests/test_spec_cluster.py index c1798fd580..9b8448b93f 100644 --- a/distributed/deploy/tests/test_spec_cluster.py +++ b/distributed/deploy/tests/test_spec_cluster.py @@ -363,12 +363,8 @@ async def test_widget(cleanup): await asyncio.sleep(0.01) assert time() < start + 1 - text = cluster._widget_status() - assert "3" in text - assert "GB" in text or "GiB" in text - cluster.scale(5) - assert "3 / 5" in cluster._widget_status() + assert "3 / 5" in cluster._scaling_status() @pytest.mark.asyncio @@ -449,8 +445,8 @@ async def test_MultiWorker(cleanup): while "workers=4" not in repr(cluster): await asyncio.sleep(0.1) - workers_line = re.search("(Workers.+)", cluster._widget_status()).group(1) - assert re.match("Workers.*4", workers_line) + workers_line = re.search("(Workers.+)", cluster._repr_html_()).group(1) + assert re.match("Workers.*4", workers_line) cluster.scale(1) await cluster diff --git a/distributed/objects.py b/distributed/objects.py index 62b9505725..1f1f791dca 100644 --- a/distributed/objects.py +++ b/distributed/objects.py @@ -4,6 +4,8 @@ from dask.utils import format_bytes, format_time_ago +from distributed.utils import format_dashboard_link + class HasWhat(dict): """A dictionary of all workers and which keys that worker has.""" @@ -76,10 +78,11 @@ class SchedulerInfo(dict): def _repr_html_(self): dashboard_address = None if "dashboard" in self["services"]: - dashboard_address = self["address"].split(":") - dashboard_address[0] = "http" - dashboard_address[-1] = str(self["services"]["dashboard"]) + "/status" - dashboard_address = ":".join(dashboard_address) + _, addr = self["address"].split("://") + host, _ = addr.split(":") + dashboard_address = format_dashboard_link( + host, self["services"]["dashboard"] + ) scheduler = f"""
@@ -89,7 +92,7 @@ def _repr_html_(self): background-color: #FFF7E5; border: 3px solid #FF6132; border-radius: 5px; - position: absolute;"> 
+ position: absolute;">

{self["type"]}

{self["id"]}

@@ -128,10 +131,11 @@ def _repr_html_(self): for worker in sorted(self["workers"].values(), key=lambda k: k["name"]): dashboard_address = None if "dashboard" in worker["services"]: - dashboard_address = worker["comm"].split(":") - dashboard_address[0] = "http" - dashboard_address[-1] = str(worker["services"]["dashboard"]) - dashboard_address = ":".join(dashboard_address) + _, addr = worker["comm"].split("://") + host, _ = addr.split(":") + dashboard_address = format_dashboard_link( + host, worker["services"]["dashboard"] + ) metrics = "" @@ -163,7 +167,7 @@ def _repr_html_(self): Memory usage: - {((worker["metrics"]["memory"] / worker["memory_limit"]) * 100):.1f}% + {format_bytes(worker["metrics"]["memory"])} Spilled bytes: @@ -189,7 +193,7 @@ def _repr_html_(self): background-color: #DBF5FF; border: 3px solid #4CC9FF; border-radius: 5px; - position: absolute;"> 
+ position: absolute;">
diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 6fbbae9570..b764d9e61e 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -1995,7 +1995,7 @@ def test_repr(loop): for func in funcs: text = func(c) assert c.scheduler.address in text - assert "threads=3" in text or "Cores: 3" in text + assert "threads=3" in text or "Total threads: 3" in text assert "6.00 GB" in text or "5.59 GiB" in text if " Date: Wed, 2 Jun 2021 15:59:54 +0100 Subject: [PATCH 06/13] handle async cases --- distributed/client.py | 24 ++++++++++++++---------- distributed/deploy/cluster.py | 6 +++++- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 13e4f29e09..56bf9f9668 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -828,7 +828,10 @@ def dashboard_link(self): return self.cluster.dashboard_link except AttributeError: scheduler, info = self._get_scheduler_info() - protocol, rest = scheduler.address.split("://") + try: + protocol, rest = scheduler.address.split("://") + except AttributeError: + return None port = info["services"]["dashboard"] if protocol == "inproc": @@ -951,15 +954,16 @@ def _repr_html_(self): """ - client_status += f""" - - - Dashboard: - {self.dashboard_link} - - - - """ + if self.dashboard_link: + client_status += f""" + + + Dashboard: + {self.dashboard_link} + + + + """ return f"""
diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 0cce0fbcc6..8807cdec99 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -394,6 +394,10 @@ def _repr_html_(self, cluster_status=None): """ + try: + scheduler_info_repr = self.scheduler_info._repr_html_() + except AttributeError: + scheduler_info_repr = "Scheduler not started yet." return f"""
From ff5a184454f041bb09a78eb3740fe530100dbbf9 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 2 Jun 2021 17:23:18 +0100 Subject: [PATCH 07/13] Add GPU info to workers --- distributed/objects.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/distributed/objects.py b/distributed/objects.py index 1f1f791dca..7f87ebdfad 100644 --- a/distributed/objects.py +++ b/distributed/objects.py @@ -186,6 +186,21 @@ def _repr_html_(self): """ + gpu = "" + + if "gpu" in worker: + gpu = f""" + + + GPU: {worker["gpu"]["name"]} + + + GPU memory: + {format_bytes(worker["gpu"]["memory-total"])} + + + """ + workers += f"""
+ {gpu} {metrics}
From 2fce582cff397282733aef8b6caecbd47c1b06dd Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 2 Jun 2021 17:28:41 +0100 Subject: [PATCH 08/13] Allow cluster_status from subclasses --- distributed/deploy/local.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/distributed/deploy/local.py b/distributed/deploy/local.py index 616e304ad0..e759d8b121 100644 --- a/distributed/deploy/local.py +++ b/distributed/deploy/local.py @@ -248,8 +248,10 @@ def start_worker(self, *args, **kwargs): "Please see the `cluster.scale` method instead." ) - def _repr_html_(self): - cluster_status = f""" + def _repr_html_(self, cluster_status=None): + if cluster_status is None: + cluster_status = "" + cluster_status += f""" Status: {self.status.name} Using processes: {self.processes} From c67403ac40eb7c510bf1a411499154474d8a732d Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Mon, 14 Jun 2021 16:55:44 +0100 Subject: [PATCH 09/13] Fix tests --- distributed/objects.py | 7 +++---- distributed/tests/test_client.py | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/distributed/objects.py b/distributed/objects.py index 7f87ebdfad..547595670d 100644 --- a/distributed/objects.py +++ b/distributed/objects.py @@ -1,6 +1,7 @@ """This file contains custom objects. These are mostly regular objects with more useful _repr_ and _repr_html_ methods.""" import datetime +from urllib.parse import urlparse from dask.utils import format_bytes, format_time_ago @@ -78,8 +79,7 @@ class SchedulerInfo(dict): def _repr_html_(self): dashboard_address = None if "dashboard" in self["services"]: - _, addr = self["address"].split("://") - host, _ = addr.split(":") + host = urlparse(self["address"]).hostname dashboard_address = format_dashboard_link( host, self["services"]["dashboard"] ) @@ -131,8 +131,7 @@ def _repr_html_(self): for worker in sorted(self["workers"].values(), key=lambda k: k["name"]): dashboard_address = None if "dashboard" in worker["services"]: - _, addr = worker["comm"].split("://") - host, _ = addr.split(":") + host = urlparse(worker["comm"]).hostname dashboard_address = format_dashboard_link( host, worker["services"]["dashboard"] ) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index ba48594e52..5e724b7bcb 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -1999,7 +1999,7 @@ def test_repr(loop): for func in funcs: text = func(c) assert c.scheduler.address in text - assert "threads=3" in text or "Total threads: 3" in text + assert "threads=3" in text or "Total threads: " in text assert "6.00 GB" in text or "5.59 GiB" in text if " Date: Mon, 14 Jun 2021 16:57:19 +0100 Subject: [PATCH 10/13] Make worker name title smaller --- distributed/objects.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/objects.py b/distributed/objects.py index 547595670d..521e19f831 100644 --- a/distributed/objects.py +++ b/distributed/objects.py @@ -211,7 +211,7 @@ def _repr_html_(self):
-

{worker["type"]}: {worker["name"]}

+

{worker["type"]}: {worker["name"]}

From 0b50054e6936dc49a1a42463acc31499a15fb907 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 14 Jun 2021 14:16:09 -0500 Subject: [PATCH 11/13] Fix distributed/tests/test_client.py::test_repr --- distributed/client.py | 2 +- distributed/tests/test_client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 00686d9ae3..98d56c926f 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -906,7 +906,7 @@ def __repr__(self): self.scheduler.address, ) else: - return "<%s: not connected>" % (self.__class__.__name__,) + return "<%s: No scheduler connected>" % (self.__class__.__name__,) def _repr_html_(self): scheduler, info = self._get_scheduler_info() diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 5e724b7bcb..ec47479e15 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -2006,7 +2006,7 @@ def test_repr(loop): for func in funcs: text = func(c) - assert "not connected" in text + assert "No scheduler connected" in text @gen_cluster(client=True) From c3c7e60a4e1aa37f417021ff45977f842b3f9768 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 15 Jun 2021 09:33:28 +0100 Subject: [PATCH 12/13] Update distributed/client.py Co-authored-by: James Bourbeau --- distributed/client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 98d56c926f..d46f7d4bb9 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -828,10 +828,10 @@ def dashboard_link(self): return self.cluster.dashboard_link except AttributeError: scheduler, info = self._get_scheduler_info() - try: - protocol, rest = scheduler.address.split("://") - except AttributeError: + if scheduler is None: return None + else: + protocol, rest = scheduler.address.split("://") port = info["services"]["dashboard"] if protocol == "inproc": From 32f082a29e88caa26f02c86e183a21045c165754 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 15 Jun 2021 09:35:14 +0100 Subject: [PATCH 13/13] Handle cluster and scheduler file both being set --- distributed/client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index d46f7d4bb9..edee278289 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -945,8 +945,7 @@ def _repr_html_(self): """ - - if self.scheduler_file: + elif self.scheduler_file: client_status += f"""
Cluster type: {type(self.cluster).__name__}
Connection method: Scheduler file