-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Asynchronous Disk Access in Workers #4424
Asynchronous Disk Access in Workers #4424
Comments
Is it a possibility to use a bigger/different interface than What I would also hope to address is the potential data duplication caused by spilling #3756 if we are reworking this area. I never followed up on that topic but fixing it would also require breaking up that abstraction to some extend. |
Yes, we could do something different. We could also just take Zict and
make AsyncZict or something that followed the same logic, but where all
methods were async friendly.
…On Wed, Jan 13, 2021 at 8:44 AM Florian Jetter ***@***.***> wrote:
Is it a possibility to use a bigger/different interface than
MutableMapping? I'm not too worried about "not using zict" any longer but
I am concerned about the monolithic worker problem you mentioned and I
think we should keep some level of abstraction for this, let alone for
easier testing once the logic continues to grow with the out-of-memory
topic you mentioned.
What I would also hope to address is the potential data duplication caused
by spilling #3756 <#3756> if we
are reworking this area. I never followed up on that topic but fixing it
would also require breaking up that abstraction to some extend.
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub
<#4424 (comment)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AACKZTF2S34NKHNCWYA4AYTSZXEYXANCNFSM4WBBPMWA>
.
|
cc @madsbk who has also recently been doing work around spilling |
I'm curious about the ripple effects this could be having on performance. See ocean-transport/coiled_collaboration#9, where the same workload is running with and without EDIT: the spill config wasn't actually being applied to workers: ocean-transport/coiled_collaboration#9 (comment). So whatever's causing the difference is unrelated. However, I think the observations of blocking IO might still be relevant? The interesting thing to me is that with spilling on, the task stream has lots of blank space in it—workers are "idle" (not doing user work) a lot of the time. With spilling off, work is fully packed, and there's no idle time. I think ideally, spilling should not cause more idle time in the task stream. The red transfer times might be longer, and we'd see more yellow I notice a pattern looking at the task stream in the spilling case. You can kind of see white/"idle" patches on one worker that match up with data-transfer waits on another worker. This might just be a Rorschach interpretation. But it seems to indicate that the sender of the data is blocked from running other tasks until the data it's sending has been un-spilled from disk. (This makes sense, since if the event loop is blocked, how could new tasks be submitted to run?) (@jrbourbeau: here's another case where the "inverted task stream" / "what is my working doing when it's not working" visualization we were talking about could be useful) I'm skeptical that disk IO could be taking 40-60sec in some of these cases (getting that number from the corresponding transfer durations). That seems really slow. But, in the administrative worker profile, we can see that ~1200 seconds are spent in Basically it seems that transferring spilled data locks up the sender of the data for part of the time. Blocking the worker IO loop to me seems consequential. Based on my incomplete understanding of the system—please correct what I'm getting wrong here—I think this could be the fallout (assuming IO is blocking for ~seconds at a time):
Semi-related thing I noticed: in |
@gjoseph92 distributed/distributed/worker.py Lines 1366 to 1380 in 233ec88
That's the time spent loading the data before the sender submits it. That's 100% disk IO. Similarly, there is the distributed/distributed/worker.py Lines 1383 to 1397 in 233ec88
That should at least help understanding where the time is spent on the sender. In particular if no task was queued for execution before this is initiated, the worker will idle until this is over, that's correct. |
I've thought about the suggest of @gjoseph92 w.r.t a streaming like interface and I believe this aligns pretty well with the target of avoiding data duplication. I tried to put my idea about how this interface could look like into some code and would be glad to receive feedback. To be entirely honest, this proposal would not allow for direct from-disk-streaming but I think this could be easily added https://gist.github.com/fjetter/25fae963c70c9b756b591213244af96a This proposal keeps the mutable mapping and still allows for fast The interface itself would be write async and whether or not the |
cc'ing @madsbk who has worked on spilling over in |
@gjoseph92 I think you are right in your observations. I made some similar ones, which turned in to the JIT-unspilling approach for GPU -> CPU memory spilling: rapidsai/dask-cuda#451. Notice, this approach delegates the de-serialization to the worker-thread executing the task. Also it handles the de-serializing & re-serializing issue in ... and a small commercial, I will be talking about this at the Dask Distributed Summit 2021 :) |
I think there is a significant overlap. If I am not mistaken, the interface @fjetter propose is very similar to the |
There are at least similar ideas. The execution is quite different but it is also not a fair comparison since my code is not actually working :)
Considering that the goal of this issue is to allow for async data access, this would require us to change the interface for the worker anyhow and I would consider un-proxification a hard requirement for this feature since generic user functions should not be required to deal with our internal abstraction. I could not 100% follow the implementation of the cuda proxies but it looks reasonably unspecific to GPUs which is good. From what I understand, most of the complexity is introduced since the implementation deduplicates by I'm wondering how much of the "how do we want to spill" discussion should/must leak into the "what is a good asyncio interface" discussion. |
I will put forward three options here for asyncio interfaces. I'm not recommending these interfaces in particular; instead my objective is to highlight a choice we have. AsyncMutableMappingWe could make an async extension of the MutableMapping Interface with However, it still keeps the keys in memory / disk / gpu memory opaque. This is good because it's a separate concern and extensible, but also somewhat bad because we can't use this informaiton for scheduling decisions or diagnostics. No abstraction at allAlternativley we could just fully bake in ideas for memory / disk / gpu memory into Dask Workers. After several years there have only been two serious implementations of this abstraction, the default memory/disk layout and dask-cuda's DeviceHostDisk layout. Let's just absorb these two options into the This closes the door for easy addition of future layers to the memory hierarchy (object stores? compressed-in-memory? other accelerators?) but opens the door for smarter scheduling. In particular there are often workloads where we should be computing on data that we have close at hand in main memory first, but instead we read in data from disk, and compute on that data while shoving the easy-to-access in-memory data to disk, only to have to read it later. Storage Layer abstraction (middle ground)Maybe in the middle, we create an abstraction for how to asynchronously get and set data into some storage system, and the Worker has a stack of these things with associated size limitations. class File(StorageLayer):
async def getitem(self, key):
...
async def setitem(self, key, value):
...
async def delitem(self, key):
... The stack-of-layers/zict.Buffer/zict.LRU logic is hard-coded into the Worker while the logic of each storage-layer is abstracted into a separate class. |
Hrm, or perhaps that's too simple. I now realize that the |
As another option (or an extension of the Storage Layer abstraction), could we define an interface for storage layers to give us enough information to make such smarter scheduling decisions? What are the questions this would need to answer? It sounds like we want to know "which keys are fast to load". Maybe also "how much memory will this key take if I load it", "give me the ranking of (these keys|all keys) in 'fastness'"? |
Perhaps it would make sense to just start with adding async methods to the MutableMappings in zict so that we can avoid blocking the event loop. This is probably an easy(ish) change and would relieve short term pressure. |
It would also avoid us having to come to some grand design here |
Thought about this again in terms of what the minimal interface needs to be to unblock the event loop. Might be very obvious but I stumbled over this so I'll put this down
I'm still in favour of keeping the abstraction for testing purposes and cleaner code but I don't care too much about where the implementation lives. We do have our own SpillBuffer class by now anyhow |
Recent workloads I've run have made me think streaming spilled bytes straight from disk to other workers (transferring spilled data without either deserializing it, or loading all the bytes into memory) needs to be a core part of this design. I made a simple workload that resulted in spilling lots of data to disk (basically Notice there are lots of disk reads, but few transfers, little white space—task stream looks great Compare this to an I’d just never seen spilling work well, like it did in the first example. Which made me realize that maybe the biggest problems aren't so much with spilling, but with how spilling interacts with data transfer. So I think it's essential that we make that a core part of any redesign. |
The biggest problem I am aware of, and have prove for, connecting spill+transfer is #3756 where up to 40% of the memory was allocated by untracked data copies, that's not including (de-)serialization overhead but only managed memory. There have been suggestions to solve this by using weakrefs which would not require any big redesign and may already solve a significant part of this problem. If RSS blows up uncontrolled, this can cause all sorts of side effects and even cause a vicious cycle (high RSS -> consistent LRU eviction -> more spill to disk -> more data duplication)
I would be very interested to talk about instrumentation to confirm this. As I said, the data duplication problem above is amplified by the LRU logic, particularly with the random access pattern of a shuffle. What kind of instrumentation would be helpful here?
It's obvious that not doing the (de-)serialization in |
Once the Active Memory Manager Rebalance comes into play later on, there's going to be an extra burden of data that is fully deserialised upon arrival, but it's actually just "parked" on the worker and may not be needed in its deserialised form for a long time. I think it would be fairly straightforward to have an interface where you can deserialise data only when a task actually needs it. I suspect garbage collection times will improve drastically too when data isn't primarily made of large numpy arrays. |
This discussion feels OT for the ticket. Moving it to #5900. |
FWIW my gut feeling here is to not invent an abstraction and just put memory/disk/gpu memory directly on the worker. I think that an abstraction will be hard to create here, and that it will get in the way. I think that things will be simpler if we have state and methods like class Worker:
self.memory_storage = {}
self.disk_storage = zict.File(...)
self.device_storage = {}
def get_data_from_storage(key: str) -> object:
...
def put_data_into_storage(key: str) -> object:
... And then we can start to use this broken out state incrementally throughout the worker logic. This would be common in scheduling (we strongly prefer tasks for which the data is already in RAM) and as Gabe points out, in communication (when sending data-on-disk to other machines there's no reason to deserialize it). These additions could be done after the fact rather than all at once. |
I very much agree with Matt. I don't think we should abstract away where the data is stored, since in different contexts we want to use different logic depending on the storage location. Eventually an abstraction could be appropriate, but right now we don't know what to use, so we should just not use one yet. This is what I'm trying to say in #5891 (comment). |
When Gabe and I agree, you know we're on to something 🙂 |
I'm curious what the reasons are against modelling spilled keys with a new state in the state machine. It's a bit more code, but more explicit and therefore easier to maintain, I imagine. Un-spilling feels quite analogous to fetching data, which has been nice to model this way. I think we'd all agree that the worker state machine is great and generally a huge boon to stability. Putting more things in it seems good. It would be a bigger diff, since we'd no longer use the layering of zict mappings to explicitly spill and un-spill stuff. I imagine we'd get benefits that aren't easy with an implicit spilling design, since the worker could generally make more explicit scheduling decisions around spilling and memory. For example, we could explicitly decide when to un-spill keys prior to running a task, in parallel with other tasks executing—just like we decide when to fetch data, in parallel with execution. This would pipeline disk IO and execution #7643. We could return "busy" for a get-data request if un-spilling the data would put us over the memory limit. We could let a task with all inputs in memory run ahead of a task that still needs to un-spill, but it higher priority. I don't know if there are any huge new features this would enable in the short term; I mostly think of it for maintainability, stability, and debug-ability, and because using the state machine for coordinating these asynchronous operations has just been so dang nice. |
Couple of thoughts about the above two suggestions async SpillBuffer (Guidos approach)I'm not entirely convinced that we'll need an async set or update. In this architecture, the Spilled keys as worker states (Gabes approach)I imagine this would be more difficult to pull off backwards compatible (thinking of the RAPIDS folks) but should be doable.
I don't think there are many new features this would enable right away but I agree that this would grant us much more control. I also see how this could reduce complexity in some cases. One example is the weakref caching we put in to avoid data duplication In an offline conversation @crusaderky and I discussed the possibility of using state machine events to optimistically unspill things (assuming there is an async buffer), i.e. the two approaches could be unified for maximum complexity :) The suggestion of using state machine events for this sounds appealing. I could see a multitude of optimizations being implemented without a hierarchical spill buffer (e.g. use parquet for arrow tables, do not spill regardless of size if we know that it will be used immediately afterwards, etc.) I wonder what @crusaderky thinks of this |
@madsbk pinging you again for visibility. To understand @gjoseph92 s proposal you'll likely need a bit more insight into how the worker functions nowadays. There is some documentation https://distributed.dask.org/en/stable/worker-state.html (The suggestion is to use an |
Sorry for the misunderstanding - yes, that's exactly what I proposed.
I don't think it is beneficial either. Since
I don't see why such an optimization shouldn't be encapsulated in the SpillBuffer? You just described a Sieve: slow = zict.Sieve(
mappings={
"file": zict.Func(dumps, loads, zict.File(local_directory / "file")),
"parquet": zict.Parquet(local_directory / "parquet"),
},
selector=lambda k, v: "parquet" if isinstance(v, pd.DataFrame) else "file",
)
I've been thinking about something similar recently, specifically about preventing a key from being spilled if it's currently being used by execute or get_data. I think it could be straightforwardly be achieved via hints to the SpillBuffer. |
Delivery timeline
New public APIdask/distributed will indefinitely continue to support passing a generic This is the new API, downstream of #7686 + #7421: @runtime_checkable
class AsyncBufferProto(Protocol, Collection[str]):
"""Duck-type API that a third-party alternative to SpillBuffer must respect if it
wishes to support spilling.
Notes
-----
All methods must return (almost) immediately.
``__setitem__`` and ``set_offset`` *may* trigger asynchronous spilling activity,
which however this API is opaque to.
``async_get`` must raise KeyError if and only if a key is not in the collection at
some point during the call. ``__setitem__`` immediately followed by ``async_get``
*never* raises KeyError; ``__delitem__ `` immediately followed by ``async_get``
*always* raises KeyError. Likewise, ``__contains__`` and ``__len__`` must
immediately reflect the changes wrought by ``__setitem__`` / ``__delitem__``.
This is public API.
"""
def __setitem__(self, key: str, value: object) -> None:
...
def __delitem__(self, key: str) -> None:
...
def async_get(
self, keys: Collection[str], missing: Literal["raise", "omit"] = "raise"
) -> Awaitable[dict[str, object]]:
"""Fetch zero or more key/value pairs.
Parameters
----------
keys:
Zero or more keys
missing: raise | omit, optional
raise
If any key is missing, raise KeyError
omit
Omit missing keys from the returned dict
"""
... # pragma: nocover
def memory_total(self) -> int:
"""(estimated) bytes currently held in fast memory.
Does not include offset or spilled bytes.
"""
... # pragma: nocover
def set_offset(self, n: int) -> None:
"""Change the offset to be added to memory_total in order to determine if
key/value pairs should be internally spilled. It may be negative. This may
internally trigger or cancel spilling activity but, like in ``__setitem__``,
this is opaque to this API.
""" CC @madsbk is this feasible to implement on your side? |
Yes, it looks feasible. For dask-cuda's JIT-unspill, this is almost trivial. Dask-cuda's default spilling requires a bit more work but definitely feasible. |
Status update: dask/zict#92 is currently undergoing peer review. The good news:The change works exactly as intended. The event loop is not blocked anymore while spilling/unspilling is in effect. The looks-bad-but-it's-actually-good news:In all use cases where there was spilling and unspilling caused by a lot of independent tasks, this change will cause workers to reach the paused state a lot faster - effectively resulting in higher memory consumption. The reason is that, in main, once you pass the We may need to tweak the pause threshold down a bit - maybe 5% -or people will start experiencing OOM restarts whereas before they weren't, simply because their heap (across all threads) is more than 15% total memory. The bad news:This change in behaviour uncovered a very, very ugly bug, which completely destroys performance: #7742. The second piece of bad news is that it turns out that the current coiled-runtime test bench is not very impacted by a choked event loop. The two tests in test_spill are not:
There's only one test, which features fast-running and a lot of memory generation: test_spilling [uncompressible]
[1] hack in the test code, as shown in the last video in #7742, which waits for workers to unpause The test has the same end-to-end runtime in the three use cases; this is unsurprising as it's dominated by raw disk throughput. test_dot_product_spill [uncompressible]
Again, note how other has dropped to zero; there is also a substantial drop in unknown time, which is where workers are waiting on network transfers from other workers or the scheduler. test_set_indexI've failed to reproduce this test in a jupyter notebook so far - which I need to extract the metrics. |
How come? Did you fail to reproduce the runtime difference or are you not able to run the test? |
For these tests I'd be interested to see the raw disk IO rates. Did the AsyncBuffer achieve higher peak rates or how does this look like? If you could share the grafana link, that'd be helpful (the benchmark stuff should be publicly available in grafana) |
I assume this refers to |
I've copied the test verbatim (including the call to dask.config.set to choose the shuffle engine) but it completes in ~20 seconds, without ever getting even close to spilling, instead of ~200s with heavy spilling. Could not figure out what I'm doing wrong.
Yes |
They're roughly the same. In the new version, disk write and task execution are pipelined, whereas before they were interleaved; however in |
I could only confirm a runtime decrease of about 33%. Still nice but a little less impressive than what your measurements are showing. I can also only confirm this speedup if we are truly under very heavy memory pressure in the case of memory_multiplier=1 and persist=True, i.e. this is a very unhealthy place for the cluster to be in. Without persisting data I can even see the opposite effect that the async spilling implementation performs worse (haven't collected the data) |
This issue has been postponed indefinitely following disappointing A/B test results. |
Currently reading from or writing to disk blocks the event loop
distributed/distributed/worker.py
Lines 1948 to 1964 in 74e8dc6
This can cause workers to become unresponsive, especially in systems with very slow disk access. Ideally Disk I/O would happen concurrently. There are a couple of ways to do this.
Offload to separate thread
We could move all manipulation of the
Worker.data
MutableMapping to a separate thread, such as we do with theoffload
function, which we use today for deserialization.However, if we do this then we need to do it for all access to
Worker.data
including seemingly innocuous checks likeif key in self.data
which may become annoying.Handle Disk logic directly in the worker
We could also break apart the
MutableMapping
abstraction, and unpack the zict logic directly into the Worker code. This would allow us to keep a lot of the fast access in the event loop, while treating disk access specially. It would also open the door for more performance improvements, like trying to schedule tasks for data that is currently in memory rather than data that is currently on disk. In general if we want to improve out-of-memory handling in Dask we'll eventually need to break this abstraction.However, breaking this abstraction comes at considerable cost. First, it means that there is more to manage in a monolithic Worker codebase (zict has tricky logic that we haven't really had to touch or maintain in years). Second, it means that we'll have to find a way that still lets other groups like RAPIDS extend the storage hierarchy (they have device->host->disk rather than just host->disk).
cc @quasiben @pentschev @jrbourbeau @fjetter
The text was updated successfully, but these errors were encountered: