Skip to content

Commit

Permalink
Merge branch 'main' into scheduler-task-transition-tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
sjperkins committed Mar 31, 2022
2 parents 6e58f91 + 2ff681c commit 6d6a0ee
Show file tree
Hide file tree
Showing 52 changed files with 1,016 additions and 224 deletions.
8 changes: 1 addition & 7 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python-version: ["3.8", "3.9"]
python-version: ["3.8", "3.9", "3.10"]
# Cherry-pick test modules to split the overall runtime roughly in half
partition: [ci1, not ci1]
include:
Expand Down Expand Up @@ -65,12 +65,6 @@ jobs:
shell: bash -l {0}
run: conda config --show

- name: Install stacktrace
shell: bash -l {0}
# stacktrace for Python 3.8 has not been released at the moment of writing
if: ${{ matrix.os == 'ubuntu-latest' && matrix.python-version < '3.8' }}
run: mamba install -c conda-forge -c defaults -c numba libunwind stacktrace

- name: Hack around https://github.com/ipython/ipython/issues/12197
# This upstream issue causes an interpreter crash when running
# distributed/protocol/tests/test_serialize.py::test_profile_nested_sizeof
Expand Down
30 changes: 16 additions & 14 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
repos:
- repo: https://github.com/MarcoGorelli/absolufy-imports
rev: v0.3.1
hooks:
- repo: https://github.com/MarcoGorelli/absolufy-imports
rev: v0.3.1
hooks:
- id: absolufy-imports
name: absolufy-imports
- repo: https://github.com/pycqa/isort
rev: 5.10.1
hooks:
- repo: https://github.com/pycqa/isort
rev: 5.10.1
hooks:
- id: isort
language_version: python3
- repo: https://github.com/asottile/pyupgrade
Expand All @@ -17,31 +17,33 @@ repos:
- id: pyupgrade
args:
- --py38-plus
- repo: https://github.com/psf/black
rev: 22.1.0
hooks:
- repo: https://github.com/psf/black
rev: 22.3.0
hooks:
- id: black
language_version: python3
exclude: versioneer.py
args:
- --target-version=py38
- repo: https://gitlab.com/pycqa/flake8
rev: 3.9.2
hooks:
- repo: https://gitlab.com/pycqa/flake8
rev: 4.0.1
hooks:
- id: flake8
language_version: python3
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.931
rev: v0.942
hooks:
- id: mypy
# Override default --ignore-missing-imports
args: []
additional_dependencies:
# Type stubs
- types-docutils
- types-requests
- types-paramiko
- types-PyYAML
- types-setuptools
- types-psutil
- types-setuptools
# Typed libraries
- numpy
- dask
Expand Down
46 changes: 46 additions & 0 deletions continuous_integration/environment-3.10.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: dask-distributed
channels:
- conda-forge
- defaults
dependencies:
- python=3.10
- packaging
- pip
- asyncssh
- bokeh
- click
- cloudpickle
- coverage<6.3 # https://github.com/nedbat/coveragepy/issues/1310
- dask # overridden by git tip below
- filesystem-spec # overridden by git tip below
- h5py
- ipykernel
- ipywidgets
- jinja2
- jupyter_client
- msgpack-python
- netcdf4
- paramiko
- pre-commit
- prometheus_client
- psutil
- pytest
- pytest-cov
- pytest-faulthandler
- pytest-repeat
- pytest-rerunfailures
- pytest-timeout
- requests
- s3fs # overridden by git tip below
- scikit-learn
- scipy
- sortedcollections
- tblib
- toolz
- tornado=6
- zict # overridden by git tip below
- zstandard
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/dask/zict
- pytest-asyncio<0.14.0 # no conda-forge build of pytest-asyncio<0.14.0 exists for Python 3.10 yet, so install it via pip
2 changes: 1 addition & 1 deletion continuous_integration/gpuci/axis.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ LINUX_VER:
- ubuntu18.04

RAPIDS_VER:
- "22.04"
- "22.06"

excludes:
1 change: 1 addition & 0 deletions continuous_integration/recipes/dask/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ build:
requirements:
host:
- python >=3.8
- setuptools
run:
- python >=3.8
- dask-core >={{ dask_version }}
Expand Down
11 changes: 6 additions & 5 deletions continuous_integration/recipes/distributed/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,19 @@ outputs:
track_features: # [cython_enabled]
- cythonized-scheduler # [cython_enabled]
entry_points:
- dask-scheduler = distributed.cli.dask_scheduler:go
- dask-ssh = distributed.cli.dask_ssh:go
- dask-worker = distributed.cli.dask_worker:go
- dask-scheduler = distributed.cli.dask_scheduler:main
- dask-ssh = distributed.cli.dask_ssh:main
- dask-worker = distributed.cli.dask_worker:main
requirements:
build:
- {{ compiler('c') }} # [cython_enabled]
host:
- python
- python >=3.8
- pip
- setuptools
- cython # [cython_enabled]
run:
- python
- python >=3.8
- click >=6.6
- cloudpickle >=1.5.0
- cytoolz >=0.8.2
Expand Down
9 changes: 2 additions & 7 deletions distributed/cli/dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from tornado.ioloop import IOLoop

from distributed import Scheduler
from distributed.cli.utils import check_python_3, install_signal_handlers
from distributed.cli.utils import install_signal_handlers
from distributed.preloading import validate_preload_argv
from distributed.proctitle import (
enable_proctitle_on_children,
Expand Down Expand Up @@ -212,10 +212,5 @@ async def run():
logger.info("End scheduler at %r", scheduler.address)


def go():
    # Console-script entry point (``dask-scheduler = ...:go`` in the conda
    # recipe's entry_points): run the Python-3 locale/unicode check, then
    # delegate to the click-based ``main``.
    check_python_3()
    main()


if __name__ == "__main__":
go() # pragma: no cover
main() # pragma: no cover
8 changes: 1 addition & 7 deletions distributed/cli/dask_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import click

from distributed.cli.utils import check_python_3
from distributed.deploy.old_ssh import SSHCluster

logger = logging.getLogger("distributed.dask_ssh")
Expand Down Expand Up @@ -223,10 +222,5 @@ def main(
print("[ dask-ssh ]: Remote processes have been terminated. Exiting.")


def go():
    # Console-script entry point (``dask-ssh = ...:go`` in the conda
    # recipe's entry_points): run the Python-3 locale/unicode check, then
    # delegate to the click-based ``main``.
    check_python_3()
    main()


if __name__ == "__main__":
go() # pragma: no cover
main() # pragma: no cover
9 changes: 2 additions & 7 deletions distributed/cli/dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from dask.system import CPU_COUNT

from distributed import Nanny
from distributed.cli.utils import check_python_3, install_signal_handlers
from distributed.cli.utils import install_signal_handlers
from distributed.comm import get_address_host_port
from distributed.deploy.utils import nprocesses_nthreads
from distributed.preloading import validate_preload_argv
Expand Down Expand Up @@ -486,10 +486,5 @@ async def run():
logger.info("End worker")


def go():
    # Console-script entry point (``dask-worker = ...:go`` in the conda
    # recipe's entry_points): run the Python-3 locale/unicode check, then
    # delegate to the click-based ``main``.
    check_python_3()
    main()


if __name__ == "__main__":
go() # pragma: no cover
main() # pragma: no cover
46 changes: 0 additions & 46 deletions distributed/cli/utils.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,5 @@
import click
from packaging.version import parse as parse_version
from tornado.ioloop import IOLoop

CLICK_VERSION = parse_version(click.__version__)

py3_err_msg = """
Warning: Your terminal does not set locales.
If you use unicode text inputs for command line options then this may cause
undesired behavior. This is rare.
If you don't use unicode characters in command line options then you can safely
ignore this message. This is the common case.
You can support unicode inputs by specifying encoding environment variables,
though exact solutions may depend on your system:
$ export LC_ALL=C.UTF-8
$ export LANG=C.UTF-8
For more information see: http://click.pocoo.org/5/python3/
""".lstrip()


def check_python_3():
    """Ensure the environment can handle unicode on Python 3.

    Disables click's internal locale verification (which would abort startup
    on mis-configured terminals), then runs it manually once so that a
    mis-configured locale produces a warning (``py3_err_msg``) on stderr
    instead of a hard failure.
    """
    # https://github.com/pallets/click/issues/448#issuecomment-246029304
    import click.core

    # Neutralize click's automatic check; the verifier function was renamed
    # between click 7 and click 8, hence the version branch.
    # TODO: Remove use of internal click functions
    if CLICK_VERSION < parse_version("8.0.0"):
        click.core._verify_python3_env = lambda: None
    else:
        click.core._verify_python_env = lambda: None

    try:
        from click import _unicodefun

        # Invoke the same (renamed) internal check ourselves; it raises
        # RuntimeError on a bad locale (TypeError guards signature drift
        # across click versions).
        if CLICK_VERSION < parse_version("8.0.0"):
            _unicodefun._verify_python3_env()
        else:
            _unicodefun._verify_python_env()
    except (TypeError, RuntimeError):
        import click

        # Downgrade the failure to a stderr warning and continue.
        click.echo(py3_err_msg, err=True)


def install_signal_handlers(loop=None, cleanup=None):
"""
Expand Down
27 changes: 22 additions & 5 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@

_current_client = ContextVar("_current_client", default=None)

DEFAULT_EXTENSIONS = [PubSubClientExtension]
# Extensions installed on every new Client, keyed by extension name.
# Each value is a callable taking the Client instance; Client.__init__
# instantiates them into ``self.extensions``.
DEFAULT_EXTENSIONS = {
    "pubsub": PubSubClientExtension,
}

# Placeholder used in the get_dataset function(s)
NO_DEFAULT_PLACEHOLDER = "_no_default_"

Expand Down Expand Up @@ -928,8 +931,9 @@ def __init__(
server=self,
)

for ext in extensions:
ext(self)
self.extensions = {
name: extension(self) for name, extension in extensions.items()
}

preload = dask.config.get("distributed.client.preload")
preload_argv = dask.config.get("distributed.client.preload-argv")
Expand Down Expand Up @@ -3946,8 +3950,8 @@ async def _dump_cluster_state(
self,
filename: str = "dask-cluster-dump",
write_from_scheduler: bool | None = None,
exclude: Collection[str] = ("run_spec",),
format: Literal["msgpack", "yaml"] = "msgpack",
exclude: Collection[str] = cluster_dump.DEFAULT_CLUSTER_DUMP_EXCLUDE,
format: Literal["msgpack", "yaml"] = cluster_dump.DEFAULT_CLUSTER_DUMP_FORMAT,
**storage_options,
):
filename = str(filename)
Expand Down Expand Up @@ -4057,6 +4061,19 @@ def get_worker_logs(self, n=None, workers=None, nanny=False):
"""
return self.sync(self.scheduler.worker_logs, n=n, workers=workers, nanny=nanny)

def benchmark_hardware(self) -> dict:
    """
    Run a benchmark on the workers for memory, disk, and network bandwidths

    Returns
    -------
    result: dict
        A dictionary mapping the names "disk", "memory", and "network" to
        dictionaries mapping sizes to bandwidths. These bandwidths are
        averaged over many workers running computations across the cluster.
    """
    # Thin RPC wrapper: the actual measurement runs on the scheduler.
    return self.sync(self.scheduler.benchmark_hardware)

def log_event(self, topic, msg):
"""Log an event under a given topic
Expand Down
5 changes: 4 additions & 1 deletion distributed/cluster_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
from distributed.stories import scheduler_story as _scheduler_story
from distributed.stories import worker_story as _worker_story

# Defaults shared between Client._dump_cluster_state and write_state so the
# two signatures stay in sync.
# NOTE: Literal takes comma-separated values; the previous annotation
# ``Literal["msgpack" | "yaml"]`` was invalid (``str | str`` raises TypeError
# when the annotation is evaluated).
DEFAULT_CLUSTER_DUMP_FORMAT: Literal["msgpack", "yaml"] = "msgpack"
DEFAULT_CLUSTER_DUMP_EXCLUDE: Collection[str] = ("run_spec",)


def _tuple_to_list(node):
if isinstance(node, (list, tuple)):
Expand All @@ -27,7 +30,7 @@ def _tuple_to_list(node):
async def write_state(
get_state: Callable[[], Awaitable[Any]],
url: str,
format: Literal["msgpack", "yaml"],
format: Literal["msgpack", "yaml"] = DEFAULT_CLUSTER_DUMP_FORMAT,
**storage_options: dict[str, Any],
) -> None:
"Await a cluster dump, then serialize and write it to a path"
Expand Down
18 changes: 16 additions & 2 deletions distributed/compatibility.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
from __future__ import annotations

import logging
import platform
import sys

logging_names: dict[str | int, int | str] = {}
logging_names.update(logging._levelToName) # type: ignore
logging_names.update(logging._nameToLevel) # type: ignore

PYPY = platform.python_implementation().lower() == "pypy"
LINUX = sys.platform == "linux"
MACOS = sys.platform == "darwin"
WINDOWS = sys.platform.startswith("win")
Expand Down Expand Up @@ -37,3 +35,19 @@ async def to_thread(func, /, *args, **kwargs):
ctx = contextvars.copy_context()
func_call = functools.partial(ctx.run, func, *args, **kwargs)
return await loop.run_in_executor(None, func_call)


if sys.version_info >= (3, 9):
    from random import randbytes
else:
    # Backport of random.randbytes (added in Python 3.9).
    try:
        import numpy

        def randbytes(size):
            """Return ``size`` pseudo-random bytes (numpy-backed fallback)."""
            # dtype "u1" (uint8) gives exactly one byte per element; the
            # previous "u8" (uint64) made tobytes() return 8*size bytes.
            # randint's upper bound is exclusive, so use 256 to allow the
            # byte value 255 (randint(255) could never produce it).
            return numpy.random.randint(256, size=size, dtype="u1").tobytes()

    except ImportError:
        import secrets

        def randbytes(size):
            """Return ``size`` pseudo-random bytes (secrets-backed fallback)."""
            return secrets.token_bytes(size)
Loading

0 comments on commit 6d6a0ee

Please sign in to comment.