
[RELEASE] dask-cuda v0.18 #535

Merged
merged 40 commits into main from branch-0.18 on Feb 24, 2021

Conversation

GPUtester
Contributor

❄️ Code freeze for branch-0.18 and v0.18 release

What does this mean?

Only critical/hotfix level issues should be merged into branch-0.18 until release (merging of this PR).

What is the purpose of this PR?

  • Update documentation
  • Allow testing for the new release
  • Enable a means to merge branch-0.18 into main for the release

ajschmidt8 and others added 30 commits November 24, 2020 15:46
[gpuCI] Auto-merge branch-0.17 to branch-0.18 [skip ci]
[gpuCI] Auto-merge branch-0.17 to branch-0.18 [skip ci]
[gpuCI] Auto-merge branch-0.17 to branch-0.18 [skip ci]
[gpuCI] Auto-merge branch-0.17 to branch-0.18 [skip ci]
[gpuCI] Auto-merge branch-0.17 to branch-0.18 [skip ci]
[gpuCI] Auto-merge branch-0.17 to branch-0.18 [skip ci]
[gpuCI] Auto-merge branch-0.17 to branch-0.18 [skip ci]
[gpuCI] Auto-merge branch-0.17 to branch-0.18 [skip ci]
[gpuCI] Auto-merge branch-0.17 to branch-0.18 [skip ci]
[gpuCI] Auto-merge branch-0.17 to branch-0.18 [skip ci]
While working with remote clusters, such as those from Dask Cloudprovider or Dask Kubernetes, it became apparent that you may end up wasting the local GPU.

For instance, if I launch a GPU cluster on AWS from a GPU session in SageMaker, the SageMaker GPU is not part of the cluster.

```python
from dask_cloudprovider import EC2Cluster
from dask.distributed import Client

cluster = EC2Cluster(**gpu_kwargs)
client = Client(cluster)  # Cluster with remote GPUs
```

This PR makes it possible to include the local GPU in the cluster in the same way you would connect a client.

```python
from dask_cloudprovider import EC2Cluster
from dask.distributed import Client
from dask_cuda import CUDAWorker

cluster = EC2Cluster(**gpu_kwargs)
local_worker = CUDAWorker(cluster)
client = Client(cluster)  # Cluster with remote GPUs and local GPU
```

Authors:
  - Jacob Tomlinson <[email protected]>
  - Jacob Tomlinson <[email protected]>

Approvers:
  - Peter Andreas Entschev

URL: #428
[gpuCI] Auto-merge branch-0.17 to branch-0.18 [skip ci]
This PR cleans up the style check output by capturing both stdout and stderr.
Also, `isort` line length is now 88, which is what `black` and `flake8` use.

Authors:
  - Mads R. B. Kristensen <[email protected]>

Approvers:
  - AJ Schmidt
  - Benjamin Zaitlen

URL: #477
Fixes #472

Authors:
  - Peter Andreas Entschev <[email protected]>

Approvers:
  - Mads R. B. Kristensen

URL: #485
This PR introduces a new _device host file_ that uses `ProxyObject` to implement spilling of individual CUDA objects, as opposed to the current host file, which spills entire keys.

- [x] Implement spilling of individual objects
- [x] Handle task level aliasing
- [x] Handle shared device buffers
- [x] Write docs

To use, set `DASK_JIT_UNSPILL=True`
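As a rough usage sketch (the cluster setup below is illustrative and not part of this PR), the environment variable can be set before the workers start:

```python
# Hedged sketch: enable JIT-unspill through the environment variable named
# above. It must be set before the workers are created.
import os

os.environ["DASK_JIT_UNSPILL"] = "True"

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster()  # workers pick up the config at startup
client = Client(cluster)
```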

## Motivation  

### Aliases at the task level 

Consider the following two tasks:
```python

def task1():  # Create list of dataframes
    df1 = cudf.DataFrame({"a": range(10)})
    df2 = cudf.DataFrame({"a": range(10)})
    return [df1, df2]

def task2(dfs):  # Get the second item
    return dfs[1]    
```
Running the two tasks on a worker, we get something like:
```python

>>> data["k1"] = task1()
>>> data["k2"] = task2(data["k1"])
>>> data
{
    "k1": [df1, df2],
    "k2": df2,
}
```
Since the current implementation of spilling works on keys and handles each key separately, it overestimates the device memory used: `sizeof(df)*3`. Even worse, if it decides to spill `k2`, no device memory is freed, since `k1` still holds a reference to `df2`!

The new spilling implementation fixes this issue by wrapping identical CUDA objects in a shared `ProxyObject`; thus, in this case, `df2` in both `k1` and `k2` will refer to the same `ProxyObject`.


### Sharing device buffers

Consider the following code snippet:
```python

>>> data["df"] = cudf.DataFrame({"a": range(10)})
>>> data["grouped"] = shuffle_group(data["df"], "a", 0, 2, 2, False, 2)
>>> data["v1"] = data["grouped"][0]
>>> data["v2"] = data["grouped"][1]
```
In this case, `v1` and `v2` are separate objects and are handled separately in both the current and the new spilling implementation. However, `shuffle_group()` in cudf actually returns a single device memory buffer, such that `v1` and `v2` point to the same underlying memory. Thus the current implementation will again overestimate the memory use and spill one of the dataframes without any effect.
The new implementation takes this into account when estimating memory usage and makes sure that either both dataframes are spilled or neither is.
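The effect can be illustrated with a host-memory analogy (NumPy views instead of cudf/device buffers, purely for illustration): two views of one base buffer are counted separately, and dropping one of them frees nothing.

```python
# Hedged analogy using NumPy views instead of cudf/device buffers.
import numpy as np
from dask.sizeof import sizeof

base = np.arange(1_000_000)      # a single ~8 MB allocation
v1 = base[:600_000]              # views share base's memory
v2 = base[400_000:]

print(sizeof(v1) + sizeof(v2))   # ~9.6 MB counted, yet only ~8 MB was allocated
del v1                           # frees nothing: v2 still keeps base alive
```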

cc. @beckernick, @VibhuJawa
xref: dask/distributed#3756

Authors:
  - Mads R. B. Kristensen <[email protected]>

Approvers:
  - Peter Andreas Entschev

URL: #451
Fixes #486

Authors:
  - Peter Andreas Entschev <[email protected]>

Approvers:
  - Benjamin Zaitlen (@quasiben)

URL: #487
Authors:
  - Peter Andreas Entschev <[email protected]>

Approvers:
  - Benjamin Zaitlen (@quasiben)
  - @jakirkham

URL: #488
This is useful for PTDS benchmarking.

Authors:
  - Peter Andreas Entschev <[email protected]>

Approvers:
  - @jakirkham

URL: #489
This PR adds a `wait` after submitting dataframe operations in explicit-comms. Hopefully this fixes deadlock issue #431.
I have re-enabled all the explicit-comms tests, and I haven't been able to reproduce the deadlock after 9 CI runs and many local runs.

Having said that, I am not totally convinced that the issue is fixed. The underlying issue is probably some kind of race condition that is hard to trigger, so please let me know if any of you encounter a deadlock in explicit-comms!
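For reference, a minimal sketch of the pattern being added (outside of the explicit-comms code itself; the toy task below is hypothetical):

```python
# Submit work and block until every future has finished before proceeding.
from dask.distributed import Client, wait

client = Client()  # starts or connects to a cluster
futures = [client.submit(lambda x: x + 1, i, pure=False) for i in range(8)]
wait(futures)      # the added barrier: don't move on until all are done
```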

Authors:
  - Mads R. B. Kristensen <[email protected]>

Approvers:
  - Peter Andreas Entschev (@pentschev)

URL: #490
Fixes #491 by moving the dispatch of cudf objects into a `register_lazy("cudf")` call.
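For context, a hedged sketch of Dask's lazy-registration pattern (the dispatcher and handler below are hypothetical, not the actual dask-cuda dispatch):

```python
from dask.utils import Dispatch

device_dispatch = Dispatch(name="device_dispatch")  # hypothetical dispatcher

@device_dispatch.register_lazy("cudf")
def _register_cudf():
    # Only runs (and imports cudf) the first time a cudf object is dispatched,
    # so importing the package alone does not pull in cudf.
    import cudf

    @device_dispatch.register(cudf.DataFrame)
    def _handle_dataframe(df):
        return df
```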

Authors:
  - Mads R. B. Kristensen <[email protected]>

Approvers:
  - Peter Andreas Entschev (@pentschev)

URL: #492
Implements a shuffle benchmark for both explicit-comms and regular Dask.

Authors:
  - Mads R. B. Kristensen (@madsbk)

Approvers:
  - AJ Schmidt (@ajschmidt8)
  - Peter Andreas Entschev (@pentschev)

URL: #496
Some minor cleanup and optimization of `proxy_object.py`

Authors:
  - Mads R. B. Kristensen (@madsbk)

Approvers:
  - Peter Andreas Entschev (@pentschev)

URL: #497
This PR makes sure that host files register the removal of an existing key. They didn't do that before when mixing CUDA and non-CUDA objects.
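A hedged toy sketch of the invariant being restored (not the actual host file class): overwriting a key must first register the removal of the old value.

```python
class TinyHostFile(dict):
    """Toy mapping: explicitly register removals even when a key is overwritten."""

    def __setitem__(self, key, value):
        if key in self:
            print(f"removing old value for {key!r}")  # stand-in for real bookkeeping
            super().__delitem__(key)
        super().__setitem__(key, value)
```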

Authors:
  - Mads R. B. Kristensen (@madsbk)

Approvers:
  - Peter Andreas Entschev (@pentschev)

URL: #500
Fixes a deadlock where multiple threads access `ProxifyHostFile.maybe_evict()` but none of them can acquire both the `ProxifyHostFile` lock and the `ProxyObject` lock simultaneously.

Should fix rapidsai/gpu-bdb#162 (comment)
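For illustration, a hedged sketch of the hazard and the usual remedy, a fixed lock-acquisition order (the lock names below are stand-ins, not dask-cuda internals):

```python
import threading

hostfile_lock = threading.RLock()  # stand-in for the ProxifyHostFile lock
proxy_lock = threading.RLock()     # stand-in for a ProxyObject lock

def maybe_evict_safely():
    # Always acquire in the same global order so two threads can never each
    # hold one lock while waiting on the other.
    with hostfile_lock:
        with proxy_lock:
            pass  # eviction/spilling work would happen here
```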

Authors:
  - Mads R. B. Kristensen (@madsbk)

Approvers:
  - Peter Andreas Entschev (@pentschev)

URL: #501
Fixes rapidsai/gpu-bdb#174 by ignoring errors when initializing fixed attributes. Also adds tests of fixed attributes.


#### Rationale
When initializing a `ProxyObject`, we probe the *proxied object* in order to retain a copy of the result of accessing fixed attributes like `name` and `__len__()`. Exceptions triggered by this probing are acceptable and can be safely ignored.
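A minimal sketch of the probing behavior described above (the helper and attribute names are illustrative, not `ProxyObject` itself):

```python
def probe_fixed_attributes(obj, names=("name", "__len__")):
    """Cache fixed attributes of obj, ignoring any error raised by the probe."""
    fixed = {}
    for name in names:
        try:
            attr = getattr(obj, name)
            fixed[name] = attr() if callable(attr) else attr
        except Exception:
            pass  # probing failures are acceptable and safely ignored
    return fixed
```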

Authors:
  - Mads R. B. Kristensen (@madsbk)

Approvers:
  - Peter Andreas Entschev (@pentschev)

URL: #503
# Summary
This PR removes the conditional check to start Conda uploads in our [upload script](https://github.com/rapidsai/dask-cuda/blob/branch-0.17/ci/cpu/upload.sh#L39-L44)

Authors:
  - Joseph (@jolorunyomi)

Approvers:
  - Mads R. B. Kristensen (@madsbk)
  - Peter Andreas Entschev (@pentschev)
  - Ray Douglass (@raydouglass)

URL: #504
Prepare Changelog for Automation [skip-ci]
This PR fixes the current CI error by using `SimpleNamespace()` when testing the proxy object

Authors:
  - Mads R. B. Kristensen (@madsbk)

Approvers:
  - Peter Andreas Entschev (@pentschev)

URL: #511
madsbk and others added 8 commits February 5, 2021 08:31
This PR re-implements the dataframe shuffle completely.

- Fully compatible with Dask (drop-in replacement)
- Can be enabled universally in Dask by setting `DASK_EXPLICIT_COMMS=True` (as an environment variable or Dask config), which makes **all** task shuffles in Dask use explicit-comms seamlessly (see the sketch after this list).
- Reduces memory use significantly compared to the current explicit-comms shuffle. When combined with JIT-unspill, memory usage can be reduced to the same level as the regular Dask shuffle. I will submit a dedicated PR for this feature.
- Eliminates almost all of the scheduling overhead: we only need to schedule one task per dataframe partition.
- We see more than a **50% speedup** compared to the regular shuffle in Dask.
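A minimal sketch of the environment-variable route stated above (set before the scheduler/workers start; `dask.config.refresh()` just re-reads the environment in-process):

```python
# Hedged sketch: enable explicit-comms globally via the environment variable.
import os

os.environ["DASK_EXPLICIT_COMMS"] = "True"

import dask
dask.config.refresh()  # pick up the new environment variable in this process
```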


#### Benchmark
On a DGX-2 (EXP01), 16 nodes, `sf1000/parquet_2gb/query_21`, best of three runs:
```
# UCX
DASK_EXPLICIT_COMMS=False python tpcx_bb_query_21.py
13.229151725769043,main

DASK_EXPLICIT_COMMS=True python tpcx_bb_query_21.py
9.6979820728302,main

# TCP
DASK_EXPLICIT_COMMS=False python tpcx_bb_query_21.py
20.834798336029053,main

DASK_EXPLICIT_COMMS=True python tpcx_bb_query_21.py
13.24032473564148,main
```

cc. @beckernick @VibhuJawa

Authors:
  - Mads R. B. Kristensen (@madsbk)

Approvers:
  - Peter Andreas Entschev (@pentschev)
  - Richard (Rick) Zamora (@rjzamora)

URL: #494
This moves a block from the Jenkins build into the build script. It simplifies the Jenkins builds slightly and fixes a very rare case where tests may error and not be properly caught due to EXITCODE intricacies.

Authors:
  - Dillon Cullinan (@dillon-cullinan)

Approvers:
  - Ray Douglass (@raydouglass)
  - AJ Schmidt (@ajschmidt8)
  - Mads R. B. Kristensen (@madsbk)

URL: #450
This PR makes `dataframe_merge.py` standalone. The plan is to remove it completely, but I would like to keep it around for benchmarking a little while longer.

Also:
- `dataframe_merge.py` and `dataframe_shuffle.py` are moved into a new sub-package `explicit-comms/dataframe` so that dataframe-specific utility functions can go in there as well.
- `explicit-comms/__init__.py` now doesn't import any sub-modules implicitly.
- Added function docs
- Added type hints

Authors:
  - Mads R. B. Kristensen (@madsbk)

Approvers:
  - Richard (Rick) Zamora (@rjzamora)

URL: #515
The way the code was written, it would pass the client's device to each worker and synchronize on that, which is wrong. This was caught due to recent changes in Distributed, where the following was raised:

```python
Task exception was never retrieved
future: <Task finished coro=<Scheduler.broadcast.<locals>.send_message() done, defined at /datasets/pentschev/miniconda3/envs/gdf/lib/python3.7/site-packages/distributed/scheduler.py:4965> exception=Exception("unhashable type: 'cupy.cuda.device.Device'")>
Traceback (most recent call last):
  File "/datasets/pentschev/miniconda3/envs/gdf/lib/python3.7/site-packages/distributed/scheduler.py", line 4969, in send_message
    resp = await send_recv(comm, close=True, serializers=serializers, **msg)
  File "/datasets/pentschev/miniconda3/envs/gdf/lib/python3.7/site-packages/distributed/core.py", line 662, in send_recv
    raise Exception(response["text"])
Exception: unhashable type: 'cupy.cuda.device.Device'
```
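A hedged sketch of the corrected pattern (the helper below is illustrative): synchronize on each worker's own current device via `client.run`, instead of shipping the client's `Device` object to the workers.

```python
from dask.distributed import Client

def sync_current_device():
    # Runs on the worker, so this is the worker's current device, not the client's.
    import cupy
    cupy.cuda.Device().synchronize()

client = Client()                # assumes an existing GPU cluster
client.run(sync_current_device)  # execute the sync on every worker
```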

Authors:
  - Peter Andreas Entschev (@pentschev)

Approvers:
  - Benjamin Zaitlen (@quasiben)

URL: #518
This PR adds the following operations to the local CuPy array benchmark:

- sum
- mean
- array slicing

This also adds an additional special argument, `--benchmark-json`, which takes an optional path to dump the results of the benchmark in JSON format. This would allow us to generate plots using the output, as discussed in #517.

Some thoughts:

- Should there be an additional argument to specify the array slicing interval (which is currently fixed at 3)?
- Could the JSON output be cleaned up? Currently, a (truncated) sample output file looks like:

```json
{
  "operation": "transpose_sum",
  "size": 10000,
  "second_size": 1000,
  "chunk_size": 2500,
  "compute_size": [
    10000,
    10000
  ],
  "compute_chunk_size": [
    2500,
    2500
  ],
  "ignore_size": "1.05 MB",
  "protocol": "tcp",
  "devs": "0,1,2,3",
  "threads_per_worker": 1,
  "times": [
    {
      "wall_clock": 1.4910394318867475,
      "npartitions": 16
    }
  ],
  "bandwidths": {
    "(00,01)": {
      "25%": "136.34 MB/s",
      "50%": "156.67 MB/s",
      "75%": "163.32 MB/s",
      "total_nbytes": "150.00 MB"
    }
  }
}
```

Authors:
  - Charles Blackmon-Luca (@charlesbluca)

Approvers:
  - Mads R. B. Kristensen (@madsbk)
  - Peter Andreas Entschev (@pentschev)

URL: #524
Issues and PRs without activity for 30d will be marked as stale.
If there is no activity for 90d, they will be marked as rotten.

Authors:
  - Jordan Jacobelli (@Ethyling)

Approvers:
  - Mike Wendt (@mike-wendt)

URL: #528
Follows #528 

Updates the stale GHA with the following changes:

- [x] Uses `inactive-30d` and `inactive-90d` labels instead of `stale` and `rotten`
- [x] Updates comments to reflect changes in labels
- [x] Exempts the following labels from being marked `inactive-30d` or `inactive-90d`
  - `0 - Blocked`
  - `0 - Backlog`
  - `good first issue`

Authors:
  - Mike Wendt (@mike-wendt)

Approvers:
  - Ray Douglass (@raydouglass)

URL: #531
This PR adds the GitHub action [PR Labeler](https://github.com/actions/labeler) to auto-label PRs based on their content. 

Labeling is managed with a configuration file `.github/labeler.yml` using the following [options](https://github.com/actions/labeler#usage)

Authors:
  - Joseph (@jolorunyomi)
  - Mike Wendt (@mike-wendt)

Approvers:
  - AJ Schmidt (@ajschmidt8)
  - Mike Wendt (@mike-wendt)

URL: #480
GPUtester requested review from a team as code owners on February 17, 2021, 20:43
charlesbluca and others added 2 commits February 19, 2021 10:44
…ng (#533)

This PR ensures CUDA device synchronization for local CuPy benchmarks when profiling with Dask performance reports.

Prior to this, the run times of benchmarks running over UCX would vary drastically depending on whether or not a performance report was generated.

Additionally, I suppressed flake8 rule E402 in docs/source/conf.py, as that had been causing the pre-commit hooks to fail up until now. I'm okay with removing that change if it would be better suited for a separate PR.

Authors:
  - @charlesbluca

Approvers:
  - @jakirkham
  - @pentschev
  - @raydouglass

URL: #533
raydouglass merged commit 603b58d into main on Feb 24, 2021