From 6949c6d19b21d56ca70dec802d3b5f1899ba7b34 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Fri, 25 Mar 2022 12:53:36 -0500 Subject: [PATCH 01/16] Relax variable test_race (#5993) The intent of this test is mostly to make sure that lots of workers can access the same variable in parallel without breaking things. Part of this test also ensures that none of them got too far behind the others. This is very hard to actually guarantee though. There was some slippage built into the test, but a particularly long wait in one of the workers can set the entire thing back arbitrarily far. We remove the extra part of this test. --- distributed/tests/test_variable.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/tests/test_variable.py b/distributed/tests/test_variable.py index 405ad06f2ee..03243873d30 100644 --- a/distributed/tests/test_variable.py +++ b/distributed/tests/test_variable.py @@ -213,7 +213,6 @@ def f(i): futures = c.map(f, range(15)) results = await c.gather(futures) - assert all(r > NITERS * 0.8 for r in results) while len(s.wants_what["variable-x"]) != 1: await asyncio.sleep(0.01) From 2fbf9ebef4ea862b72bd221f63d2785016d0086c Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Fri, 25 Mar 2022 13:38:03 -0500 Subject: [PATCH 02/16] Track Event Loop intervals in dashboard plot (#5964) --- distributed/core.py | 23 ++++++-- distributed/dashboard/components/scheduler.py | 53 +++++++++++++++++++ distributed/dashboard/scheduler.py | 2 + .../dashboard/tests/test_scheduler_bokeh.py | 9 +++- distributed/distributed-schema.yaml | 5 +- distributed/distributed.yaml | 1 + distributed/tests/test_worker.py | 19 +++++++ distributed/worker.py | 1 + 8 files changed, 108 insertions(+), 5 deletions(-) diff --git a/distributed/core.py b/distributed/core.py index 6d043c62b1d..419205ef4a0 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -234,11 +234,20 @@ def stop(): self.periodic_callbacks["monitor"] = pc self._last_tick = time() - measure_tick_interval = parse_timedelta( + self._tick_counter = 0 + self._tick_count = 0 + self._tick_count_last = time() + self._tick_interval = parse_timedelta( dask.config.get("distributed.admin.tick.interval"), default="ms" ) - pc = PeriodicCallback(self._measure_tick, measure_tick_interval * 1000) - self.periodic_callbacks["tick"] = pc + self._tick_interval_observed = self._tick_interval + self.periodic_callbacks["tick"] = PeriodicCallback( + self._measure_tick, self._tick_interval * 1000 + ) + self.periodic_callbacks["ticks"] = PeriodicCallback( + self._cycle_ticks, + parse_timedelta(dask.config.get("distributed.admin.tick.cycle")) * 1000, + ) self.thread_id = 0 @@ -351,6 +360,7 @@ def _measure_tick(self): now = time() diff = now - self._last_tick self._last_tick = now + self._tick_counter += 1 if diff > tick_maximum_delay: logger.info( "Event loop was unresponsive in %s for %.2fs. 
" @@ -363,6 +373,13 @@ def _measure_tick(self): if self.digests is not None: self.digests["tick-duration"].add(diff) + def _cycle_ticks(self): + if not self._tick_counter: + return + last, self._tick_count_last = self._tick_count_last, time() + count, self._tick_counter = self._tick_counter, 0 + self._tick_interval_observed = (time() - last) / (count or 1) + @property def address(self): """ diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index bab3cb2b1e5..7fa639c799d 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -3018,6 +3018,59 @@ def update(self): ) +class EventLoop(DashboardComponent): + """Event Loop Health""" + + def __init__(self, scheduler, **kwargs): + with log_errors(): + self.scheduler = scheduler + self.source = ColumnDataSource( + { + "names": ["Scheduler", "Workers"], + "values": [0, 0], + "text": ["0", "0"], + } + ) + + self.root = figure( + title="Event Loop Health", + x_range=["Scheduler", "Workers"], + y_range=[ + 0, + parse_timedelta(dask.config.get("distributed.admin.tick.interval")) + * 25, + ], + tools="", + toolbar_location="above", + **kwargs, + ) + self.root.vbar(x="names", top="values", width=0.9, source=self.source) + + self.root.xaxis.minor_tick_line_alpha = 0 + self.root.ygrid.visible = True + self.root.xgrid.visible = False + + hover = HoverTool(tooltips=[("Interval", "@text s")], mode="vline") + self.root.add_tools(hover) + + @without_property_validation + def update(self): + with log_errors(): + s = self.scheduler + + data = { + "names": ["Scheduler", "Workers"], + "values": [ + s._tick_interval_observed, + sum([w.metrics["event_loop_interval"] for w in s.workers.values()]) + / (len(s.workers) or 1), + ], + } + data["text"] = [format_time(x) for x in data["values"]] + + update(self.source, data) + + class WorkerTable(DashboardComponent): """Status of the current workers diff --git a/distributed/dashboard/scheduler.py b/distributed/dashboard/scheduler.py index 3d8e62d95ff..42c50b732bc 100644 --- a/distributed/dashboard/scheduler.py +++ b/distributed/dashboard/scheduler.py @@ -16,6 +16,7 @@ ClusterMemory, ComputePerKey, CurrentLoad, + EventLoop, MemoryByKey, Occupancy, SystemMonitor, @@ -97,6 +98,7 @@ "/individual-compute-time-per-key": individual_doc(ComputePerKey, 500), "/individual-aggregate-time-per-action": individual_doc(AggregateAction, 500), "/individual-scheduler-system": individual_doc(SystemMonitor, 500), + "/individual-event-loop": individual_doc(EventLoop, 500), "/individual-profile": individual_profile_doc, "/individual-profile-server": individual_profile_server_doc, "/individual-gpu-memory": gpu_memory_doc, diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index 5242aa1eb80..566fe9377cf 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -23,6 +23,7 @@ ClusterMemory, ComputePerKey, CurrentLoad, + EventLoop, Events, MemoryByKey, Occupancy, @@ -75,7 +76,13 @@ async def test_simple(c, s, a, b): @gen_cluster(client=True, worker_kwargs={"dashboard": True}) async def test_basic(c, s, a, b): - for component in [TaskStream, SystemMonitor, Occupancy, StealingTimeSeries]: + for component in [ + TaskStream, + SystemMonitor, + Occupancy, + StealingTimeSeries, + EventLoop, + ]: ss = component(s) ss.update() diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml 
index 2c6c0d5b566..6187fb342f9 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -510,7 +510,7 @@ properties: - {type: number, minimum: 0} - enum: [false] description: >- - Limit of number of bytes to be spilled on disk. + Limit of number of bytes to be spilled on disk. monitor-interval: type: string @@ -976,6 +976,9 @@ properties: limit : type: string description: The time allowed before triggering a warning + cycle : + type: string + description: The time in between verifying event loop speed max-error-length: type: integer diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 27642579409..61b6522c11e 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -270,6 +270,7 @@ distributed: tick: interval: 20ms # time between event loop health checks limit: 3s # time allowed before triggering a warning + cycle: 1s # time between checking event loop speed max-error-length: 10000 # Maximum size traceback after error to return log-length: 10000 # default length of logs to keep in memory diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 0caa128c02b..b168592a14a 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3306,3 +3306,22 @@ async def test_Worker__to_dict(c, s, a): } assert d["tasks"]["x"]["key"] == "x" assert d["data"] == ["x"] + + +@gen_cluster( + client=True, + config={ + "distributed.admin.tick.interval": "5ms", + "distributed.admin.tick.cycle": "100ms", + }, +) +async def test_tick_interval(c, s, a, b): + import time + + await a.heartbeat() + x = s.workers[a.address].metrics["event_loop_interval"] + while s.workers[a.address].metrics["event_loop_interval"] > 0.050: + await asyncio.sleep(0.01) + while s.workers[a.address].metrics["event_loop_interval"] < 0.100: + await asyncio.sleep(0.01) + time.sleep(0.200) diff --git a/distributed/worker.py b/distributed/worker.py index 13e5adeff00..338e824f3c4 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -946,6 +946,7 @@ async def get_metrics(self) -> dict: "memory": spilled_memory, "disk": spilled_disk, }, + event_loop_interval=self._tick_interval_observed, ) out.update(self.monitor.recent()) From 06170d564bbd89b0eec3a046fec012927250bf75 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Mon, 28 Mar 2022 04:49:14 -0500 Subject: [PATCH 03/16] Add a hardware benchmark to test memory, disk, and network bandwidths (#5966) --- distributed/client.py | 13 ++ distributed/compatibility.py | 16 ++ distributed/dashboard/components/scheduler.py | 165 +++++++++++++++++- distributed/dashboard/scheduler.py | 20 ++- .../dashboard/tests/test_scheduler_bokeh.py | 6 + distributed/http/templates/base.html | 6 +- distributed/scheduler.py | 57 ++++++ distributed/tests/test_client.py | 14 ++ distributed/tests/test_worker.py | 21 ++- distributed/worker.py | 117 +++++++++++++ docs/source/http_services.rst | 7 +- 11 files changed, 431 insertions(+), 11 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index a8b4cb57989..49afd3792e3 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -4041,6 +4041,19 @@ def get_worker_logs(self, n=None, workers=None, nanny=False): """ return self.sync(self.scheduler.worker_logs, n=n, workers=workers, nanny=nanny) + def benchmark_hardware(self) -> dict: + """ + Run a benchmark on the workers for memory, disk, and network bandwidths + + Returns + ------- + result: dict + A dictionary mapping the names "disk", "memory", and 
"network" to + dictionaries mapping sizes to bandwidths. These bandwidths are + averaged over many workers running computations across the cluster. + """ + return self.sync(self.scheduler.benchmark_hardware) + def log_event(self, topic, msg): """Log an event under a given topic diff --git a/distributed/compatibility.py b/distributed/compatibility.py index 32c94151d55..f0151267867 100644 --- a/distributed/compatibility.py +++ b/distributed/compatibility.py @@ -37,3 +37,19 @@ async def to_thread(func, /, *args, **kwargs): ctx = contextvars.copy_context() func_call = functools.partial(ctx.run, func, *args, **kwargs) return await loop.run_in_executor(None, func_call) + + +if sys.version_info >= (3, 9): + from random import randbytes +else: + try: + import numpy + + def randbytes(size): + return numpy.random.randint(255, size=size, dtype="u8").tobytes() + + except ImportError: + import secrets + + def randbytes(size): + return secrets.token_bytes(size) diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py index 7fa639c799d..a19e2838f77 100644 --- a/distributed/dashboard/components/scheduler.py +++ b/distributed/dashboard/components/scheduler.py @@ -21,6 +21,7 @@ ColumnDataSource, CustomJSHover, DataRange1d, + FactorRange, GroupFilter, HoverTool, NumberFormatter, @@ -49,7 +50,14 @@ import dask from dask import config -from dask.utils import format_bytes, format_time, funcname, key_split, parse_timedelta +from dask.utils import ( + format_bytes, + format_time, + funcname, + key_split, + parse_bytes, + parse_timedelta, +) from distributed.dashboard.components import add_periodic_callback from distributed.dashboard.components.shared import ( @@ -559,6 +567,148 @@ def update(self): update(self.source, d) +class Hardware(DashboardComponent): + """Occupancy (in time) per worker""" + + def __init__(self, scheduler, **kwargs): + with log_errors(): + self.scheduler = scheduler + # Disk + self.disk_source = ColumnDataSource( + { + "size": [], + "bandwidth": [], + } + ) + + self.disk_figure = figure( + title="Disk Bandwidth -- Computing ...", + tools="", + toolbar_location="above", + x_range=FactorRange(factors=[]), + **kwargs, + ) + self.disk_figure.vbar( + x="size", top="bandwidth", width=0.9, source=self.disk_source + ) + hover = HoverTool( + mode="vline", tooltips=[("Bandwidth", "@bandwidth{0.00 b}/s")] + ) + self.disk_figure.add_tools(hover) + self.disk_figure.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b") + self.disk_figure.xgrid.visible = False + + # Memory + self.memory_source = ColumnDataSource( + { + "size": [], + "bandwidth": [], + } + ) + + self.memory_figure = figure( + title="Memory Bandwidth -- Computing ...", + tools="", + toolbar_location="above", + x_range=FactorRange(factors=[]), + **kwargs, + ) + + self.memory_figure.vbar( + x="size", top="bandwidth", width=0.9, source=self.memory_source + ) + hover = HoverTool( + mode="vline", tooltips=[("Bandwidth", "@bandwidth{0.00 b}/s")] + ) + self.memory_figure.add_tools(hover) + self.memory_figure.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b") + self.memory_figure.xgrid.visible = False + + # Network + self.network_source = ColumnDataSource( + { + "size": [], + "bandwidth": [], + } + ) + + self.network_figure = figure( + title="Network Bandwidth -- Computing ...", + tools="", + toolbar_location="above", + x_range=FactorRange(factors=[]), + **kwargs, + ) + + self.network_figure.vbar( + x="size", top="bandwidth", width=0.9, source=self.network_source + ) + hover = HoverTool( 
+ mode="vline", tooltips=[("Bandwidth", "@bandwidth{0.00 b}/s")] + ) + self.network_figure.add_tools(hover) + self.network_figure.yaxis[0].formatter = NumeralTickFormatter( + format="0.0 b" + ) + self.network_figure.xgrid.visible = False + + self.root = row( + self.memory_figure, + self.disk_figure, + self.network_figure, + ) + + self.memory_data = { + "size": [], + "bandwidth": [], + } + self.disk_data = { + "size": [], + "bandwidth": [], + } + self.network_data = { + "size": [], + "bandwidth": [], + } + + async def f(): + result = await self.scheduler.benchmark_hardware() + + for size in sorted(result["disk"], key=parse_bytes): + bandwidth = result["disk"][size] + self.disk_data["size"].append(size) + self.disk_data["bandwidth"].append(bandwidth) + + for size in sorted(result["memory"], key=parse_bytes): + bandwidth = result["memory"][size] + self.memory_data["size"].append(size) + self.memory_data["bandwidth"].append(bandwidth) + + for size in sorted(result["network"], key=parse_bytes): + bandwidth = result["network"][size] + self.network_data["size"].append(size) + self.network_data["bandwidth"].append(bandwidth) + + self.scheduler.loop.add_callback(f) + + def update(self): + if ( + not self.disk_data["size"] + or self.disk_figure.title.text == "Disk Bandwidth" + ): + return + + self.network_figure.x_range.factors = self.network_data["size"] + self.disk_figure.x_range.factors = self.disk_data["size"] + self.memory_figure.x_range.factors = self.memory_data["size"] + update(self.disk_source, self.disk_data) + update(self.memory_source, self.memory_data) + update(self.network_source, self.network_data) + self.memory_figure.title.text = "Memory Bandwidth" + self.disk_figure.title.text = "Disk Bandwidth" + self.network_figure.title.text = "Network Bandwidth" + + class BandwidthTypes(DashboardComponent): """Bar chart showing bandwidth per type""" @@ -3432,6 +3582,19 @@ def workers_doc(scheduler, extra, doc): doc.theme = BOKEH_THEME +def hardware_doc(scheduler, extra, doc): + with log_errors(): + hw = Hardware(scheduler) + hw.update() + doc.title = "Dask: Cluster Hardware Bandwidth" + doc.add_root(hw.root) + doc.template = env.get_template("simple.html") + doc.template_variables.update(extra) + doc.theme = BOKEH_THEME + + add_periodic_callback(doc, hw, 500) + + def tasks_doc(scheduler, extra, doc): with log_errors(): ts = TaskStream( diff --git a/distributed/dashboard/scheduler.py b/distributed/dashboard/scheduler.py index 42c50b732bc..a7dc0a05331 100644 --- a/distributed/dashboard/scheduler.py +++ b/distributed/dashboard/scheduler.py @@ -31,6 +31,7 @@ WorkerTable, events_doc, graph_doc, + hardware_doc, individual_doc, individual_profile_doc, individual_profile_server_doc, @@ -57,6 +58,7 @@ "/profile": profile_doc, "/profile-server": profile_server_doc, "/graph": graph_doc, + "/hardware": hardware_doc, "/groups": tg_graph_doc, "/gpu": gpu_doc, "/individual-task-stream": individual_doc( @@ -106,7 +108,7 @@ } -template_variables = { +template_variables: dict = { "pages": [ "status", "workers", @@ -117,8 +119,22 @@ "groups", "info", ], - "plots": [x.replace("/", "") for x in applications if "individual" in x], + "plots": [ + { + "url": x.strip("/"), + "name": " ".join(x.strip("/").split("-")[1:]) + .title() + .replace("Cpu", "CPU") + .replace("Gpu", "GPU"), + } + for x in applications + if "individual" in x + ] + + [{"url": "hardware", "name": "Hardware"}], } +template_variables["plots"] = sorted( + template_variables["plots"], key=lambda d: d["name"] +) if NVML_ENABLED: 
template_variables["pages"].insert(4, "gpu") diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index 566fe9377cf..415b566e3a0 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -25,6 +25,7 @@ CurrentLoad, EventLoop, Events, + Hardware, MemoryByKey, Occupancy, ProcessingHistogram, @@ -1004,3 +1005,8 @@ async def test_prefix_bokeh(s, a, b): bokeh_app = s.http_application.applications[0] assert isinstance(bokeh_app, BokehTornado) assert bokeh_app.prefix == f"/{prefix}" + + +@gen_cluster(client=True, scheduler_kwargs={"dashboard": True}) +async def test_hardware(c, s, a, b): + Hardware(s) # don't call update, takes too long for a test diff --git a/distributed/http/templates/base.html b/distributed/http/templates/base.html index 73e252f08e1..a38b20830d0 100644 --- a/distributed/http/templates/base.html +++ b/distributed/http/templates/base.html @@ -39,9 +39,7 @@ @@ -76,4 +74,4 @@ - \ No newline at end of file + diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3484840af34..7a1c65a9a90 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -40,6 +40,7 @@ merge, merge_sorted, merge_with, + partition, pluck, second, valmap, @@ -3977,6 +3978,7 @@ def __init__( "stop_task_metadata": self.stop_task_metadata, "get_cluster_state": self.get_cluster_state, "dump_cluster_state_to_url": self.dump_cluster_state_to_url, + "benchmark_hardware": self.benchmark_hardware, } connection_limit = get_fileno_limit() / 2 @@ -7327,6 +7329,61 @@ async def get_call_stack(self, keys=None): response = {w: r for w, r in zip(workers, results) if r} return response + async def benchmark_hardware(self) -> "dict[str, dict[str, float]]": + """ + Run a benchmark on the workers for memory, disk, and network bandwidths + + Returns + ------- + result: dict + A dictionary mapping the names "disk", "memory", and "network" to + dictionaries mapping sizes to bandwidths. These bandwidths are + averaged over many workers running computations across the cluster. + """ + out: "dict[str, defaultdict[str, list[float]]]" = { + name: defaultdict(list) for name in ["disk", "memory", "network"] + } + + # disk + result = await self.broadcast(msg={"op": "benchmark_disk"}) + for d in result.values(): + for size, duration in d.items(): + out["disk"][size].append(duration) + + # memory + result = await self.broadcast(msg={"op": "benchmark_memory"}) + for d in result.values(): + for size, duration in d.items(): + out["memory"][size].append(duration) + + # network + workers = list(self.workers) + # On an adaptive cluster, if multiple workers are started on the same physical host, + # they are more likely to connect to the Scheduler in sequence, ending up next to + # each other in this list. + # The transfer speed within such clusters of workers will be effectively that of + # localhost. This could happen across different VMs and/or docker images, so + # implementing logic based on IP addresses would not necessarily help. + # Randomize the connections to even out the mean measures. 
+ random.shuffle(workers) + futures = [ + self.rpc(a).benchmark_network(address=b) for a, b in partition(2, workers) + ] + responses = await asyncio.gather(*futures) + + for d in responses: + for size, duration in d.items(): + out["network"][size].append(duration) + + result = {} + for mode in out: + result[mode] = { + size: sum(durations) / len(durations) + for size, durations in out[mode].items() + } + + return result + def get_nbytes(self, keys=None, summary=True): parent: SchedulerState = cast(SchedulerState, self) ts: TaskState diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index f9558d2dd3f..7b419000327 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -7481,6 +7481,20 @@ async def test_security_loader_import_failed(self): pass +@pytest.mark.avoid_ci(reason="This is slow and probably not worth the cost") +@pytest.mark.slow +@gen_cluster(client=True) +async def test_benchmark_hardware(c, s, a, b): + result = await c.benchmark_hardware() + assert set(result) == {"disk", "memory", "network"} + assert all(isinstance(v, float) for d in result.values() for v in d.values()) + + +@gen_cluster(client=True, nthreads=[]) +async def test_benchmark_hardware_no_workers(c, s): + assert await c.benchmark_hardware() == {"memory": {}, "disk": {}, "network": {}} + + @gen_cluster(client=True, nthreads=[]) async def test_wait_for_workers_updates_info(c, s): async with Worker(s.address): diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index b168592a14a..317ccad635f 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -57,7 +57,14 @@ slowinc, slowsum, ) -from distributed.worker import Worker, error_message, logger +from distributed.worker import ( + Worker, + benchmark_disk, + benchmark_memory, + benchmark_network, + error_message, + logger, +) pytestmark = pytest.mark.ci1 @@ -3308,6 +3315,18 @@ async def test_Worker__to_dict(c, s, a): assert d["data"] == ["x"] +@gen_cluster() +async def test_benchmark_hardware(s, a, b): + sizes = ["1 kiB", "10 kiB"] + disk = benchmark_disk(sizes=sizes, duration="1 ms") + memory = benchmark_memory(sizes=sizes, duration="1 ms") + network = await benchmark_network( + address=a.address, rpc=b.rpc, sizes=sizes, duration="1 ms" + ) + + assert set(disk) == set(memory) == set(network) == set(sizes) + + @gen_cluster( client=True, config={ diff --git a/distributed/worker.py b/distributed/worker.py index 338e824f3c4..7a6e040303a 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -7,6 +7,7 @@ import heapq import logging import os +import pathlib import random import sys import threading @@ -41,6 +42,7 @@ parse_bytes, parse_timedelta, stringify, + tmpdir, typename, ) @@ -49,8 +51,10 @@ from distributed.comm import connect, get_address_host from distributed.comm.addressing import address_from_user_args, parse_address from distributed.comm.utils import OFFLOAD_THRESHOLD +from distributed.compatibility import randbytes from distributed.core import ( CommClosedError, + ConnectionPool, Status, coerce_to_address, error_message, @@ -738,6 +742,9 @@ def __init__( "plugin-add": self.plugin_add, "plugin-remove": self.plugin_remove, "get_monitor_info": self.get_monitor_info, + "benchmark_disk": self.benchmark_disk, + "benchmark_memory": self.benchmark_memory, + "benchmark_network": self.benchmark_network, } stream_handlers = { @@ -1197,6 +1204,7 @@ def func(data): with open(out_filename, "wb") as f: f.write(data) f.flush() + 
os.fsync(f.fileno()) return data if len(data) < 10000: @@ -3689,6 +3697,17 @@ def _notify_plugins(self, method_name, *args, **kwargs): "Plugin '%s' failed with exception", name, exc_info=True ) + async def benchmark_disk(self) -> dict[str, float]: + return await self.loop.run_in_executor( + self.executor, benchmark_disk, self.local_directory + ) + + async def benchmark_memory(self) -> dict[str, float]: + return await self.loop.run_in_executor(self.executor, benchmark_memory) + + async def benchmark_network(self, address: str) -> dict[str, float]: + return await benchmark_network(rpc=self.rpc, address=address) + ############## # Validation # ############## @@ -4568,3 +4587,101 @@ def warn(*args, **kwargs): worker.log_event("warn", {"args": args, "kwargs": kwargs}) warnings.warn(*args, **kwargs) + + +def benchmark_disk( + rootdir: str | None = None, + sizes: Iterable[str] = ("1 kiB", "100 kiB", "1 MiB", "10 MiB", "100 MiB"), + duration="1 s", +) -> dict[str, float]: + """ + Benchmark disk bandwidth + + Returns + ------- + out: dict + Maps sizes of outputs to measured bandwidths + """ + duration = parse_timedelta(duration) + + out = {} + for size_str in sizes: + with tmpdir(dir=rootdir) as dir: + dir = pathlib.Path(dir) + names = list(map(str, range(100))) + size = parse_bytes(size_str) + + data = randbytes(size) + + start = time() + total = 0 + while time() < start + duration: + with open(dir / random.choice(names), mode="ab") as f: + f.write(data) + f.flush() + os.fsync(f.fileno()) + total += size + + out[size_str] = total / (time() - start) + return out + + +def benchmark_memory( + sizes: Iterable[str] = ("2 kiB", "10 kiB", "100 kiB", "1 MiB", "10 MiB"), + duration="200 ms", +) -> dict[str, float]: + """ + Benchmark memory bandwidth + + Returns + ------- + out: dict + Maps sizes of outputs to measured bandwidths + """ + duration = parse_timedelta(duration) + out = {} + for size_str in sizes: + size = parse_bytes(size_str) + data = randbytes(size) + + start = time() + total = 0 + while time() < start + duration: + _ = data[:-1] + del _ + total += size + + out[size_str] = total / (time() - start) + return out + + +async def benchmark_network( + address: str, + rpc: ConnectionPool, + sizes: Iterable[str] = ("1 kiB", "10 kiB", "100 kiB", "1 MiB", "10 MiB", "50 MiB"), + duration="1 s", +) -> dict[str, float]: + """ + Benchmark network communications to another worker + + Returns + ------- + out: dict + Maps sizes of outputs to measured bandwidths + """ + + duration = parse_timedelta(duration) + out = {} + with rpc(address) as r: + for size_str in sizes: + size = parse_bytes(size_str) + data = to_serialize(randbytes(size)) + + start = time() + total = 0 + while time() < start + duration: + await r.echo(data=data) + total += size * 2 + + out[size_str] = total / (time() - start) + return out diff --git a/docs/source/http_services.rst b/docs/source/http_services.rst index 119733197f5..fe076a08653 100644 --- a/docs/source/http_services.rst +++ b/docs/source/http_services.rst @@ -21,6 +21,7 @@ others via a header navbar. 
- ``/graph``: currently processing graphs in a dependency tree view - ``/groups``: graph layout for task groups (dependencies, memory, output type, progress, tasks status) - ``/info``: redirect to ``/info/main/workers.html`` +- ``/hardware``: gathers bandwidth information on memory, disk, and network Scheduler HTTP -------------- @@ -81,11 +82,11 @@ Individual bokeh plots Worker HTTP ----------- -- ``/status``: -- ``/counters``: +- ``/status``: +- ``/counters``: - ``/crossfilter``: - ``/sitemap.json``: list of available endpoints -- ``/system``: +- ``/system``: - ``/health``: check server is alive - ``/metrics``: prometheus endpoint - ``/statics/()``: static file content (CSS, etc) From 119021a1197dd96792292439da65c935051fd5b8 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 28 Mar 2022 15:47:34 +0100 Subject: [PATCH 04/16] Clarify that SchedulerPlugin must be subclassed (#6008) --- distributed/diagnostics/plugin.py | 38 ++++++++++++++++++++----------- distributed/scheduler.py | 8 ++++--- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/distributed/diagnostics/plugin.py b/distributed/diagnostics/plugin.py index 0da31e10199..9940102b6ed 100644 --- a/distributed/diagnostics/plugin.py +++ b/distributed/diagnostics/plugin.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging import os import socket @@ -5,9 +7,14 @@ import sys import uuid import zipfile +from collections.abc import Awaitable +from typing import TYPE_CHECKING from dask.utils import funcname, tmpfile +if TYPE_CHECKING: + from distributed.scheduler import Scheduler # circular import + logger = logging.getLogger(__name__) @@ -25,8 +32,11 @@ class SchedulerPlugin: Plugins are often used for diagnostics and measurement, but have full access to the scheduler and could in principle affect core scheduling. - To implement a plugin implement some of the methods of this class and add - the plugin to the scheduler with ``Scheduler.add_plugin(myplugin)``. + To implement a plugin: + + 1. subclass this class + 2. override some of its methods + 3. add the plugin to the scheduler with ``Scheduler.add_plugin(myplugin)``. Examples -------- @@ -45,28 +55,28 @@ class SchedulerPlugin: >>> scheduler.add_plugin(plugin) # doctest: +SKIP """ - async def start(self, scheduler): + async def start(self, scheduler: Scheduler) -> None: """Run when the scheduler starts up This runs at the end of the Scheduler startup process """ - pass - async def close(self): + async def close(self) -> None: """Run when the scheduler closes down This runs at the beginning of the Scheduler shutdown process, but after workers have been asked to shut down gracefully """ - pass - def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, **kwargs): + def update_graph( + self, scheduler: Scheduler, keys: set[str], restrictions: dict, **kwargs + ) -> None: """Run when a new graph / tasks enter the scheduler""" - def restart(self, scheduler, **kwargs): + def restart(self, scheduler: Scheduler) -> None: """Run when the scheduler restarts itself""" - def transition(self, key, start, finish, *args, **kwargs): + def transition(self, key: str, start: str, finish: str, *args, **kwargs) -> None: """Run whenever a task changes state Parameters @@ -81,16 +91,18 @@ def transition(self, key, start, finish, *args, **kwargs): This may include worker ID, compute time, etc. 
""" - def add_worker(self, scheduler=None, worker=None, **kwargs): + def add_worker(self, scheduler: Scheduler, worker: str) -> None | Awaitable[None]: """Run when a new worker enters the cluster""" - def remove_worker(self, scheduler=None, worker=None, **kwargs): + def remove_worker( + self, scheduler: Scheduler, worker: str + ) -> None | Awaitable[None]: """Run when a worker leaves the cluster""" - def add_client(self, scheduler=None, client=None, **kwargs): + def add_client(self, scheduler: Scheduler, client: str) -> None: """Run when a new client connects""" - def remove_client(self, scheduler=None, client=None, **kwargs): + def remove_client(self, scheduler: Scheduler, client: str) -> None: """Run when a client disconnects""" diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 7a1c65a9a90..5f178b0d3a0 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -57,13 +57,15 @@ from distributed.active_memory_manager import ActiveMemoryManagerExtension, RetireWorker from distributed.batched import BatchedSend from distributed.comm import ( + Comm, + CommClosedError, get_address_host, normalize_address, resolve_address, unparse_host_port, ) from distributed.comm.addressing import addresses_from_user_args -from distributed.core import CommClosedError, Status, clean_exception, rpc, send_recv +from distributed.core import Status, clean_exception, rpc, send_recv from distributed.diagnostics.memory_sampler import MemorySamplerExtension from distributed.diagnostics.plugin import SchedulerPlugin, _get_plugin_name from distributed.event import EventExtension @@ -5449,7 +5451,7 @@ def report(self, msg: dict, ts: TaskState = None, client: str = None): "Closed comm %r while trying to write %s", c, msg, exc_info=True ) - async def add_client(self, comm, client=None, versions=None): + async def add_client(self, comm: Comm, client: str, versions: dict) -> None: """Add client to network We listen to all future messages from this Comm. 
@@ -5498,7 +5500,7 @@ async def add_client(self, comm, client=None, versions=None): except TypeError: # comm becomes None during GC pass - def remove_client(self, client=None): + def remove_client(self, client: str) -> None: """Remove client from network""" parent: SchedulerState = cast(SchedulerState, self) if self.status == Status.running: From e8c06696146bc797c7c6d05e0f29c7d0debe8293 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 28 Mar 2022 16:44:53 +0100 Subject: [PATCH 05/16] Better error message on misspelled executor annotation (#6009) --- distributed/tests/test_worker.py | 9 +++++++++ distributed/worker.py | 17 +++++++++++------ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 317ccad635f..646717fec03 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1897,6 +1897,15 @@ def get_thread_name(): assert "Dask-Foo-Threads" in gpu_result +@gen_cluster(client=True) +async def test_bad_executor_annotation(c, s, a, b): + with dask.annotate(executor="bad"): + future = c.submit(inc, 1) + with pytest.raises(ValueError, match="Invalid executor 'bad'; expected one of: "): + await future + assert future.status == "error" + + @gen_cluster(client=True) async def test_process_executor(c, s, a, b): with ProcessPoolExecutor() as e: diff --git a/distributed/worker.py b/distributed/worker.py index 7a6e040303a..434700e937b 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3421,17 +3421,22 @@ async def execute(self, key: str, *, stimulus_id: str) -> None: args2, kwargs2 = self._prepare_args_for_execution(ts, args, kwargs) - if ts.annotations is not None and "executor" in ts.annotations: - executor = ts.annotations["executor"] - else: + try: + executor = ts.annotations["executor"] # type: ignore + except (TypeError, KeyError): executor = "default" - assert executor in self.executors - assert key == ts.key + try: + e = self.executors[executor] + except KeyError: + raise ValueError( + f"Invalid executor {executor!r}; " + f"expected one of: {sorted(self.executors)}" + ) + self.active_keys.add(ts.key) result: dict try: - e = self.executors[executor] ts.start_time = time() if iscoroutinefunction(function): result = await apply_function_async( From fb78273fdc42b39bbabf585558305ba70582aa99 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 29 Mar 2022 14:03:06 +0100 Subject: [PATCH 06/16] Fix black in CI (#6019) --- .pre-commit-config.yaml | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 329103e7d02..7197c0b308c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,12 +1,12 @@ repos: - - repo: https://github.com/MarcoGorelli/absolufy-imports - rev: v0.3.1 - hooks: + - repo: https://github.com/MarcoGorelli/absolufy-imports + rev: v0.3.1 + hooks: - id: absolufy-imports name: absolufy-imports - - repo: https://github.com/pycqa/isort - rev: 5.10.1 - hooks: + - repo: https://github.com/pycqa/isort + rev: 5.10.1 + hooks: - id: isort language_version: python3 - repo: https://github.com/asottile/pyupgrade @@ -17,23 +17,28 @@ repos: - id: pyupgrade args: - --py38-plus - - repo: https://github.com/psf/black - rev: 22.1.0 - hooks: + - repo: https://github.com/psf/black + rev: 22.3.0 + hooks: - id: black language_version: python3 exclude: versioneer.py args: - --target-version=py38 - - repo: https://gitlab.com/pycqa/flake8 - rev: 
3.9.2 - hooks: + - repo: https://gitlab.com/pycqa/flake8 + rev: 4.0.1 + hooks: - id: flake8 language_version: python3 - repo: https://github.com/pre-commit/mirrors-mypy - rev: v0.931 + rev: v0.942 hooks: - id: mypy + args: + - --ignore-missing-imports + # Silence errors about Python 3.9-style delayed type annotations on Python 3.8 + - --python-version + - "3.9" additional_dependencies: # Type stubs - types-docutils From 9fbf6d40efc9c5676b956e2de4e2ea5a6586646d Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Tue, 29 Mar 2022 08:48:11 -0500 Subject: [PATCH 07/16] Name extensions and enable extension heartbeats (#5957) To enable better diagnostics, it would be useful to allow worker extensions to piggy-back on the standard heartbeat. This adds an optional "heartbeat" method to extensions, and, if present, calls a custom method that gets sent to the scheduler and processed by an extension of the same name. This also stores the extensions on the worker in a named dictionary. Previously this was a list, but I'm not sure that it was actually used anywhere. This is a breaking change without deprecation, but in a space that I suspect no one will care about. --- distributed/client.py | 10 ++++-- distributed/event.py | 2 -- distributed/lock.py | 2 -- distributed/multi_lock.py | 2 -- distributed/publish.py | 1 - distributed/pubsub.py | 3 -- distributed/queues.py | 2 -- distributed/recreate_tasks.py | 1 - distributed/scheduler.py | 44 +++++++++++++++---------- distributed/semaphore.py | 2 -- distributed/stealing.py | 1 - distributed/tests/test_worker.py | 37 +++++++++++++++++++++ distributed/tests/test_worker_client.py | 6 +--- distributed/variable.py | 2 -- distributed/worker.py | 21 +++++++++--- 15 files changed, 88 insertions(+), 48 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 49afd3792e3..f68570f7762 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -115,7 +115,10 @@ _current_client = ContextVar("_current_client", default=None) -DEFAULT_EXTENSIONS = [PubSubClientExtension] +DEFAULT_EXTENSIONS = { + "pubsub": PubSubClientExtension, +} + # Placeholder used in the get_dataset function(s) NO_DEFAULT_PLACEHOLDER = "_no_default_" @@ -928,8 +931,9 @@ def __init__( server=self, ) - for ext in extensions: - ext(self) + self.extensions = { + name: extension(self) for name, extension in extensions.items() + } preload = dask.config.get("distributed.client.preload") preload_argv = dask.config.get("distributed.client.preload-argv") diff --git a/distributed/event.py b/distributed/event.py index 0765e003158..037d171a030 100644 --- a/distributed/event.py +++ b/distributed/event.py @@ -58,8 +58,6 @@ def __init__(self, scheduler): } ) - self.scheduler.extensions["events"] = self - async def event_wait(self, name=None, timeout=None): """Wait until the event is set to true. 
Returns false, when this did not happen in the given time diff --git a/distributed/lock.py b/distributed/lock.py index 5830e2de94b..22e3de5e223 100644 --- a/distributed/lock.py +++ b/distributed/lock.py @@ -30,8 +30,6 @@ def __init__(self, scheduler): {"lock_acquire": self.acquire, "lock_release": self.release} ) - self.scheduler.extensions["locks"] = self - async def acquire(self, name=None, id=None, timeout=None): with log_errors(): if isinstance(name, list): diff --git a/distributed/multi_lock.py b/distributed/multi_lock.py index 31b2e6ebbdb..7907f44ecfc 100644 --- a/distributed/multi_lock.py +++ b/distributed/multi_lock.py @@ -46,8 +46,6 @@ def __init__(self, scheduler): {"multi_lock_acquire": self.acquire, "multi_lock_release": self.release} ) - self.scheduler.extensions["multi_locks"] = self - def _request_locks(self, locks: list[str], id: Hashable, num_locks: int) -> bool: """Request locks diff --git a/distributed/publish.py b/distributed/publish.py index 63772519376..161b025bbc0 100644 --- a/distributed/publish.py +++ b/distributed/publish.py @@ -26,7 +26,6 @@ def __init__(self, scheduler): } self.scheduler.handlers.update(handlers) - self.scheduler.extensions["publish"] = self def put(self, keys=None, data=None, name=None, override=False, client=None): with log_errors(): diff --git a/distributed/pubsub.py b/distributed/pubsub.py index f1cbc62e531..f575439c3a0 100644 --- a/distributed/pubsub.py +++ b/distributed/pubsub.py @@ -34,8 +34,6 @@ def __init__(self, scheduler): } ) - self.scheduler.extensions["pubsub"] = self - def add_publisher(self, name=None, worker=None): logger.debug("Add publisher: %s %s", name, worker) self.publishers[name].add(worker) @@ -178,7 +176,6 @@ def __init__(self, client): self.client._stream_handlers.update({"pubsub-msg": self.handle_message}) self.subscribers = defaultdict(weakref.WeakSet) - self.client.extensions["pubsub"] = self # TODO: circular reference async def handle_message(self, name=None, msg=None): for sub in self.subscribers[name]: diff --git a/distributed/queues.py b/distributed/queues.py index c29c4f1ab2c..3dc563b3a52 100644 --- a/distributed/queues.py +++ b/distributed/queues.py @@ -42,8 +42,6 @@ def __init__(self, scheduler): {"queue-future-release": self.future_release, "queue_release": self.release} ) - self.scheduler.extensions["queues"] = self - def create(self, name=None, client=None, maxsize=0): logger.debug(f"Queue name: {name}") if name not in self.queues: diff --git a/distributed/recreate_tasks.py b/distributed/recreate_tasks.py index 82b72092b43..8bf2d74912d 100644 --- a/distributed/recreate_tasks.py +++ b/distributed/recreate_tasks.py @@ -23,7 +23,6 @@ def __init__(self, scheduler): self.scheduler = scheduler self.scheduler.handlers["get_runspec"] = self.get_runspec self.scheduler.handlers["get_error_cause"] = self.get_error_cause - self.scheduler.extensions["replay-tasks"] = self def _process_key(self, key): if isinstance(key, list): diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5f178b0d3a0..849ac7b3aa2 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -174,19 +174,20 @@ def nogil(func): Py_ssize_t, parse_bytes(dask.config.get("distributed.scheduler.default-data-size")) ) -DEFAULT_EXTENSIONS = [ - LockExtension, - MultiLockExtension, - PublishExtension, - ReplayTaskScheduler, - QueueExtension, - VariableExtension, - PubSubSchedulerExtension, - SemaphoreExtension, - EventExtension, - ActiveMemoryManagerExtension, - MemorySamplerExtension, -] +DEFAULT_EXTENSIONS = { + "locks": 
LockExtension, + "multi_locks": MultiLockExtension, + "publish": PublishExtension, + "replay-tasks": ReplayTaskScheduler, + "queues": QueueExtension, + "variables": VariableExtension, + "pubsub": PubSubSchedulerExtension, + "semaphores": SemaphoreExtension, + "events": EventExtension, + "amm": ActiveMemoryManagerExtension, + "memory_sampler": MemorySamplerExtension, + "stealing": WorkStealing, +} ALL_TASK_STATES = declare( set, {"released", "waiting", "no-worker", "processing", "erred", "memory"} @@ -4015,11 +4016,13 @@ def __init__( self.periodic_callbacks["idle-timeout"] = pc if extensions is None: - extensions = list(DEFAULT_EXTENSIONS) - if dask.config.get("distributed.scheduler.work-stealing"): - extensions.append(WorkStealing) - for ext in extensions: - ext(self) + extensions = DEFAULT_EXTENSIONS.copy() + if not dask.config.get("distributed.scheduler.work-stealing"): + if "stealing" in extensions: + del extensions["stealing"] + + for name, extension in extensions.items(): + self.extensions[name] = extension(self) setproctitle("dask-scheduler [not started]") Scheduler._instances.add(self) @@ -4330,6 +4333,7 @@ def heartbeat_worker( host_info: dict = None, metrics: dict, executing: dict = None, + extensions: dict = None, ): parent: SchedulerState = cast(SchedulerState, self) address = self.coerce_address(address, resolve_address) @@ -4417,6 +4421,10 @@ def heartbeat_worker( if resources: self.add_resources(worker=address, resources=resources) + if extensions: + for name, data in extensions.items(): + self.extensions[name].heartbeat(ws, data) + return { "status": "OK", "time": local_now, diff --git a/distributed/semaphore.py b/distributed/semaphore.py index 9e7abd872c0..d288462b706 100644 --- a/distributed/semaphore.py +++ b/distributed/semaphore.py @@ -69,8 +69,6 @@ def __init__(self, scheduler): } ) - self.scheduler.extensions["semaphores"] = self - # {metric_name: {semaphore_name: metric}} self.metrics = { "acquire_total": defaultdict(int), # counter diff --git a/distributed/stealing.py b/distributed/stealing.py index 8fb7e14a19f..54ef0098c63 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -71,7 +71,6 @@ def __init__(self, scheduler): ) # `callback_time` is in milliseconds self.scheduler.add_plugin(self) - self.scheduler.extensions["stealing"] = self self.scheduler.events["stealing"] = deque(maxlen=100000) self.count = 0 # { task state: } diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 646717fec03..d3a0ce8b28f 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3324,6 +3324,43 @@ async def test_Worker__to_dict(c, s, a): assert d["data"] == ["x"] +@gen_cluster(nthreads=[]) +async def test_extension_methods(s): + flag = False + shutdown = False + + class WorkerExtension: + def __init__(self, worker): + pass + + def heartbeat(self): + return {"data": 123} + + async def close(self): + nonlocal shutdown + shutdown = True + + class SchedulerExtension: + def __init__(self, scheduler): + self.scheduler = scheduler + pass + + def heartbeat(self, ws, data: dict): + nonlocal flag + assert ws in self.scheduler.workers.values() + assert data == {"data": 123} + flag = True + + s.extensions["test"] = SchedulerExtension(s) + + async with Worker(s.address, extensions={"test": WorkerExtension}) as w: + assert not shutdown + await w.heartbeat() + assert flag + + assert shutdown + + @gen_cluster() async def test_benchmark_hardware(s, a, b): sizes = ["1 kiB", "10 kiB"] diff --git 
a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py index ecdfa8fd003..468bd90d463 100644 --- a/distributed/tests/test_worker_client.py +++ b/distributed/tests/test_worker_client.py @@ -254,13 +254,9 @@ def test_secede_without_stealing_issue_1262(): Tests that seceding works with the Stealing extension disabled https://github.com/dask/distributed/issues/1262 """ - - # turn off all extensions - extensions = [] - # run the loop as an inner function so all workers are closed # and exceptions can be examined - @gen_cluster(client=True, scheduler_kwargs={"extensions": extensions}) + @gen_cluster(client=True, scheduler_kwargs={"extensions": {}}) async def secede_test(c, s, a, b): def func(x): with worker_client() as wc: diff --git a/distributed/variable.py b/distributed/variable.py index a27abc3ab85..143df9e4153 100644 --- a/distributed/variable.py +++ b/distributed/variable.py @@ -40,8 +40,6 @@ def __init__(self, scheduler): self.scheduler.stream_handlers["variable-future-release"] = self.future_release self.scheduler.stream_handlers["variable_delete"] = self.delete - self.scheduler.extensions["variables"] = self - async def set(self, name=None, key=None, data=None, client=None): if key is not None: record = {"type": "Future", "value": key} diff --git a/distributed/worker.py b/distributed/worker.py index 434700e937b..c607bb5afa1 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -141,7 +141,10 @@ no_value = "--no-value-sentinel--" -DEFAULT_EXTENSIONS: list[type] = [PubSubWorkerExtension, ShuffleWorkerExtension] +DEFAULT_EXTENSIONS: dict[str, type] = { + "pubsub": PubSubWorkerExtension, + "shuffle": ShuffleWorkerExtension, +} DEFAULT_METRICS: dict[str, Callable[[Worker], Any]] = {} @@ -437,7 +440,7 @@ def __init__( security: Security | dict[str, Any] | None = None, contact_address: str | None = None, heartbeat_interval: Any = "1s", - extensions: list[type] | None = None, + extensions: dict[str, type] | None = None, metrics: Mapping[str, Callable[[Worker], Any]] = DEFAULT_METRICS, startup_information: Mapping[ str, Callable[[Worker], Any] @@ -790,8 +793,9 @@ def __init__( if extensions is None: extensions = DEFAULT_EXTENSIONS - for ext in extensions: - ext(self) + self.extensions = { + name: extension(self) for name, extension in extensions.items() + } self.memory_manager = WorkerMemoryManager( self, @@ -1132,6 +1136,11 @@ async def heartbeat(self): for key in self.active_keys if key in self.tasks }, + extensions={ + name: extension.heartbeat() + for name, extension in self.extensions.items() + if hasattr(extension, "heartbeat") + }, ) end = time() middle = (start + end) / 2 @@ -1414,6 +1423,10 @@ async def close( for preload in self.preloads: await preload.teardown() + for extension in self.extensions.values(): + if hasattr(extension, "close"): + await extension.close() + if nanny and self.nanny: with self.rpc(self.nanny) as r: await r.close_gracefully() From ed48736a805be0b50a1f427a6c2defb609f2c738 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 29 Mar 2022 15:30:27 +0100 Subject: [PATCH 08/16] remove check_python_3 (broken with click>=8.1.0) (#6018) --- .../recipes/distributed/meta.yaml | 6 +-- distributed/cli/dask_scheduler.py | 9 +--- distributed/cli/dask_ssh.py | 8 +--- distributed/cli/dask_worker.py | 9 +--- distributed/cli/utils.py | 46 ------------------- setup.py | 6 +-- 6 files changed, 11 insertions(+), 73 deletions(-) diff --git a/continuous_integration/recipes/distributed/meta.yaml 
b/continuous_integration/recipes/distributed/meta.yaml index 0353bdf4c79..13ebf926096 100644 --- a/continuous_integration/recipes/distributed/meta.yaml +++ b/continuous_integration/recipes/distributed/meta.yaml @@ -47,9 +47,9 @@ outputs: track_features: # [cython_enabled] - cythonized-scheduler # [cython_enabled] entry_points: - - dask-scheduler = distributed.cli.dask_scheduler:go - - dask-ssh = distributed.cli.dask_ssh:go - - dask-worker = distributed.cli.dask_worker:go + - dask-scheduler = distributed.cli.dask_scheduler:main + - dask-ssh = distributed.cli.dask_ssh:main + - dask-worker = distributed.cli.dask_worker:main requirements: build: - {{ compiler('c') }} # [cython_enabled] diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index d4b9657740f..c7a18517876 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -10,7 +10,7 @@ from tornado.ioloop import IOLoop from distributed import Scheduler -from distributed.cli.utils import check_python_3, install_signal_handlers +from distributed.cli.utils import install_signal_handlers from distributed.preloading import validate_preload_argv from distributed.proctitle import ( enable_proctitle_on_children, @@ -212,10 +212,5 @@ async def run(): logger.info("End scheduler at %r", scheduler.address) -def go(): - check_python_3() - main() - - if __name__ == "__main__": - go() # pragma: no cover + main() # pragma: no cover diff --git a/distributed/cli/dask_ssh.py b/distributed/cli/dask_ssh.py index bcc1c6ee439..baa8cc04af3 100755 --- a/distributed/cli/dask_ssh.py +++ b/distributed/cli/dask_ssh.py @@ -5,7 +5,6 @@ import click -from distributed.cli.utils import check_python_3 from distributed.deploy.old_ssh import SSHCluster logger = logging.getLogger("distributed.dask_ssh") @@ -223,10 +222,5 @@ def main( print("[ dask-ssh ]: Remote processes have been terminated. Exiting.") -def go(): - check_python_3() - main() - - if __name__ == "__main__": - go() # pragma: no cover + main() # pragma: no cover diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index 762bf9e46c8..f13672a4d10 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -16,7 +16,7 @@ from dask.system import CPU_COUNT from distributed import Nanny -from distributed.cli.utils import check_python_3, install_signal_handlers +from distributed.cli.utils import install_signal_handlers from distributed.comm import get_address_host_port from distributed.deploy.utils import nprocesses_nthreads from distributed.preloading import validate_preload_argv @@ -486,10 +486,5 @@ async def run(): logger.info("End worker") -def go(): - check_python_3() - main() - - if __name__ == "__main__": - go() # pragma: no cover + main() # pragma: no cover diff --git a/distributed/cli/utils.py b/distributed/cli/utils.py index 472ddbbf681..a0191edaa90 100644 --- a/distributed/cli/utils.py +++ b/distributed/cli/utils.py @@ -1,51 +1,5 @@ -import click -from packaging.version import parse as parse_version from tornado.ioloop import IOLoop -CLICK_VERSION = parse_version(click.__version__) - -py3_err_msg = """ -Warning: Your terminal does not set locales. - -If you use unicode text inputs for command line options then this may cause -undesired behavior. This is rare. - -If you don't use unicode characters in command line options then you can safely -ignore this message. This is the common case. 
- -You can support unicode inputs by specifying encoding environment variables, -though exact solutions may depend on your system: - - $ export LC_ALL=C.UTF-8 - $ export LANG=C.UTF-8 - -For more information see: http://click.pocoo.org/5/python3/ -""".lstrip() - - -def check_python_3(): - """Ensures that the environment is good for unicode on Python 3.""" - # https://github.com/pallets/click/issues/448#issuecomment-246029304 - import click.core - - # TODO: Remove use of internal click functions - if CLICK_VERSION < parse_version("8.0.0"): - click.core._verify_python3_env = lambda: None - else: - click.core._verify_python_env = lambda: None - - try: - from click import _unicodefun - - if CLICK_VERSION < parse_version("8.0.0"): - _unicodefun._verify_python3_env() - else: - _unicodefun._verify_python_env() - except (TypeError, RuntimeError): - import click - - click.echo(py3_err_msg, err=True) - def install_signal_handlers(loop=None, cleanup=None): """ diff --git a/setup.py b/setup.py index 999c0a8b9d6..0f57c525795 100755 --- a/setup.py +++ b/setup.py @@ -105,9 +105,9 @@ ], entry_points=""" [console_scripts] - dask-ssh=distributed.cli.dask_ssh:go - dask-scheduler=distributed.cli.dask_scheduler:go - dask-worker=distributed.cli.dask_worker:go + dask-ssh=distributed.cli.dask_ssh:main + dask-scheduler=distributed.cli.dask_scheduler:main + dask-worker=distributed.cli.dask_worker:main """, # https://mypy.readthedocs.io/en/latest/installed_packages.html zip_safe=False, From 852d5f5e4ae83e97c5f2c8ae859e3333a9556be4 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 29 Mar 2022 23:00:38 +0100 Subject: [PATCH 09/16] Drop runtime dependency to setuptools (#6017) --- .pre-commit-config.yaml | 1 - continuous_integration/recipes/dask/meta.yaml | 1 + continuous_integration/recipes/distributed/meta.yaml | 5 +++-- distributed/tests/test_client.py | 7 ++++--- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7197c0b308c..6276fe1c17b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -45,7 +45,6 @@ repos: - types-requests - types-paramiko - types-PyYAML - - types-setuptools - types-psutil # Typed libraries - numpy diff --git a/continuous_integration/recipes/dask/meta.yaml b/continuous_integration/recipes/dask/meta.yaml index 90c69295125..01746b7b51f 100644 --- a/continuous_integration/recipes/dask/meta.yaml +++ b/continuous_integration/recipes/dask/meta.yaml @@ -19,6 +19,7 @@ build: requirements: host: - python >=3.8 + - setuptools run: - python >=3.8 - dask-core >={{ dask_version }} diff --git a/continuous_integration/recipes/distributed/meta.yaml b/continuous_integration/recipes/distributed/meta.yaml index 13ebf926096..8222c1a07fd 100644 --- a/continuous_integration/recipes/distributed/meta.yaml +++ b/continuous_integration/recipes/distributed/meta.yaml @@ -54,11 +54,12 @@ outputs: build: - {{ compiler('c') }} # [cython_enabled] host: - - python + - python >=3.8 - pip + - setuptools - cython # [cython_enabled] run: - - python + - python >=3.8 - click >=6.6 - cloudpickle >=1.5.0 - cytoolz >=0.8.2 diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 7b419000327..7479b8cff48 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -1637,6 +1637,8 @@ def g(): @gen_cluster(client=True) async def test_upload_file_egg(c, s, a, b): + pytest.importorskip("setuptools") + def g(): import package_1 import package_2 @@ -1657,9 +1659,8 @@ def g(): with 
open(os.path.join(dirname, "setup.py"), "w") as f: f.write("from setuptools import setup, find_packages\n") f.write( - 'setup(name="my_package", packages=find_packages(), version="{}")\n'.format( - value - ) + 'setup(name="my_package", packages=find_packages(), ' + f'version="{value}")\n' ) # test a package with an underscore in the name From e6911d910ae6cd4d567b7e6746d75aceefa437b8 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 30 Mar 2022 02:39:40 +0100 Subject: [PATCH 10/16] More idiomatic mypy configuration (#6022) --- .pre-commit-config.yaml | 8 +++----- setup.cfg | 5 +++++ 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6276fe1c17b..cb9e74ed986 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -34,11 +34,8 @@ repos: rev: v0.942 hooks: - id: mypy - args: - - --ignore-missing-imports - # Silence errors about Python 3.9-style delayed type annotations on Python 3.8 - - --python-version - - "3.9" + # Override default --ignore-missing-imports + args: [] additional_dependencies: # Type stubs - types-docutils @@ -46,6 +43,7 @@ repos: - types-paramiko - types-PyYAML - types-psutil + - types-setuptools # Typed libraries - numpy - dask diff --git a/setup.cfg b/setup.cfg index dd99eccfc7a..34fa189fe23 100644 --- a/setup.cfg +++ b/setup.cfg @@ -59,3 +59,8 @@ timeout_method = thread # This should not be reduced; Windows CI has been observed to be occasionally # exceptionally slow. timeout = 300 + +[mypy] +# Silence errors about Python 3.9-style delayed type annotations on Python 3.8 +python_version = 3.9 +ignore_missing_imports = true From 4c4df91085e4b5ca70789c3df8009fb15590d540 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Wed, 30 Mar 2022 09:18:06 +0100 Subject: [PATCH 11/16] Python 3.10 (#5952) --- .github/workflows/tests.yaml | 8 +- continuous_integration/environment-3.10.yaml | 46 ++++++++ distributed/node.py | 5 +- distributed/profile.py | 29 +++++- distributed/tests/test_client.py | 11 ++ distributed/tests/test_profile.py | 104 +++++++++++++++++++ distributed/utils_test.py | 17 +-- setup.py | 3 + 8 files changed, 201 insertions(+), 22 deletions(-) create mode 100644 continuous_integration/environment-3.10.yaml diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 7e087f99757..e5b74d5b49a 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -23,7 +23,7 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest, windows-latest, macos-latest] - python-version: ["3.8", "3.9"] + python-version: ["3.8", "3.9", "3.10"] # Cherry-pick test modules to split the overall runtime roughly in half partition: [ci1, not ci1] include: @@ -65,12 +65,6 @@ jobs: shell: bash -l {0} run: conda config --show - - name: Install stacktrace - shell: bash -l {0} - # stacktrace for Python 3.8 has not been released at the moment of writing - if: ${{ matrix.os == 'ubuntu-latest' && matrix.python-version < '3.8' }} - run: mamba install -c conda-forge -c defaults -c numba libunwind stacktrace - - name: Hack around https://github.com/ipython/ipython/issues/12197 # This upstream issue causes an interpreter crash when running # distributed/protocol/tests/test_serialize.py::test_profile_nested_sizeof diff --git a/continuous_integration/environment-3.10.yaml b/continuous_integration/environment-3.10.yaml new file mode 100644 index 00000000000..0ee82922262 --- /dev/null +++ b/continuous_integration/environment-3.10.yaml @@ -0,0 +1,46 @@ +name: dask-distributed +channels: + - 
conda-forge + - defaults +dependencies: + - python=3.10 + - packaging + - pip + - asyncssh + - bokeh + - click + - cloudpickle + - coverage<6.3 # https://github.com/nedbat/coveragepy/issues/1310 + - dask # overridden by git tip below + - filesystem-spec # overridden by git tip below + - h5py + - ipykernel + - ipywidgets + - jinja2 + - jupyter_client + - msgpack-python + - netcdf4 + - paramiko + - pre-commit + - prometheus_client + - psutil + - pytest + - pytest-cov + - pytest-faulthandler + - pytest-repeat + - pytest-rerunfailures + - pytest-timeout + - requests + - s3fs # overridden by git tip below + - scikit-learn + - scipy + - sortedcollections + - tblib + - toolz + - tornado=6 + - zict # overridden by git tip below + - zstandard + - pip: + - git+https://github.com/dask/dask + - git+https://github.com/dask/zict + - pytest-asyncio<0.14.0 # `pytest-asyncio<0.14.0` isn't available on conda-forge for Python 3.10 diff --git a/distributed/node.py b/distributed/node.py index 6db2c7711ea..6fedd1b8ace 100644 --- a/distributed/node.py +++ b/distributed/node.py @@ -131,12 +131,9 @@ def start_http_server( import ssl ssl_options = ssl.create_default_context( - cafile=tls_ca_file, purpose=ssl.Purpose.SERVER_AUTH + cafile=tls_ca_file, purpose=ssl.Purpose.CLIENT_AUTH ) ssl_options.load_cert_chain(tls_cert, keyfile=tls_key) - # We don't care about auth here, just encryption - ssl_options.check_hostname = False - ssl_options.verify_mode = ssl.CERT_NONE self.http_server = HTTPServer(self.http_application, ssl_options=ssl_options) diff --git a/distributed/profile.py b/distributed/profile.py index bb832735e8d..22a2fc80cff 100644 --- a/distributed/profile.py +++ b/distributed/profile.py @@ -27,6 +27,7 @@ from __future__ import annotations import bisect +import dis import linecache import sys import threading @@ -59,21 +60,41 @@ def identifier(frame): ) +# work around some frames lacking an f_lineo eg: https://bugs.python.org/issue47085 +def _f_lineno(frame): + f_lineno = frame.f_lineno + if f_lineno is not None: + return f_lineno + + f_lasti = frame.f_lasti + code = frame.f_code + prev_line = code.co_firstlineno + + for start, next_line in dis.findlinestarts(code): + if f_lasti < start: + return prev_line + prev_line = next_line + + return prev_line + + def repr_frame(frame): """Render a frame as a line for inclusion into a text traceback""" co = frame.f_code - text = f' File "{co.co_filename}", line {frame.f_lineno}, in {co.co_name}' - line = linecache.getline(co.co_filename, frame.f_lineno, frame.f_globals).lstrip() + f_lineno = _f_lineno(frame) + text = f' File "{co.co_filename}", line {f_lineno}, in {co.co_name}' + line = linecache.getline(co.co_filename, f_lineno, frame.f_globals).lstrip() return text + "\n\t" + line def info_frame(frame): co = frame.f_code - line = linecache.getline(co.co_filename, frame.f_lineno, frame.f_globals).lstrip() + f_lineno = _f_lineno(frame) + line = linecache.getline(co.co_filename, f_lineno, frame.f_globals).lstrip() return { "filename": co.co_filename, "name": co.co_name, - "line_number": frame.f_lineno, + "line_number": f_lineno, "line": line, } diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 7479b8cff48..84c5882bdef 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -6463,6 +6463,10 @@ async def f(stacklevel, mode=None): assert "cdn.bokeh.org" in data +@pytest.mark.skipif( + sys.version_info >= (3, 10), + reason="On Py3.10+ semaphore._loop is not bound until .acquire() blocks", +) 
@gen_cluster(nthreads=[]) async def test_client_gather_semaphore_loop(s): async with Client(s.address, asynchronous=True) as c: @@ -6473,9 +6477,16 @@ async def test_client_gather_semaphore_loop(s): async def test_as_completed_condition_loop(c, s, a, b): seq = c.map(inc, range(5)) ac = as_completed(seq) + # consume the ac so that the ac.condition is bound to the loop on py3.10+ + async for _ in ac: + pass assert ac.condition._loop == c.loop.asyncio_loop +@pytest.mark.skipif( + sys.version_info >= (3, 10), + reason="On Py3.10+ semaphore._loop is not bound until .acquire() blocks", +) def test_client_connectionpool_semaphore_loop(s, a, b): with Client(s["address"]) as c: assert c.rpc.semaphore._loop is c.loop.asyncio_loop diff --git a/distributed/tests/test_profile.py b/distributed/tests/test_profile.py index 7d044945d7c..75eb704c99f 100644 --- a/distributed/tests/test_profile.py +++ b/distributed/tests/test_profile.py @@ -1,5 +1,9 @@ +from __future__ import annotations + +import dataclasses import sys import threading +from collections.abc import Iterator, Sequence from time import sleep import pytest @@ -11,6 +15,7 @@ call_stack, create, identifier, + info_frame, ll_get_stack, llprocess, merge, @@ -200,3 +205,102 @@ def stop(): while threading.active_count() > start_threads: assert time() < start + 2 sleep(0.01) + + +@dataclasses.dataclass(frozen=True) +class FakeCode: + co_filename: str + co_name: str + co_firstlineno: int + co_lnotab: bytes + co_lines_seq: Sequence[tuple[int, int, int | None]] + co_code: bytes + + def co_lines(self) -> Iterator[tuple[int, int, int | None]]: + yield from self.co_lines_seq + + +FAKE_CODE = FakeCode( + co_filename="", + co_name="example", + co_firstlineno=1, + # https://github.com/python/cpython/blob/b68431fadb3150134ac6ccbf501cdfeaf4c75678/Objects/lnotab_notes.txt#L84 + # generated from: + # def example(): + # for i in range(1): + # if i >= 0: + # pass + # example.__code__.co_lnotab + co_lnotab=b"\x00\x01\x0c\x01\x08\x01\x04\xfe", + # generated with list(example.__code__.co_lines()) + co_lines_seq=[ + (0, 12, 2), + (12, 20, 3), + (20, 22, 4), + (22, 24, None), + (24, 28, 2), + ], + # used in dis.findlinestarts as bytecode_len = len(code.co_code) + # https://github.com/python/cpython/blob/6f345d363308e3e6ecf0ad518ea0fcc30afde2a8/Lib/dis.py#L457 + co_code=bytes(28), +) + + +@dataclasses.dataclass(frozen=True) +class FakeFrame: + f_lasti: int + f_code: FakeCode + f_lineno: int | None = None + f_back: FakeFrame | None = None + f_globals: dict[str, object] = dataclasses.field(default_factory=dict) + + +@pytest.mark.parametrize( + "f_lasti,f_lineno", + [ + (-1, 1), + (0, 2), + (1, 2), + (11, 2), + (12, 3), + (21, 4), + (22, 4), + (23, 4), + (24, 2), + (25, 2), + (26, 2), + (27, 2), + (100, 2), + ], +) +def test_info_frame_f_lineno(f_lasti: int, f_lineno: int) -> None: + assert info_frame(FakeFrame(f_lasti=f_lasti, f_code=FAKE_CODE)) == { + "filename": "", + "name": "example", + "line_number": f_lineno, + "line": "", + } + + +@pytest.mark.parametrize( + "f_lasti,f_lineno", + [ + (-1, 1), + (0, 2), + (1, 2), + (11, 2), + (12, 3), + (21, 4), + (22, 4), + (23, 4), + (24, 2), + (25, 2), + (26, 2), + (27, 2), + (100, 2), + ], +) +def test_call_stack_f_lineno(f_lasti: int, f_lineno: int) -> None: + assert call_stack(FakeFrame(f_lasti=f_lasti, f_code=FAKE_CODE)) == [ + f' File "", line {f_lineno}, in example\n\t' + ] diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 3e558a0b55a..a8e2652d120 100644 --- a/distributed/utils_test.py +++ 
b/distributed/utils_test.py @@ -739,13 +739,16 @@ def cluster( except KeyError: rpc_kwargs = {} - with rpc(saddr, **rpc_kwargs) as s: - while True: - nthreads = loop.run_sync(s.ncores) - if len(nthreads) == nworkers: - break - if time() - start > 5: - raise Exception("Timeout on cluster creation") + async def wait_for_workers(): + async with rpc(saddr, **rpc_kwargs) as s: + while True: + nthreads = await s.ncores() + if len(nthreads) == nworkers: + break + if time() - start > 5: + raise Exception("Timeout on cluster creation") + + loop.run_sync(wait_for_workers) # avoid sending processes down to function yield {"address": saddr}, [ diff --git a/setup.py b/setup.py index 0f57c525795..1661783f36d 100755 --- a/setup.py +++ b/setup.py @@ -98,8 +98,11 @@ "License :: OSI Approved :: BSD License", "Operating System :: OS Independent", "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", "Topic :: Scientific/Engineering", "Topic :: System :: Distributed Computing", ], From cced80d5ea715a916415c6dc01c95343b1ec7af8 Mon Sep 17 00:00:00 2001 From: Simon Perkins Date: Wed, 30 Mar 2022 15:42:42 +0200 Subject: [PATCH 12/16] Cluster Dump SchedulerPlugin (#5983) Add SchedulerPlugin to dump state on cluster close This also adds a new method to SchedulerPlugins that runs directly before closing time --- distributed/client.py | 4 +- distributed/cluster_dump.py | 5 ++- distributed/diagnostics/cluster_dump.py | 38 +++++++++++++++++++ distributed/diagnostics/plugin.py | 3 ++ .../tests/test_cluster_dump_plugin.py | 21 ++++++++++ distributed/scheduler.py | 5 +++ 6 files changed, 73 insertions(+), 3 deletions(-) create mode 100644 distributed/diagnostics/cluster_dump.py create mode 100644 distributed/diagnostics/tests/test_cluster_dump_plugin.py diff --git a/distributed/client.py b/distributed/client.py index f68570f7762..d406ca6333a 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3934,8 +3934,8 @@ async def _dump_cluster_state( self, filename: str = "dask-cluster-dump", write_from_scheduler: bool | None = None, - exclude: Collection[str] = ("run_spec",), - format: Literal["msgpack", "yaml"] = "msgpack", + exclude: Collection[str] = cluster_dump.DEFAULT_CLUSTER_DUMP_EXCLUDE, + format: Literal["msgpack", "yaml"] = cluster_dump.DEFAULT_CLUSTER_DUMP_FORMAT, **storage_options, ): filename = str(filename) diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py index 2d3a400b256..161f9091e7b 100644 --- a/distributed/cluster_dump.py +++ b/distributed/cluster_dump.py @@ -14,6 +14,9 @@ from distributed.stories import scheduler_story as _scheduler_story from distributed.stories import worker_story as _worker_story +DEFAULT_CLUSTER_DUMP_FORMAT: Literal["msgpack" | "yaml"] = "msgpack" +DEFAULT_CLUSTER_DUMP_EXCLUDE: Collection[str] = ("run_spec",) + def _tuple_to_list(node): if isinstance(node, (list, tuple)): @@ -27,7 +30,7 @@ def _tuple_to_list(node): async def write_state( get_state: Callable[[], Awaitable[Any]], url: str, - format: Literal["msgpack", "yaml"], + format: Literal["msgpack", "yaml"] = DEFAULT_CLUSTER_DUMP_FORMAT, **storage_options: dict[str, Any], ) -> None: "Await a cluster dump, then serialize and write it to a path" diff --git a/distributed/diagnostics/cluster_dump.py b/distributed/diagnostics/cluster_dump.py new file mode 100644 index 00000000000..c03b0293c05 --- /dev/null +++ 
b/distributed/diagnostics/cluster_dump.py @@ -0,0 +1,38 @@ +from typing import Any, Collection, Dict, Literal + +from distributed.cluster_dump import ( + DEFAULT_CLUSTER_DUMP_EXCLUDE, + DEFAULT_CLUSTER_DUMP_FORMAT, +) +from distributed.diagnostics.plugin import SchedulerPlugin +from distributed.scheduler import Scheduler + + +class ClusterDump(SchedulerPlugin): + """Dumps cluster state prior to Scheduler shutdown + + The Scheduler may shutdown in cases where it is in an error state, + or when it has been unexpectedly idle for long periods of time. + This plugin dumps the cluster state prior to Scheduler shutdown + for debugging purposes. + """ + + def __init__( + self, + url: str, + exclude: "Collection[str]" = DEFAULT_CLUSTER_DUMP_EXCLUDE, + format_: Literal["msgpack", "yaml"] = DEFAULT_CLUSTER_DUMP_FORMAT, + **storage_options: Dict[str, Any], + ): + self.url = url + self.exclude = exclude + self.format = format_ + self.storage_options = storage_options + + async def start(self, scheduler: Scheduler) -> None: + self.scheduler = scheduler + + async def before_close(self) -> None: + await self.scheduler.dump_cluster_state_to_url( + self.url, self.exclude, self.format, **self.storage_options + ) diff --git a/distributed/diagnostics/plugin.py b/distributed/diagnostics/plugin.py index 9940102b6ed..cf573eed562 100644 --- a/distributed/diagnostics/plugin.py +++ b/distributed/diagnostics/plugin.py @@ -61,6 +61,9 @@ async def start(self, scheduler: Scheduler) -> None: This runs at the end of the Scheduler startup process """ + async def before_close(self) -> None: + """Runs prior to any Scheduler shutdown logic""" + async def close(self) -> None: """Run when the scheduler closes down diff --git a/distributed/diagnostics/tests/test_cluster_dump_plugin.py b/distributed/diagnostics/tests/test_cluster_dump_plugin.py new file mode 100644 index 00000000000..67ce815954d --- /dev/null +++ b/distributed/diagnostics/tests/test_cluster_dump_plugin.py @@ -0,0 +1,21 @@ +from distributed.cluster_dump import DumpArtefact +from distributed.diagnostics.cluster_dump import ClusterDump +from distributed.utils_test import gen_cluster, inc + + +@gen_cluster(client=True) +async def test_cluster_dump_plugin(c, s, *workers, tmp_path): + dump_file = tmp_path / "cluster_dump.msgpack.gz" + await c.register_scheduler_plugin(ClusterDump(str(dump_file)), name="cluster-dump") + plugin = s.plugins["cluster-dump"] + assert plugin.scheduler is s + + f1 = c.submit(inc, 1) + f2 = c.submit(inc, f1) + + assert (await f2) == 3 + await s.close(close_workers=True) + + dump = DumpArtefact.from_url(str(dump_file)) + assert {f1.key, f2.key} == set(dump.scheduler_story(f1.key, f2.key).keys()) + assert {f1.key, f2.key} == set(dump.worker_story(f1.key, f2.key).keys()) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 849ac7b3aa2..4882bf81919 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4245,6 +4245,11 @@ async def close(self, fast=False, close_workers=False): if self.status in (Status.closing, Status.closed): await self.finished() return + + await asyncio.gather( + *[plugin.before_close() for plugin in list(self.plugins.values())] + ) + self.status = Status.closing logger.info("Scheduler closing...") From c8a97a33449eedc110169cb9b3f0120124d95e49 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Wed, 30 Mar 2022 13:56:00 -0500 Subject: [PATCH 13/16] Add tiny test for ToPickle (#6021) --- distributed/protocol/tests/test_to_pickle.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git 
a/distributed/protocol/tests/test_to_pickle.py b/distributed/protocol/tests/test_to_pickle.py index 7db7a5d9738..d3099c9a73a 100644 --- a/distributed/protocol/tests/test_to_pickle.py +++ b/distributed/protocol/tests/test_to_pickle.py @@ -4,10 +4,22 @@ from dask.highlevelgraph import HighLevelGraph, MaterializedLayer from distributed.client import Client +from distributed.protocol import dumps, loads from distributed.protocol.serialize import ToPickle from distributed.utils_test import gen_cluster +def test_ToPickle(): + class Foo: + def __init__(self, data): + self.data = data + + msg = {"x": ToPickle(Foo(123))} + frames = dumps(msg) + out = loads(frames) + assert out["x"].data == 123 + + class NonMsgPackSerializableLayer(MaterializedLayer): """Layer that uses non-msgpack-serializable data""" From 8906cab53a8619bdab4c7bd0fb6545830c9aa6af Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 30 Mar 2022 14:28:22 -0500 Subject: [PATCH 14/16] Update gpuCI `RAPIDS_VER` to `22.06` (#5962) Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- continuous_integration/gpuci/axis.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/continuous_integration/gpuci/axis.yaml b/continuous_integration/gpuci/axis.yaml index 922eee23c14..41ddb56ec2d 100644 --- a/continuous_integration/gpuci/axis.yaml +++ b/continuous_integration/gpuci/axis.yaml @@ -8,6 +8,6 @@ LINUX_VER: - ubuntu18.04 RAPIDS_VER: -- "22.04" +- "22.06" excludes: From b9902d79ab714fd96fecbc188331cfff7e3153f5 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Wed, 30 Mar 2022 17:50:11 -0500 Subject: [PATCH 15/16] Retry on transient error codes in preload (#5982) --- distributed/preloading.py | 16 ++++++++++++---- requirements.txt | 1 + 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/distributed/preloading.py b/distributed/preloading.py index 458642a01d9..07adbde48ba 100644 --- a/distributed/preloading.py +++ b/distributed/preloading.py @@ -6,13 +6,13 @@ import os import shutil import sys -import urllib.request from collections.abc import Iterable from importlib import import_module from types import ModuleType from typing import TYPE_CHECKING, cast import click +import urllib3 from dask.utils import tmpfile @@ -131,9 +131,17 @@ def _download_module(url: str) -> ModuleType: logger.info("Downloading preload at %s", url) assert is_webaddress(url) - request = urllib.request.Request(url, method="GET") - response = urllib.request.urlopen(request) - source = response.read().decode() + with urllib3.PoolManager() as http: + response = http.request( + method="GET", + url=url, + retries=urllib3.util.Retry( + status_forcelist=[429, 504, 503, 502], + backoff_factor=0.2, + ), + ) + + source = response.data compiled = compile(source, url, "exec") module = ModuleType(url) diff --git a/requirements.txt b/requirements.txt index b6b7ed5d824..efc734dfe6e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,5 +9,6 @@ sortedcontainers !=2.0.0, !=2.0.1 tblib >= 1.6.0 toolz >= 0.8.2 tornado >= 6.0.3 +urllib3 zict >= 0.1.3 pyyaml From 2ff681cc0630badcd3175ea45d74760f7ad9a10d Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 31 Mar 2022 01:35:14 -0500 Subject: [PATCH 16/16] Remove support for PyPy (#6029) --- distributed/compatibility.py | 2 -- distributed/tests/test_spill.py | 2 -- distributed/utils_perf.py | 9 --------- docs/source/protocol.rst | 6 ++---- 4 files changed, 2 insertions(+), 17 deletions(-) diff --git 
a/distributed/compatibility.py b/distributed/compatibility.py index f0151267867..ca49f7b0b27 100644 --- a/distributed/compatibility.py +++ b/distributed/compatibility.py @@ -1,14 +1,12 @@ from __future__ import annotations import logging -import platform import sys logging_names: dict[str | int, int | str] = {} logging_names.update(logging._levelToName) # type: ignore logging_names.update(logging._nameToLevel) # type: ignore -PYPY = platform.python_implementation().lower() == "pypy" LINUX = sys.platform == "linux" MACOS = sys.platform == "darwin" WINDOWS = sys.platform.startswith("win") diff --git a/distributed/tests/test_spill.py b/distributed/tests/test_spill.py index 2581701aa1f..6ab595933e5 100644 --- a/distributed/tests/test_spill.py +++ b/distributed/tests/test_spill.py @@ -1,6 +1,5 @@ from __future__ import annotations -import gc import logging import os import uuid @@ -338,7 +337,6 @@ def test_weakref_cache(tmpdir, cls, expect_cached, size): # the same id as a deleted one id_x = x.id del x - gc.collect() # Only needed on pypy if size < 100: buf["y"] diff --git a/distributed/utils_perf.py b/distributed/utils_perf.py index 6643b738b99..41ff877dbc5 100644 --- a/distributed/utils_perf.py +++ b/distributed/utils_perf.py @@ -5,7 +5,6 @@ from dask.utils import format_bytes -from distributed.compatibility import PYPY from distributed.metrics import thread_time logger = _logger = logging.getLogger(__name__) @@ -144,8 +143,6 @@ def __init__(self, warn_over_frac=0.1, info_over_rss_win=10 * 1e6): self._enabled = False def enable(self): - if PYPY: - return assert not self._enabled self._fractional_timer = FractionalTimer(n_samples=self.N_SAMPLES) try: @@ -162,8 +159,6 @@ def enable(self): self._enabled = True def disable(self): - if PYPY: - return assert self._enabled gc.callbacks.remove(self._gc_callback) self._enabled = False @@ -229,8 +224,6 @@ def enable_gc_diagnosis(): """ Ask to enable global GC diagnosis. """ - if PYPY: - return global _gc_diagnosis_users with _gc_diagnosis_lock: if _gc_diagnosis_users == 0: @@ -244,8 +237,6 @@ def disable_gc_diagnosis(force=False): """ Ask to disable global GC diagnosis. """ - if PYPY: - return global _gc_diagnosis_users with _gc_diagnosis_lock: if _gc_diagnosis_users > 0: diff --git a/docs/source/protocol.rst b/docs/source/protocol.rst index 334e2c0e4bd..9f5d4990909 100644 --- a/docs/source/protocol.rst +++ b/docs/source/protocol.rst @@ -135,13 +135,11 @@ the scheduler may differ.** This has a few advantages: 1. The Scheduler is protected from unpickling unsafe code -2. The Scheduler can be run under ``pypy`` for improved performance. This is - only useful for larger clusters. -3. We could conceivably implement workers and clients for other languages +2. We could conceivably implement workers and clients for other languages (like R or Julia) and reuse the Python scheduler. The worker and client code is fairly simple and much easier to reimplement than the scheduler, which is complex. -4. The scheduler might some day be rewritten in more heavily optimized C or Go +3. The scheduler might some day be rewritten in more heavily optimized C or Go Compression -----------
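
As a usage illustration for the ClusterDump scheduler plugin introduced in
"Cluster Dump SchedulerPlugin (#5983)" above, here is a minimal sketch of
registering it from a client. The scheduler address and output path are
placeholders, not taken from the patches; excluded keys and output format fall
back to DEFAULT_CLUSTER_DUMP_EXCLUDE and DEFAULT_CLUSTER_DUMP_FORMAT:

    from distributed import Client
    from distributed.diagnostics.cluster_dump import ClusterDump

    # Placeholder address and filename for illustration only.
    client = Client("tcp://127.0.0.1:8786")

    # Ask the scheduler to dump its state (and the workers' state) to this
    # location right before it shuts down, e.g. when it is in an error state
    # or has been unexpectedly idle for a long time.
    client.register_scheduler_plugin(
        ClusterDump("dask-cluster-dump.msgpack.gz"), name="cluster-dump"
    )

The resulting artefact can then be inspected with DumpArtefact.from_url, as
shown in the plugin's test above.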