Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Design and prototype for root-ish task deprioritization by withholding tasks on the scheduler #6560

Closed
fjetter opened this issue Jun 10, 2022 · 10 comments · Fixed by #6614 · May be fixed by #6598
Closed

Design and prototype for root-ish task deprioritization by withholding tasks on the scheduler #6560

fjetter opened this issue Jun 10, 2022 · 10 comments · Fixed by #6614 · May be fixed by #6598
Assignees
Labels
discussion Discussing a topic with no specific actions yet enhancement Improve existing functionality or make things work better memory performance stability Issue or feature related to cluster stability (e.g. deadlock)

Comments

@fjetter
Copy link
Member

fjetter commented Jun 10, 2022

Root task overproduction is a significant problem for memory pressure and runtime

#6360 has shown that we can find a way to deal with the problem isolated in the scheduler (e.g. w/out full STA) by limiting the number of tasks assigned to the worker at a given time

Implementing this logic might have significant impact on our scheduling policies and we want to review the design and implementation thoroughly and perform necessary performance benchmark before committing to it

Expectations

  • A draft PR is created that implements a version of the in Ease memory pressure by deprioritizing root tasks? #6360 proposed algorithm that does not preserve root-task co-assignment
  • (Best effort) A draft PR is created that implements a version of the in Ease memory pressure by deprioritizing root tasks? #6360 proposed algorithm that does preserve root-task co-assignment
  • The algorithm should be able to run on large graphs
  • Implementation can be tested by OSS users (e.g. pangeo)
  • There is consensus about how to withhold tasks on the scheduler by Friday June 24th, 2022
  • PRs are only merged iff there are thorough performance benchmarks available confirming this works for non-root-task cases as well, e.g. traditional shuffle, map overlap, other-evil-graph-problem

Out of scope / Follow up

  • Thorough performance benchmarks exist (as part of coiled-runtime)
@fjetter fjetter added enhancement Improve existing functionality or make things work better performance stability Issue or feature related to cluster stability (e.g. deadlock) memory labels Jun 10, 2022
@gjoseph92 gjoseph92 self-assigned this Jun 10, 2022
@crusaderky

This comment was marked as outdated.

@gjoseph92
Copy link
Collaborator

We now have draft PRs for:

Obviously, ignoring co-assignment makes the implementation much simpler.

The key question is going to be, how important is co-assignment? I imagine we'll want it eventually. But perhaps fixing root task overproduction is such a big win that workloads that currently only succeed thanks to co-assignment will still succeed under a naive root-task-withholding implementation that doesn't co-assign. (Some example workloads to try in #2602 (comment).)

So I think the next step is to do some basic benchmarking between these two PRs (and main) and see how much of a marginal benefit we get from co-assignment + fixing overproduction vs just fixing overproduction. (Even if #6598 is not the implementation we'd want to merge, it still should work well enough to test this.)

That's what I plan to do today.

@gjoseph92
Copy link
Collaborator

If it turns out co-assignment isn't that important, then we can just focus on #6614, and possibly merging it soon under a feature flag (there are still a couple bugs to fix).

If it turns out co-assignment is still critical for some workloads, or offers a huge performance/memory gain, then that means that we probably have to accept some complexity if we are going to withhold tasks at all.

Aside: the question of "maybe we shouldn't withhold tasks then, maybe we should just do STA" has come up. IMO we're probably going to want root task withholding even with STA. Or at least, I don't think we have nearly enough understanding of what the STA heuristics would be to confidently say we won't need root task withholding with STA. One big question with STA will be how to steal/rebalance tasks when workers join/leave. Stealing is already problematic enough on just runnable tasks (#6600). The idea of having nearly every task pre-assigned to a worker and needing rebalance them all while maintaining consistency seems daunting. A great way to limit the complexity of rebalancing under STA would be to withhold root tasks, which brings the number of tasks workers actually know about and need to be rebalanced down from O(n_tasks) to O(n_threads), which is probably a 10-100x reduction.

So that's one reason why I don't think withholding root tasks (as an overall approach) is necessarily a stopgap we'd throw out during a major scheduling overhaul like STA.

Therefore, if benchmarking shows that co-assignment is critical, then we should think about where we'd be willing to accept the complexity (in a task-batch-identifying algorithm, or in a load-rebalancing algorithm).

If it isn't critical, then we should get task withholding merged without it, and think about co-assignment as another incremental change sometime in the future.

@gjoseph92
Copy link
Collaborator

gjoseph92 commented Jun 24, 2022

Benchmarking results

I ran #6614 and #6598 on a few representative workloads.

I don't think co-assignment has enough of a memory benefit for it to be an immediate requirement.

Adding co-assignment to root-task withholding seems to somewhat reduce runtimes, but confusingly, tends to somewhat increase memory usage. (That's such an odd result, it makes a a little skeptical of my implementation of #6598, but I've tested it and it seems to be correct?)

Across the board, withholding root tasks improves memory usage, often significantly (5x-100x), though occasionally it's the same. It sometimes decreases runtime, and sometimes increases it, by significant factors (up to 2x) both ways.

Bottom line: fixing root task overproduction seems to make workloads feasible that are not feasible today (without absurd memory requirements). Removing co-assignment does not seem to make anything infeasible that works today—just maybe slower.


anom-mean

anom-mean-combined

import xarray as xr
import dask.array as da
from dask.utils import format_bytes
import numpy as np
import sneks

# From https://github.com/dask/distributed/issues/2602#issuecomment-498718651

from run_trial import run

if __name__ == "__main__":
    data = da.random.random((10000, 1000000), chunks=(1, 1000000))
    arr = xr.DataArray(
        data, dims=["time", "x"], coords={"day": ("time", np.arange(10000) % 100)}
    )
    print(
        f"{format_bytes(data.nbytes)} - {data.npartitions} {format_bytes(data.blocks[0, 0].nbytes)} chunks"
    )
    # 74.51 GiB - 10000 7.63 MiB chunks

    clim = arr.groupby("day").mean(dim="time")
    anom = arr.groupby("day") - clim
    anom_mean = anom.mean(dim="time")

    client = sneks.get_client(
        name="anom-mean",
        n_workers=20,
        worker_vm_types=["r5a.large"],  # 2 CPU, 16GiB
        environ={"MALLOC_TRIM_THRESHOLD_": 0},
        wait_for_workers=True,
    )
    print(client.dashboard_link)

    run(anom_mean, __file__)

basic-sum

basic-sum-combined

import dask.array as da
from dask.utils import format_bytes
import sneks

# From https://github.com/dask/distributed/pull/4864

from run_trial import run

if __name__ == "__main__":
    data = da.zeros((12500000, 5000), chunks=(12500000, 1))
    print(
        f"{format_bytes(data.nbytes)} - {data.npartitions} {format_bytes(data.blocks[0, 0].nbytes)} chunks"
    )
    # 465.66 GiB - 5000 95.37 MiB chunks

    result = da.sum(data, axis=1)

    client = sneks.get_client(
        name="basic-sum",
        n_workers=4,
        worker_vm_types=["t3.xlarge"],  # 4 CPU, 16GiB
        environ={"MALLOC_TRIM_THRESHOLD_": 0},
        wait_for_workers=True,
    )
    print(client.dashboard_link)

    run(result, __file__)

climactic-mean

climactic-mean-combined

import xarray as xr
import dask.array as da
from dask.utils import format_bytes
import numpy as np
import pandas as pd
import sneks

# https://github.com/dask/distributed/issues/2602#issuecomment-535009454

from run_trial import run

if __name__ == "__main__":
    size = (28, 237, 96, 21, 90, 144)
    chunks = (1, 1, 96, 21, 90, 144)
    arr = da.random.random(size, chunks=chunks)
    print(
        f"{format_bytes(arr.nbytes)} - {arr.npartitions} {format_bytes(arr.blocks[(0,) * arr.ndim].nbytes)} chunks"
    )
    # 1.26 TiB - 6636 199.34 MiB chunks

    items = dict(
        ensemble=np.arange(size[0]),
        init_date=pd.date_range(start="1960", periods=size[1]),
        lat=np.arange(size[2]).astype(float),
        lead_time=np.arange(size[3]),
        level=np.arange(size[4]).astype(float),
        lon=np.arange(size[5]).astype(float),
    )
    dims, coords = zip(*list(items.items()))

    array = xr.DataArray(arr, coords=coords, dims=dims)
    arr_clim = array.groupby("init_date.month").mean(dim="init_date")

    client = sneks.get_client(
        name="climactic-mean",
        n_workers=20,
        worker_vm_types=["r6i.xlarge"],  # 4 CPU, 32 GiB
        environ={"MALLOC_TRIM_THRESHOLD_": 0},
        wait_for_workers=True,
    )
    print(client.dashboard_link)

    run(arr_clim, __file__)

dataframe-align

  • Align a 160GB DataFrame and a 80GB DataFrame (both with known divisions), then subtract them and take the mean.
  • Root task overproduction has a huge effect here.

dataframe-align-combined

import dask
from dask.utils import format_bytes
from dask.sizeof import sizeof
import sneks


from run_trial import run

if __name__ == "__main__":
    df = dask.datasets.timeseries(
        start="2020-01-01",
        end="2024-01-01",
        freq="600ms",
        partition_freq="12h",
        dtypes={i: float for i in range(100)},
    )

    p = df.partitions[0].compute(scheduler="threads")
    partition_size = sizeof(p)
    total_size = partition_size * df.npartitions
    print(
        f"~{len(p) * df.npartitions:,} rows x {len(df.columns)} columns, "
        f"{format_bytes(total_size)} total, "
        f"{df.npartitions:,} {format_bytes(partition_size)} partitions"
    )
    # ~210,384,000 rows x 100 columns, 158.32 GiB total, 2,922 55.48 MiB partitions

    df2 = dask.datasets.timeseries(
        start="2010-01-01",
        end="2012-01-01",
        freq="600ms",
        partition_freq="12h",
        dtypes={i: float for i in range(100)},
    )

    final = (df2 - df).mean()  # will be all NaN, just forcing alignment

    client = sneks.get_client(
        name="dataframe-align",
        n_workers=20,
        worker_vm_types=["t3.xlarge"],  # 4 CPU, 16 GiB
        environ={"MALLOC_TRIM_THRESHOLD_": 0},
        wait_for_workers=True,
    )
    print(client.dashboard_link)

    run(final, __file__)

jobqueue

  • Just submitting 10,000 independent CPU-bound tasks (simulating usage of dask for embarrassingly-parallel jobs)
  • Should be a case where root task overproduction helps / doesn't matter, since workers just need to crank through tasks as fast at they can
  • Removing root task overproduction only added a 1% slowdown
  • Notice the y-axis values; all that matters here is runtime
    jobqueue-combined
from dask import delayed
from dask.utils import parse_bytes
import time
import random
import sneks


from run_trial import run

if __name__ == "__main__":

    @delayed(pure=True)
    def task(i: int) -> int:
        stuff = "x" * parse_bytes("400MiB")
        time.sleep(random.uniform(0, 3))
        return i

    tasks = [task(i) for i in range(10_000)]
    result = delayed(sum)(*tasks)  # just so we have a single object

    client = sneks.get_client(
        name="jobqueue",
        n_workers=20,
        worker_vm_types=["c6a.xlarge"],  # 4 CPU, 8GB
        environ={"MALLOC_TRIM_THRESHOLD_": 0},
        wait_for_workers=True,
    )
    print(client.dashboard_link)

    run(result, __file__)

shuffle

  • Standard task-based shuffle of an 80GB DataFrame on a 320GB cluster.
  • Shuffle is one of the primary cases expected to benefit from root task overproduction, at least runtime-wise (we have to load the entire input dataset anyway, so overproduction avoids any delay in loading it).
  • Overproduction does speed up the shuffle, but not by that much.
  • Interesting how the memory pattern is inverted (as you'd expect): overproduction loads all the data up front, then shuffles it; withholding loads data incrementally, so peak memory use doesn't happen until the very end.
  • Sidenote: even with this large of a cluster, I had a hard time getting shuffles to complete without workers dying or locking up; Worker can memory overflow by fetching too much data at once #6208 (and Worker can memory overflow by fetching too much data at once #6208) seem like significant issues with shuffling right now.

shuffle-combined

import dask
from dask.utils import format_bytes
from dask.sizeof import sizeof
import sneks


from run_trial import run

if __name__ == "__main__":
    df = dask.datasets.timeseries(
        start="2020-01-01",
        end="2024-01-01",
        freq="1200ms",
        partition_freq="24h",
        dtypes={i: float for i in range(100)},
    )

    p = df.partitions[0].compute(scheduler="threads")
    partition_size = sizeof(p)
    total_size = partition_size * df.npartitions
    print(
        f"~{len(p) * df.npartitions:,} rows x {len(df.columns)} columns, "
        f"{format_bytes(total_size)} total, "
        f"{df.npartitions:,} {format_bytes(partition_size)} partitions"
    )
    # ~105,192,000 rows x 100 columns, 79.16 GiB total, 1,461 55.48 MiB partitions

    shuf = df.shuffle(0, shuffle="tasks")
    result = shuf.size

    client = sneks.get_client(
        name="shuffle",
        n_workers=20,
        worker_vm_types=["t3.xlarge"],  # 4 CPU, 16 GiB
        environ={"MALLOC_TRIM_THRESHOLD_": 0},
        wait_for_workers=True,
    )
    print(client.dashboard_link)

    run(result, __file__, timeout=20 * 60)

vorticity

vorticity-combined

import dask.array as da
from dask.utils import format_bytes
import sneks

# From https://github.com/dask/distributed/issues/6571

from run_trial import run

if __name__ == "__main__":
    u = da.random.random((5000, 5000, 6000), chunks=(5000, 5000, 1))
    v = da.random.random((5000, 5000, 6000), chunks=(5000, 5000, 1))

    print(
        f"Each input: {format_bytes(u.nbytes)} - {u.npartitions} {format_bytes(u.blocks[0, 0, 0].nbytes)} chunks"
    )
    # Each input: 1.09 TiB - 6000 190.73 MiB chunks

    dx = da.random.random((5001, 5000), chunks=(5001, 5000))
    dy = da.random.random((5001, 5000), chunks=(5001, 5000))

    def pad_rechunk(arr):
        """
        Pad a single element onto the end of arr, then merge the 1-element long chunk created back in.

        This operation complicates each chain of the graph enough so that the scheduler no longer recognises the overall computation as blockwise,
        but doesn't actually change the overall topology of the graph, or the number of chunks along any dimension of the array.

        This is motivated by the padding operation we do in xGCM, see

        https://xgcm.readthedocs.io/en/latest/grid_ufuncs.html#automatically-applying-boundary-conditions
        https://github.com/xgcm/xgcm/blob/fe860f96bbaa7293142254f48663d71fb97a4f36/xgcm/grid_ufunc.py#L871
        """

        padded = da.pad(arr, pad_width=[(0, 1), (0, 0), (0, 0)], mode="wrap")
        old_chunks = padded.chunks
        new_chunks = list(old_chunks)
        new_chunks[0] = 5001
        rechunked = da.rechunk(padded, chunks=new_chunks)
        return rechunked

    up = pad_rechunk(u)
    vp = pad_rechunk(v)
    result = dx[..., None] * up - dy[..., None] * vp

    client = sneks.get_client(
        name="vorticity",
        n_workers=20,
        worker_vm_types=["r5.2xlarge"],  # 8 CPU, 64 GiB
        environ={"MALLOC_TRIM_THRESHOLD_": 0},
    )
    print(client.dashboard_link)
    print("Waiting for 20 workers")
    client.wait_for_workers(20)

    run(result, __file__)

@JSKenyon
Copy link

Thanks @gjoseph92 - those are some really interesting results! I definitely think that the improvements in memory footprint are compelling. That said, I would be really interested to see these benchmarks for different (mainly larger) numbers of workers. I haven't looked at any of the implementation in detail, but it would be good to show that these improvements are approximately independent of n_workers. I agree that co-assignment doesn't have a large impact on memory footprint, but it is interesting how much of a difference it can make to run times. Again, this would be really interesting to look at as a function of n_workers. Thanks again to everyone who is working on this problem!

@fjetter fjetter added the discussion Discussing a topic with no specific actions yet label Jun 24, 2022
@fjetter
Copy link
Member Author

fjetter commented Jun 24, 2022

Great benchmark results @gjoseph92 ! I agree with @JSKenyon that we will want a couple more benchmarks to confirm these results and we'll likely need to iterate on the implementation and review it more thorouhgly.

I opened a follow up ticket to pursue the implementation without co-assignment (see ticket for some explanation why)

Instead of doing more manual benchmarking I would rather like us to put this into an automated benchmarks, see (using Coiled and coiled-runtime)

@TomNicholas
Copy link

This looks great so far @gjoseph92 !

For my vorticity example from #6571, is the "something else going on here" related to "widely-shared dependencies" #5325? (I'm not sure if you ran the full problem or a simplified version for that benchmarking case.)

@mrocklin
Copy link
Member

First, exciting results. I'm excited. Also, kudos on the clear presentation of those results.

If it is easy to share performance reports for the shuffling case I would be curious to see them.

@mrocklin
Copy link
Member

mrocklin commented Jul 7, 2022

@gjoseph92 I'm curious, any luck digging into these and seeing what was going on?

@gjoseph92
Copy link
Collaborator

There was a small typo in my code for the shuffle-enrich test case, which made a large difference: the shuffle didn't run, because I was referencing the original DataFrame, not the shuffled one.

So the results showing an enormous decrease in shuffle memory were inaccurate, in that they didn't apply to shuffling. They were accurate in that the workload I accidentally tested (which is also a common workload) did see ~9x lower memory usage without production and shorter runtime, but it was definitely mis-named.

I've edited #6560 (comment) to correct this.

  • I added a pure shuffle workload. Unsurprisingly, this doesn't show any benefit, but also not much detriment.
  • dataframe-align is the workload I had originally mis-named shuffle-enrich. I've renamed it and re-run it to confirm the results still hold (they do).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
discussion Discussing a topic with no specific actions yet enhancement Improve existing functionality or make things work better memory performance stability Issue or feature related to cluster stability (e.g. deadlock)
Projects
None yet
6 participants