[Question] A new approach to memory spilling #4568

Open
madsbk opened this issue Mar 8, 2021 · 20 comments

@madsbk
Contributor

madsbk commented Mar 8, 2021

Question: would the Dask/Distributed community be interested in an improved memory spilling model that fixes the shortcomings of the current one but makes use of proxy object wrappers?

In Dask-CUDA we have introduced a new approach to memory spilling that handles object aliasing and JIT memory un-spilling: rapidsai/dask-cuda#451

The result is a memory-spilling model that handles object aliasing and un-spills objects just in time when they are accessed.

The current implementation in Dask-CUDA handles CUDA device objects, but it is possible to generalize it to also handle spilling to disk.

The disadvantage of this approach is the use of proxy objects that get exposed to the users. The inputs to a task might be wrapped in a proxy object, which doesn't mimic the proxied object perfectly, e.g.:

    # Type checking using isinstance() works as expected but direct type checking doesn't:
    >>> import numpy as np
    >>> from dask_cuda.proxy_object import asproxy
    >>> x = np.arange(3)
    >>> isinstance(asproxy(x), type(x))
    True
    >>> type(asproxy(x)) is type(x)
    False

Because of this, the approach shouldn't be enabled by default, but do you think the Dask community would be interested in a generalization of this approach? Or is the proxy-object hurdle too much of an issue?

cc. @mrocklin, @jrbourbeau, @quasiben

@quasiben
Member

I think having this machinery optionally available would be a nice improvement. @eric-czech, do many (or any) sgkit workloads rely on Dask spilling?

In the past, @rabernat described an example where spilling might be impactful -- though the bulk of the conversation is about scheduler decisions/improvements

@jakirkham
Member

Yes, I think this would be great to have in Dask. I recall at one point Matt may have mentioned interest in this being upstreamed (though I could be misremembering).

@eric-czech

@eric-czech, do many (or any) sgkit workloads rely on Dask spilling?

Hm, no in the sense that we prefer to do performance testing with spilling disabled, but yes in the sense that we still want it to work for our users who may not know how to tune chunk sizes to avoid memory problems. We're still figuring out the latter as well.

Does this spilling typically occur when usage would exceed GPU memory or when it would exceed RAM? I can imagine that spilling may become more critical for us if it is necessary to keep workflows constrained by GPU memory << RAM from failing.

cc: @tomwhite @ravwojdyla

@quasiben
Member

@alimanfoo recently wrote up some thoughts on changing memory storage/spilling. @madsbk, do you think a pluggable backend would be possible with the proxy object you are proposing here?

@madsbk
Contributor Author

madsbk commented Mar 22, 2021

@alimanfoo recently wrote up some thoughts on changing memory storage/spilling. @madsbk, do you think a pluggable backend would be possible with the proxy object you are proposing here?

Yes, this should be 100% compatible with a pluggable backend approach. Users would be able to define what should happen when spilling between layers.

BTW, the spilling issues @alimanfoo is reporting remind me of the spilling issues we were having in GPU-BDB. That was the motivation for implementing the new spilling approach :)

@mrocklin
Member

I think that we should talk about the problems that we're trying to solve, and then assess different solutions to those problems. This seems like one possible solution, possibly out of a few.

This approach seems interesting, but also somewhat magical/complex (although possibly that is only because I don't know much about it yet). I think that we should maybe have a longer discussion first before adopting this particular path.

@madsbk I suspect the answer is no, but do you happen to have anything that you can share where different options here were discussed that shows pros/cons, or should we start that conversation fresh?

@mrocklin
Member

mrocklin commented Mar 22, 2021

My thoughts, by the way, are that we should break the MutableMapping abstraction and start explicitly including things like disk storage in the worker. I'm also fine including device memory explicitly in the worker. Part of this is described in #4424. I expect it to be more effective and simpler in the long term than proxy objects.
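For illustration, a rough sketch of what a worker data structure with explicit tiers could look like. The class name `TieredWorkerData`, its methods, and the pickle-to-disk spilling are all hypothetical, not Distributed's actual design:

import os
import pickle
from typing import Any, Dict

class TieredWorkerData:
    """Hypothetical worker storage with explicit fast (memory) and slow
    (disk) tiers, instead of hiding disk behind a MutableMapping."""

    def __init__(self, memory_limit: int, spill_dir: str):
        self.fast: Dict[str, Any] = {}   # key -> object resident in memory
        self.sizes: Dict[str, int] = {}  # key -> nbytes of in-memory objects
        self.slow: Dict[str, str] = {}   # key -> path of spilled pickle
        self.memory_limit = memory_limit
        self.spill_dir = spill_dir

    def set(self, key: str, value: Any, nbytes: int) -> None:
        self.fast[key] = value
        self.sizes[key] = nbytes
        # Spill old entries until we are back under the limit.
        while sum(self.sizes.values()) > self.memory_limit and len(self.fast) > 1:
            self._spill_one()

    def _spill_one(self) -> None:
        key = next(iter(self.fast))  # FIFO victim; real code would use LRU
        path = os.path.join(self.spill_dir, f"{key}.pkl")
        with open(path, "wb") as f:
            pickle.dump(self.fast.pop(key), f)
        del self.sizes[key]
        self.slow[key] = path

    def get(self, key: str) -> Any:
        if key in self.fast:
            return self.fast[key]
        with open(self.slow[key], "rb") as f:  # un-spill from disk
            return pickle.load(f)

Once the worker knows explicitly which tier a key lives in, things like async reading/writing and pre-fetching from slow memory become natural extensions.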

@madsbk
Contributor Author

madsbk commented Mar 23, 2021

Right, breaking the MutableMapping abstraction will make it fairly easy to fix the double-counting issue (#4186) and avoid memory spikes caused by an incorrect memory tally. However, in order to avoid spilling the same object multiple times, I think you need some kind of proxy object, though maybe it is possible to avoid exposing the proxy objects to the task/user.

In order to support spilling/un-spilling of objects already passed to a task, I don't see a way around proxy objects.

@madsbk
Contributor Author

madsbk commented Mar 23, 2021

@madsbk I suspect the answer is no, but do you happen to have anything that you can share where different options here were discussed that shows pros/cons, or should we start that conversation fresh?

I think the only real disadvantage is the proxy objects. I cannot think of a simpler solution that addresses all of the mentioned issues. But if the proxy objects are too much of an issue, I can look at fixing the double-counting issue by breaking the MutableMapping abstraction.

@mrocklin
Member

In order to support spilling/un-spilling of objects already passed to a task, I don't see a way around proxy objects.

Yeah, I think that when a task starts running we wouldn't try to spill it to disk. I'm not sure I understand this point.

More generally, I'm not sure exactly what is being proposed here. Can you say more about what a proxy object is, how it is used, and how it affects users? I get the sense that this is a topic that is already well understood to the RAPIDS team, but may not be well understood more generally.

Does this mean that we would change the object that is passed to user functions? If so, this seems like it might open up a lot of unexpected issues.

@mrocklin
Member

Right, breaking the MutableMapping abstraction will make it fairly easy to fix the double-counting issue (#4186) and avoid memory spikes caused by an incorrect memory tally.

This may also provide other benefits, like allowing for async reading/writing, improved worker scheduling that is sensitive to data already in fast memory, or pre-fetching data from slow memory.

@madsbk
Contributor Author

madsbk commented Mar 23, 2021

Does this mean that we would change the object that is passed to user functions? If so, this seems like it might open up a lot of unexpected issues.

Yes, relevant objects are wrapped in a proxy object before being passed to user functions.

class ProxyObject:
    """Object wrapper/proxy for serializable objects

    This is used by ProxifyHostFile to delay deserialization of returned objects.

    Objects proxied by an instance of this class will be JIT-deserialized when
    accessed. The instance behaves as the proxied object and can be accessed/used
    just like the proxied object.

    ProxyObject has some limitations and doesn't mimic the proxied object perfectly.
    Thus, if encountering problems remember that it is always possible to use unproxy()
    to access the proxied object directly or disable JIT deserialization completely
    with `jit_unspill=False`.

    Type checking using isinstance() works as expected but direct type checking
    doesn't:
    >>> import numpy as np
    >>> from dask_cuda.proxy_object import asproxy
    >>> x = np.arange(3)
    >>> isinstance(asproxy(x), type(x))
    True
    >>> type(asproxy(x)) is type(x)
    False
    """

@madsbk
Contributor Author

madsbk commented Mar 23, 2021

Issues with the current implementation of memory spilling

Object duplication

The same memory object can live inside a task and in the worker's self.data simultaneously. If the worker decides to spill an object x in its self.data, it will not free any memory if the task holds a reference to x. Even worse, if the output of the task includes x, the worker's self.data will now contain two copies of x, a spilled and an un-spilled copy.
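
A contrived sketch of the problem, using NumPy and a plain dict standing in for the worker's self.data (names are illustrative):

import numpy as np

data = {}                    # stands in for the worker's self.data
data["x"] = np.ones(10**6)   # ~8 MB

def task(x):
    # While this runs, the task holds a reference to x, so spilling
    # data["x"] to disk frees nothing -- the array stays alive.
    return x                 # the output aliases the input ...

data["y"] = task(data["x"])
# ... so the same ~8 MB array now sits under two keys.  Spilling "x"
# writes it to disk without freeing memory, and the memory tally
# counts the array twice.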

Spilling order

The order in which we spill objects can have a drastic effect on performance. The current implementation tracks the access time of each key in self.data, not the access time of the individual objects stored under those keys. This means that a getitem(v, 42) task on a collection v will effectively update the access time of all items in v, not only the 42nd item.
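
A sketch of that key-level bookkeeping (illustrative, not Distributed's actual code):

import time

last_access = {}  # key -> timestamp used when picking spill victims

def record_task_input(key):
    # Key-level tracking: touching the key refreshes *everything* under it.
    last_access[key] = time.monotonic()

# data["v"] holds a list of 100 items.  A getitem(v, 42) task only needs
# item 42, but record_task_input("v") marks the whole list as recently
# used, so none of the other 99 items look like good spill candidates.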

Aliases at the task level

Consider the following two tasks:

def task1():  # Create list of dataframes
    df1 = cudf.DataFrame({"a": range(10)})
    df2 = cudf.DataFrame({"a": range(10)})
    return [df1, df2]

def task2(dfs):  # Get the second item
    return dfs[1]    

Running the two tasks on a worker, we get something like:

>>> data["k1"] = task1()
>>> data["k2"] = task2(data["k1"])
>>> data
{
    "k1": [df1, df2],
    "k2": df2,
}

Since the current implementation of spilling works on keys and handles each key separately, it overestimates the device memory used: sizeof(df)*3. Even worse, if it decides to spill k2, no device memory is freed since k1 still holds a reference to df2!

The new spilling approach fixes this issue by wrapping identical objects in a shared ProxyObject; in this case, df2 in both k1 and k2 will refer to the same ProxyObject.
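
A minimal sketch of the deduplication idea (the registry below is hypothetical; the real ProxifyHostFile bookkeeping is more involved and would use weak references rather than raw ids):

from dask_cuda.proxy_object import asproxy

_proxy_registry = {}  # id(obj) -> proxy; a real implementation needs weakrefs

def proxify_shared(obj):
    """Wrap obj in a proxy, reusing the existing proxy for the same object."""
    proxy = _proxy_registry.get(id(obj))
    if proxy is None:
        proxy = asproxy(obj)
        _proxy_registry[id(obj)] = proxy
    return proxy

# df2 inside data["k1"] and data["k2"] now maps to one shared proxy:
# spilling it once actually frees the device memory, and it is only
# counted once in the memory tally.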

Sharing device buffers

Consider the following code snippet:

>>> data["df"] = cudf.DataFrame({"a": range(10)})
>>> data["grouped"] = shuffle_group(data["df"], "a", 0, 2, 2, False, 2)
>>> data["v1"] = data["grouped"][0]
>>> data["v2"] = data["grouped"][1]

In this case v1 and v2 are separate objects and are handled separately in both the current and the new spilling implementation. However, shuffle_group() in cudf actually returns a single device memory buffer, so v1 and v2 point to the same underlying memory buffer. Thus the current implementation will again overestimate the memory use and spill one of the dataframes without any effect.
The new implementation takes this into account when estimating memory usage and makes sure that either both dataframes are spilled or neither is.
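
A NumPy analogue of that accounting (illustrative; the Dask-CUDA code does the equivalent for the CUDA device buffers backing cuDF objects):

import numpy as np

def unique_nbytes(arrays):
    """Sum memory without double counting arrays that share a buffer."""
    seen, total = set(), 0
    for a in arrays:
        owner = a.base if a.base is not None else a  # the owning buffer
        if id(owner) not in seen:
            seen.add(id(owner))
            total += owner.nbytes
    return total

buf = np.zeros(1_000_000)                      # one underlying buffer
v1, v2 = buf[:500_000], buf[500_000:]          # two views of it
assert unique_nbytes([v1, v2]) == buf.nbytes   # counted once, not twice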

@mrocklin
Member

Thank you for the detailed explanation @madsbk. This is quite helpful to motivate the change. I'm curious, in practice does this mostly come up in dataframe shuffle situations when we're low on memory? Are there other common use cases?

This would be a very large change in the abstractions used for Dask workers (user data would no longer be passed through unchanged), and the introduction of a substantial amount of magic. Personally, if there are not very many problems that this solves, then I would prefer to try to think of other possible solutions to this problem before going down this path. My concern is that a change like this would negatively impact a number of other workloads in hard-to-understand ways.

@jakirkham
Member

Maybe this is worth discussing at the monthly meeting next week? 😉

@beckernick
Member

This spilling workstream has made a huge impact on quite a few workloads, largely during shuffle as @mrocklin noted. GPU-BDB (as @madsbk mentioned) is a clear example, as it comprises 30 independent workloads designed to represent common ETL/Analytics workloads. With the default spilling, in several of the workloads we end up repeatedly spilling the same data, significantly reducing performance and largely eliminating the value-add of spilling itself. Workloads that take 350-400 seconds on a constrained system can finish in 80-100 seconds with this optional spilling configuration enabled.

One additional note is that shuffle in general is memory intensive, so we often end up in a "low-memory situation" when we may not have expected one. What perhaps initially feels like a reasonable setup of a 1:1 or 1:2 ratio of data size to total memory can quickly become a low-memory scenario when large datasets need to be merged.

Default spilling allows some of these workloads like GPU-BDB to succeed on lower-memory infrastructure but at the cost of a steep performance penalty. In the GPU-BDB example, the penalty of being able to run on lower-memory systems relative to the workload data requirements is simply not worth it. With the optional spilling configuration enabled the calculation changes dramatically.

@mrocklin
Member

I wonder if there are other ways that we can improve the memory performance of shuffle that don't impact other workloads as much. @madsbk, you've mentioned before wanting to affect task prioritization. Now that annotations are in, maybe that is a way that you can help to improve things while using less magic? That might be a better use of time.

Just so we're clear, the reason why I'm pushing back on this is that I have a high degree of confidence that if we do this I'll get many complaints from different groups that their types have changed, and that some strange thing that they were doing that was type dependent no longer works. For example, my understanding is that we would be breaking any user code that looked like this:

def f(x):
    typ = type(x)
    if typ is int:
        ...
    else:
        ...

Is my understanding correct? If so then I'm a solid -1 on this change. I would be surprised if others here wouldn't also be -1 on this. This seems like a pretty clear violation of the "Dask just runs your Python code" contract. The comment of "well users can use isinstance instead" assumes that all users will read whatever doc we write about this, which I think we know isn't true.
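
(For reference, the isinstance-based variant that keeps working when x is proxied, per the docstring quoted above, would look like this:)

def f(x):
    if isinstance(x, int):  # still True when x is a proxy wrapping an int
        ...
    else:
        ...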

@madsbk
Contributor Author

madsbk commented Mar 24, 2021

Agreed, I don't think anyone suggests that this should be enabled by default. The idea is to have it as an option alongside regular spilling and no spilling. Users that encounter spilling issues, like degraded performance and unexpected out-of-memory errors, can then try this new spilling option. I suspect that users who have significant spilling problems are motivated enough to address the typ is int issue.

As @beckernick mentioned, the current Dask-CUDA solution works very well and doesn't require any change to Distributed, but I figured that if we are going to generalize the solution to support spilling to disk, it might make sense to do it in Distributed instead of Dask-CUDA.

@mrocklin
Member

I'm fully committed to improving memory performance. I think that there are some general purpose approaches that we can leave on by default and that would help everyone more generally. I'm personally much more excited about these approaches. Some of those include:

  1. Defining prioritization for shuffle
  2. Having the worker think explicitly about host/device/disk
  3. ???

The double-counting issue I don't have a solution to. This seems hard to do in general. My guess is that there are situations that even the ProxyObject solution here doesn't solve, like views of numpy arrays or objects in more nested data structures. This particular solution seems useful (as you all have clearly demonstrated), but also narrow enough that it might be better maintained outside of the main codebase.

@mrocklin
Member

If this is very effective for shuffle workloads then maybe it's something that we could implement just for that code path? That might be a tightly scoped place to try this out more broadly.

rapids-bot bot pushed a commit to rapidsai/dask-cuda that referenced this issue Nov 22, 2022
Support for the [new built-in spilling in cuDF](rapidsai/cudf#12106) so that `device_memory_limit` and `memory_limit` ignore cuDF's device buffers.

This is only implemented for `DeviceHostFile`. Since JIT-unspill also targets cuDF, and libraries such as CuPy aren't supported, I don't think it is important to support cuDF's built-in spilling in `ProxifyHostFile`.

For now, `DeviceHostFile` simply ignores cuDF's device buffers and lets cuDF handle the spilling. This means that `DeviceHostFile` might estimate the device and host memory usage incorrectly ([or more incorrectly than usual](dask/distributed#4568 (comment))).

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: #984