Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HTML repr to scheduler_info and incorporate into client and cluster reprs #4857

Merged
merged 17 commits into from
Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 78 additions & 45 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
)
from .diagnostics.plugin import UploadFile, WorkerPlugin, _get_worker_plugin_name
from .metrics import time
from .objects import HasWhat, WhoHas
from .objects import HasWhat, SchedulerInfo, WhoHas
from .protocol import to_serialize
from .protocol.pickle import dumps, loads
from .publish import Datasets
Expand Down Expand Up @@ -828,7 +828,10 @@ def dashboard_link(self):
return self.cluster.dashboard_link
except AttributeError:
scheduler, info = self._get_scheduler_info()
protocol, rest = scheduler.address.split("://")
if scheduler is None:
return None
else:
protocol, rest = scheduler.address.split("://")

port = info["services"]["dashboard"]
if protocol == "inproc":
Expand Down Expand Up @@ -875,7 +878,7 @@ def _get_scheduler_info(self):
info = self._scheduler_identity
scheduler = self.scheduler

return scheduler, info
return scheduler, SchedulerInfo(info)

def __repr__(self):
# Note: avoid doing I/O here...
Expand Down Expand Up @@ -903,53 +906,83 @@ def __repr__(self):
self.scheduler.address,
)
else:
return "<%s: not connected>" % (self.__class__.__name__,)
return "<%s: No scheduler connected>" % (self.__class__.__name__,)

def _repr_html_(self):
scheduler, info = self._get_scheduler_info()

text = (
'<h3 style="text-align: left;">Client</h3>\n'
'<ul style="text-align: left; list-style: none; margin: 0; padding: 0;">\n'
)
if scheduler is not None:
text += " <li><b>Scheduler: </b>%s</li>\n" % scheduler.address
if scheduler is None:
child_repr = """<p>No scheduler connected.</p>"""
elif self.cluster:
child_repr = f"""
<details>
<summary style="margin-bottom: 20px;"><h3 style="display: inline;">Cluster Info</h3></summary>
{self.cluster._repr_html_()}
</details>
"""
else:
text += " <li><b>Scheduler: not connected</b></li>\n"

if info and "dashboard" in info["services"]:
text += (
" <li><b>Dashboard: </b><a href='%(web)s' target='_blank'>%(web)s</a></li>\n"
% {"web": self.dashboard_link}
)
child_repr = f"""
<details>
<summary style="margin-bottom: 20px;"><h3 style="display: inline;">Scheduler Info</h3></summary>
{info._repr_html_()}
</details>
"""

client_status = ""

if not self.cluster and not self.scheduler_file:
client_status += """
<tr>
<td style="text-align: left;"><strong>Connection method:</strong> Direct</td>
<td style="text-align: left;"></td>
</tr>
"""

text += "</ul>\n"

if info:
workers = list(info["workers"].values())
cores = sum(w["nthreads"] for w in workers)
memory = [w["memory_limit"] for w in workers]
memory = format_bytes(sum(memory)) if all(memory) else ""

text2 = (
'<h3 style="text-align: left;">Cluster</h3>\n'
'<ul style="text-align: left; list-style:none; margin: 0; padding: 0;">\n'
" <li><b>Workers: </b>%d</li>\n"
" <li><b>Cores: </b>%d</li>\n"
" <li><b>Memory: </b>%s</li>\n"
"</ul>\n"
) % (len(workers), cores, memory)

return (
'<table style="border: 2px solid white;">\n'
"<tr>\n"
'<td style="vertical-align: top; border: 0px solid white">\n%s</td>\n'
'<td style="vertical-align: top; border: 0px solid white">\n%s</td>\n'
"</tr>\n</table>"
) % (text, text2)

else:
return text
if self.cluster:
client_status += f"""
<tr>
<td style="text-align: left;"><strong>Connection method:</strong> Cluster object</td>
<td style="text-align: left;"><strong>Cluster type:</strong> {type(self.cluster).__name__}</td>
</tr>
"""
elif self.scheduler_file:
client_status += f"""
<tr>
<td style="text-align: left;"><strong>Connection method:</strong> Scheduler file</td>
<td style="text-align: left;"><strong>Scheduler file:</strong> {self.scheduler_file}</td>
</tr>
"""

if self.dashboard_link:
client_status += f"""
<tr>
<td style="text-align: left;">
<strong>Dashboard: </strong>
<a href="{self.dashboard_link}">{self.dashboard_link}</a>
</td>
<td style="text-align: left;"></td>
</tr>
"""

return f"""
<div>
<div style="
width: 24px;
height: 24px;
background-color: #e1e1e1;
border: 3px solid #9D9D9D;
border-radius: 5px;
position: absolute;"> </div>
<div style="margin-left: 48px;">
<h3 style="margin-bottom: 0px;">Client</h3>
<p style="color: #9D9D9D; margin-bottom: 0px;">{self.id}</p>
<table style="width: 100%; text-align: left;">
{client_status}
</table>
{child_repr}
</div>
</div>
"""

def start(self, **kwargs):
"""Start scheduler running in separate thread"""
Expand Down Expand Up @@ -1163,7 +1196,7 @@ async def _update_scheduler_info(self):
if self.status not in ("running", "connecting"):
return
try:
self._scheduler_identity = await self.scheduler.identity()
self._scheduler_identity = SchedulerInfo(await self.scheduler.identity())
except EnvironmentError:
logger.debug("Not able to query scheduler for identity")

Expand Down
156 changes: 88 additions & 68 deletions distributed/deploy/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dask.utils import format_bytes

from ..core import Status
from ..objects import SchedulerInfo
from ..utils import (
Log,
Logs,
Expand Down Expand Up @@ -62,6 +63,7 @@ def __init__(self, asynchronous, quiet=False, name=None):
self._cluster_manager_logs = []
self.quiet = quiet
self.scheduler_comm = None
self._adaptive = None

if name is not None:
self.name = name
Expand All @@ -72,7 +74,7 @@ def __init__(self, asynchronous, quiet=False, name=None):
async def _start(self):
comm = await self.scheduler_comm.live_comm()
await comm.write({"op": "subscribe_worker_status"})
self.scheduler_info = await comm.read()
self.scheduler_info = SchedulerInfo(await comm.read())
self._watch_worker_status_comm = comm
self._watch_worker_status_task = asyncio.ensure_future(
self._watch_worker_status(comm)
Expand Down Expand Up @@ -263,7 +265,11 @@ def dashboard_link(self):
host = self.scheduler_address.split("://")[1].split("/")[0].split(":")[0]
return format_dashboard_link(host, port)

def _widget_status(self):
def _scaling_status(self):
if self._adaptive and self._adaptive.periodic_callback:
mode = "Adaptive"
else:
mode = "Manual"
workers = len(self.scheduler_info["workers"])
if hasattr(self, "worker_spec"):
requested = sum(
Expand All @@ -274,36 +280,14 @@ def _widget_status(self):
requested = len(self.workers)
else:
requested = workers
cores = sum(v["nthreads"] for v in self.scheduler_info["workers"].values())
memory = sum(v["memory_limit"] for v in self.scheduler_info["workers"].values())
memory = format_bytes(memory)
text = """
<div>
<style scoped>
.dataframe tbody tr th:only-of-type {
vertical-align: middle;
}

.dataframe tbody tr th {
vertical-align: top;
}

.dataframe thead th {
text-align: right;
}
</style>
<table style="text-align: right;">
<tr> <th>Workers</th> <td>%s</td></tr>
<tr> <th>Cores</th> <td>%d</td></tr>
<tr> <th>Memory</th> <td>%s</td></tr>
</table>
</div>
""" % (
workers if workers == requested else "%d / %d" % (workers, requested),
cores,
memory,
)
return text

worker_count = workers if workers == requested else f"{workers} / {requested}"
return f"""
<table>
<tr><td style="text-align: left;">Scaling mode: {mode}</td></tr>
<tr><td style="text-align: left;">Workers: {worker_count}</td></tr>
</table>
"""

def _widget(self):
"""Create IPython widget for display within a notebook"""
Expand All @@ -313,26 +297,23 @@ def _widget(self):
pass

try:
from ipywidgets import HTML, Accordion, Button, HBox, IntText, Layout, VBox
from ipywidgets import (
HTML,
Accordion,
Button,
HBox,
IntText,
Layout,
Tab,
VBox,
)
except ImportError:
self._cached_widget = None
return None

layout = Layout(width="150px")

if self.dashboard_link:
link = '<p><b>Dashboard: </b><a href="%s" target="_blank">%s</a></p>\n' % (
self.dashboard_link,
self.dashboard_link,
)
else:
link = ""

title = "<h2>%s</h2>" % self._cluster_class_name
title = HTML(title)
dashboard = HTML(link)

status = HTML(self._widget_status(), layout=Layout(min_width="150px"))
status = HTML(self._repr_html_())

if self._supports_scaling:
request = IntText(0, description="Workers", layout=layout)
Expand Down Expand Up @@ -368,12 +349,18 @@ def scale_cb(b):
else:
accordion = HTML("")

box = VBox([title, HBox([status, accordion]), dashboard])
scale_status = HTML(self._scaling_status())

self._cached_widget = box
tab = Tab()
tab.children = [status, VBox([scale_status, accordion])]
tab.set_title(0, "Status")
tab.set_title(1, "Scaling")

self._cached_widget = tab

def update():
status.value = self._widget_status()
status.value = self._repr_html_()
scale_status.value = self._scaling_status()

cluster_repr_interval = parse_timedelta(
dask.config.get("distributed.deploy.cluster-repr-interval", default="ms")
Expand All @@ -382,25 +369,58 @@ def update():
self.periodic_callbacks["cluster-repr"] = pc
pc.start()

return box

def _repr_html_(self):
if self.dashboard_link:
dashboard = "<a href='{0}' target='_blank'>{0}</a>".format(
self.dashboard_link
)
else:
dashboard = "Not Available"
return (
"<div style='color: var(--jp-ui-font-color0, #000000); "
"background-color: var(--jp-layout-color2, #f2f2f2); display: inline-block; "
"padding: 10px; border: 1px solid var(--jp-border-color0, #999999);'>\n"
" <h3>{cls}</h3>\n"
" <ul>\n"
" <li><b>Dashboard: </b>{dashboard}\n"
" </ul>\n"
"</div>\n"
).format(cls=self._cluster_class_name, dashboard=dashboard)
return tab

def _repr_html_(self, cluster_status=None):

if not cluster_status:
cluster_status = ""

cluster_status += f"""
<tr>
<td style="text-align: left;">
<strong>Dashboard:</strong> <a href="{self.dashboard_link}">{self.dashboard_link}</a>
</td>
<td style="text-align: left;"><strong>Workers:</strong> {len(self.scheduler_info["workers"])}</td>
</tr>
<tr>
<td style="text-align: left;">
<strong>Total threads:</strong>
{sum([w["nthreads"] for w in self.scheduler_info["workers"].values()])}
</td>
<td style="text-align: left;">
<strong>Total memory:</strong>
{format_bytes(sum([w["memory_limit"] for w in self.scheduler_info["workers"].values()]))}
</td>
</tr>
"""
try:
scheduler_info_repr = self.scheduler_info._repr_html_()
except AttributeError:
scheduler_info_repr = "Scheduler not started yet."

return f"""
<div class="jp-RenderedHTMLCommon jp-RenderedHTML jp-mod-trusted jp-OutputArea-output">
<div style="
width: 24px;
height: 24px;
background-color: #e1e1e1;
border: 3px solid #9D9D9D;
border-radius: 5px;
position: absolute;"> </div>
<div style="margin-left: 48px;">
<h3 style="margin-bottom: 0px; margin-top: 0px;">{type(self).__name__}</h3>
<p style="color: #9D9D9D; margin-bottom: 0px;">{self.name}</p>
<table style="width: 100%; text-align: left;">
{cluster_status}
</table>
<details>
<summary style="margin-bottom: 20px;"><h3 style="display: inline;">Scheduler Info</h3></summary>
{scheduler_info_repr}
</details>
</div>
</div>
"""

def _ipython_display_(self, **kwargs):
widget = self._widget()
Expand Down
Loading