[Question] A new approach to memory spilling #4568
I think having this machinery optionally available would be a nice improvement. @eric-czech do many or any sgkit workloads rely on Dask spilling? In the past, @rabernat described an example where spilling might be impactful -- though the bulk of that conversation is about scheduler decisions/improvements. |
Yes I think this would be great to have in Dask. I recall at one point Matt may have mentioned interest in this being upstreamed (though I could be misremembering) |
Hm no in the sense that we prefer to do performance testing w/ spilling disabled but yes in the sense that we still want it to work for our users who may not know how to tune chunk sizes to avoid memory problems. We're still figuring out the latter as well. Does this spilling typically occur when usage would exceed GPU memory or when it would exceed RAM? I can imagine that spilling may become more critical for us if it is necessary to keep workflows constrained by GPU memory << RAM from failing. cc: @tomwhite @ravwojdyla |
@alimanfoo recently wrote up some thoughts on changing memory storage/spilling. @madsbk do you think a pluggable backend would be possible with the proxy object you are proposing here? |
Yes, this should be 100% compatible with a pluggable backend approach. Users will be able to define what should happen when spilling between layers. BTW, the spilling issues @alimanfoo is reporting remind me of the spilling issues we were having in GPU-BDB. That was the motivation for implementing the new spilling approach :) |
I think that we should talk about the problems that we're trying to solve, and then assess different solutions to those problems. This seems like one possible solution, possibly out of a few. This approach seems interesting, but also somewhat magical/complex (although possibly that is only because I don't know much about it yet). I think that we should maybe have a longer discussion first before adopting this particular path. @madsbk I suspect the answer is no, but do you happen to have anything that you can share where different options here were discussed that shows pros/cons, or should we start that conversation fresh? |
My thoughts by the way are that we should break the MutableMapping abstraction and start explicitly including things like Disk storage in the worker. I'm also fine including device memory explicitly in the worker. Part of this is described here: #4424 I expect it to be more effective and simpler long term than proxy objects. |
Right, breaking the `MutableMapping` abstraction would make it possible to fix issues such as the double counting of memory. In order to support spilling/un-spilling of objects already passed to a task, I don't see a way around proxy objects. |
I think the only real disadvantage is the proxy objects. I cannot think of a simpler solution that addresses all of the mentioned issues. But if the proxy objects are too much of an issue, I can look at fixing the double counting issue by breaking the `MutableMapping` abstraction. |
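For concreteness, here is a rough sketch of what "breaking the `MutableMapping` abstraction" could look like: a worker store with explicit fast/slow tiers that the worker can reason about directly. Names like `TieredData` and the simple eviction policy are hypothetical illustrations, not the proposal in #4424.

```python
import sys
from collections.abc import MutableMapping
from typing import Any


class TieredData:
    """Hypothetical worker store with explicit fast/slow tiers.

    Unlike an opaque MutableMapping, the worker can ask where a key lives
    and move data between tiers explicitly.
    """

    def __init__(self, memory_target: int, slow: MutableMapping):
        self.fast: dict = {}        # in-memory objects, insertion ordered
        self.slow = slow            # e.g. a mapping backed by disk
        self.memory_target = memory_target

    def __setitem__(self, key: str, value: Any) -> None:
        self.fast[key] = value
        self._maybe_evict()

    def __getitem__(self, key: str) -> Any:
        if key not in self.fast:
            # Un-spill explicitly and make the key "fast" again.
            self.fast[key] = self.slow.pop(key)
            self._maybe_evict()
        return self.fast[key]

    def _fast_usage(self) -> int:
        return sum(sys.getsizeof(v) for v in self.fast.values())

    def _maybe_evict(self) -> None:
        # Spill the oldest keys until we are under the memory target,
        # always keeping at least the most recently touched key in memory.
        while len(self.fast) > 1 and self._fast_usage() > self.memory_target:
            key = next(iter(self.fast))
            self.slow[key] = self.fast.pop(key)


store = TieredData(memory_target=1_000_000, slow={})  # dict stands in for disk
store["x"] = bytearray(800_000)
store["y"] = bytearray(800_000)                  # "x" gets spilled to `slow`
print("x" in store.fast, "x" in store.slow)      # False True
```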
Yeah, I think that when a task starts running we wouldn't try to spill it to disk. I'm not sure I understand this point. More generally, I'm not sure exactly what is being proposed here. Can you say more about what a proxy object is, how it is used, and how it affects users? I get the sense that this is a topic that is already well understood to the RAPIDS team, but may not be well understood more generally. Does this mean that we would change the object that is passed to user functions? If so, this seems like it might open up a lot of unexpected issues. |
This may also provide other benefits, like allowing for async reading/writing, worker scheduling of tasks that is sensitive to which data is already in fast memory, and pre-fetching of data from slow memory. |
Yes, relevant objects are wrapped in a proxy object before being passed to user functions.

```python
class ProxyObject:
    """Object wrapper/proxy for serializable objects

    This is used by ProxifyHostFile to delay deserialization of returned objects.

    Objects proxied by an instance of this class will be JIT-deserialized when
    accessed. The instance behaves as the proxied object and can be accessed/used
    just like the proxied object.

    ProxyObject has some limitations and doesn't mimic the proxied object perfectly.
    Thus, if encountering problems remember that it is always possible to use
    unproxy() to access the proxied object directly or disable JIT deserialization
    completely with `jit_unspill=False`.

    Type checking using isinstance() works as expected but direct type checking
    doesn't:

    >>> import numpy as np
    >>> from dask_cuda.proxy_object import asproxy
    >>> x = np.arange(3)
    >>> isinstance(asproxy(x), type(x))
    True
    >>> type(asproxy(x)) is type(x)
    False
    """
```
**Issues with the current implementation of memory spilling**

**Object duplication**

The same memory object can live both inside a running task and in the worker's data store, which means its device memory is counted twice and spilling the stored copy doesn't necessarily free any memory.

**Spilling order**

The order in which we spill objects can have a drastic effect on performance. The current implementation tracks the access time of each key in the worker's data store; it cannot see when the underlying objects are actually used.

**Aliases at the task level**

Consider the following two tasks:

```python
def task1():  # Create list of dataframes
    df1 = cudf.DataFrame({"a": range(10)})
    df2 = cudf.DataFrame({"a": range(10)})
    return [df1, df2]

def task2(dfs):  # Get the second item
    return dfs[1]
```

Running the two tasks on a worker, we get something like:

```python
>>> data["k1"] = task1()
>>> data["k2"] = task2(data["k1"])
>>> data
{
    "k1": [df1, df2],
    "k2": df2,
}
```

Since the current implementation of spilling works on keys and handles each key separately, it overestimates the device memory used. The new spilling approach fixes this issue by wrapping identical objects in a shared proxy object.

**Sharing device buffers**

Consider the following code snippet:

```python
>>> data["df"] = cudf.DataFrame({"a": range(10)})
>>> data["grouped"] = shuffle_group(data["df"], "a", 0, 2, 2, False, 2)
>>> data["v1"] = data["grouped"][0]
>>> data["v2"] = data["grouped"][1]
```

In this case `data["grouped"]`, `data["v1"]`, and `data["v2"]` reference dataframes that share the same underlying device buffers, so handling the keys separately both double counts the device memory and means that spilling one key doesn't actually free it. |
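To make the overestimation concrete, below is a hypothetical sketch of per-key accounting versus accounting that deduplicates shared objects before summing sizes. The helper names are mine, and the deduplication here is by Python object identity; as I understand it, the Dask-CUDA approach works on the underlying device buffers instead, so treat this only as an analogy.

```python
from dask.sizeof import sizeof


def per_key_usage(data):
    # What key-based accounting does: each value is measured independently,
    # so an object referenced from several keys is counted once per key.
    return sum(sizeof(v) for v in data.values())


def deduplicated_usage(data):
    # Count each unique object (by id) only once, however many keys or
    # containers reference it.
    seen = {}

    def walk(obj):
        if id(obj) in seen:
            return
        if isinstance(obj, (list, tuple)):
            for item in obj:
                walk(item)
        else:
            seen[id(obj)] = obj

    for value in data.values():
        walk(value)
    return sum(sizeof(o) for o in seen.values())


df2 = bytearray(10_000)                      # stand-in for a shared dataframe
data = {"k1": [bytearray(10_000), df2], "k2": df2}
print(per_key_usage(data))                   # ~30 kB: df2 counted under both keys
print(deduplicated_usage(data))              # ~20 kB: df2 counted once
```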
Thank you for the detailed explanation @madsbk. This is quite helpful to motivate the change. I'm curious, in practice does this mostly come up in dataframe shuffle situations when we're low on memory? Are there other common use cases? This would be a very large change in the abstractions used for Dask workers (user data would no longer be passed through unchanged), and the introduction of a substantial amount of magic. Personally, if there are not very many problems that this solves, then I would prefer to try to think of other possible solutions to this problem before going down this path. My concern is that a change like this would negatively impact a number of other workloads in hard-to-understand ways. |
Maybe this is worth discussing at the monthly meeting next week? 😉 |
This spilling workstream has made a huge impact on quite a few workloads, largely during shuffle as @mrocklin noted. GPU-BDB (as @madsbk mentioned) is a clear example, as it comprises 30 independent workloads designed to represent common ETL/Analytics workloads. With the default spilling, in several of the workloads we end up repeatedly spilling the same data, significantly reducing performance and largely eliminating the value-add of spilling itself. Workloads that take 350-400 seconds on a constrained system can finish in 80-100 seconds with this optional spilling configuration enabled. One additional note is that shuffle in general is memory intensive, so we often end up in a "low-memory situation" when we may not have expected one. What perhaps initially feels like a reasonable setup of a 1:1 or 1:2 ratio of data size to total memory can quickly become a low-memory scenario when large datasets need to be merged. Default spilling allows some of these workloads like GPU-BDB to succeed on lower-memory infrastructure but at the cost of a steep performance penalty. In the GPU-BDB example, the penalty of being able to run on lower-memory systems relative to the workload data requirements is simply not worth it. With the optional spilling configuration enabled the calculation changes dramatically. |
I wonder if there are other ways that we can improve the memory performance of shuffle that don't impact other workloads as much. @madsbk you've mentioned before wanting to affect task prioritization. Now that annotations are in, maybe that is a way that you can help to improve things while using less magic? That might be a better use of time. Just so we're clear, the reason why I'm pushing back on this is that I have a high degree of confidence that if we do this I'll get many complaints from different groups that their types have changed, and that some strange thing that they were doing that was type dependent no longer works. For example, my understanding is that we would be breaking any user code that looked like this:

```python
def f(x):
    typ = type(x)
    if typ is int:
        ...
    else:
        ...
```

Is my understanding correct? If so then I'm a solid -1 on this change. I would be surprised if others here wouldn't also be -1 on this. This seems like a pretty clear violation of the "Dask just runs your Python code" contract. The comment of "well users can use isinstance instead" assumes that all users will read whatever doc we write about this, which I think we know isn't true. |
Agree, I don't think anyone suggests that this should be enabled by default. The idea is to have it as an option alongside regular spilling and no spilling. Users that encounter spilling issues, like degraded performance and unexpected out-of-memory errors, can then try this new spilling option. I suspect that users that have significant spilling problems are motivated enough to work around the proxy object limitations. As @beckernick mentioned, the current Dask-CUDA solution works very well and doesn't require any change to Distributed, but I figured that if we are going to generalize the solution to support spilling to disk it might make sense to do it in Distributed instead of Dask-CUDA. |
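For context on how the opt-in is exposed in Dask-CUDA today, it looks roughly like the following. I believe the keyword is `jit_unspill`, but treat the exact spelling and the limit values here as assumptions to check against the Dask-CUDA docs.

```python
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# Default behaviour: per-key device->host->disk spilling (DeviceHostFile).
# Opt-in: proxy-based spilling with JIT un-spill (ProxifyHostFile).
cluster = LocalCUDACluster(
    device_memory_limit="10GB",   # spill device objects above this per worker
    memory_limit="32GB",          # spill host objects above this per worker
    jit_unspill=True,             # assumed keyword enabling the proxy-based path
)
client = Client(cluster)
```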
I'm fully committed to improving memory performance. I think that there are some general purpose approaches that we can leave on by default and that would help everyone more generally. I'm personally much more excited about these approaches. Some of those include:
The double-counting issue I don't have a solution to. This seems hard to do in general. My guess is that there are situations that even the ProxyObject solution here doesn't solve, like views of numpy arrays or objects in more nested data structures. This particular solution seems useful (as you all have clearly demonstrated), but also narrow enough that it might be better maintained outside of the main codebase. |
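A tiny example of the NumPy-view case mentioned above, where naive per-object accounting over-counts because the view shares its base array's buffer:

```python
import numpy as np

x = np.zeros(1_000_000)
v = x[::2]                     # a view: no new data is allocated
print(v.base is x)             # True -- v shares x's buffer
print(x.nbytes + v.nbytes)     # 12_000_000: naive accounting adds both
print(x.nbytes)                # 8_000_000: actual memory held
```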
If this is very effective for shuffle workloads then maybe it's something that we could implement just for that code path? That might be a tightly scoped place to try this out more broadly. |
Support of the [new built-in spilling in cuDF](rapidsai/cudf#12106) so that `device_memory_limit` and `memory_limit` ignore cuDF's device buffers. This is only implemented for `DeviceHostFile`. Since jit-unspill also targets cuDF and libraries such as cupy aren't supported, I don't think it is important to support cuDF's built-in spilling in `ProxifyHostFile`. For now, `DeviceHostFile` simply ignores cuDF's device buffers and lets cuDF handle the spilling. This means that `DeviceHostFile` might estimate the device and host memory usage incorrectly ([or more incorrectly than usual](dask/distributed#4568 (comment))).
Authors:
- Mads R. B. Kristensen (https://github.com/madsbk)
Approvers:
- Peter Andreas Entschev (https://github.com/pentschev)
URL: #984
Question: would the Dask/Distributed community be interested in an improved memory spilling model that fixes the shortcomings of the current one but makes use of proxy object wrappers?
In Dask-CUDA we have introduced a new approach to memory spilling that handles object aliasing and JIT memory un-spilling: rapidsai/dask-cuda#451
The result is memory spilling that:
The current implementation in Dask-CUDA handles CUDA device objects, but it is possible to generalize it to also handle spilling to disk.
The disadvantage of this approach is the use of proxy objects that get exposed to the users. The inputs to a task might be wrapped in a proxy object, which doesn't mimic the proxied object perfectly. E.g.:
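For instance, this is the same behaviour shown in the `ProxyObject` docstring quoted above: type checks that compare types directly see the proxy class rather than the wrapped type.

```python
import numpy as np
from dask_cuda.proxy_object import asproxy

x = np.arange(3)
p = asproxy(x)

print(isinstance(p, type(x)))  # True: isinstance() sees through the proxy
print(type(p) is type(x))      # False: direct type comparison does not
```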
Because of this, the approach shouldn't be enabled by default but do you think that the Dask community would be interested in a generalization of this approach? Or is the proxy object hurdle too much of an issue?
cc. @mrocklin, @jrbourbeau, @quasiben