Performance tuning for weighted 2d histograms with xhistogram #9
This is a very helpful data point @jbusecke, thank you! I'm sad but excited that turning off spilling helped. I'm probably more wrong than right about this, but it may also have something to do with dask/distributed#4424. Basically, while workers are spilling or un-spilling data from disk, they're unresponsive to any other events, including transferring data to other workers, responding to the scheduler, and starting on other tasks. I wrote up a comment on that issue with some hypotheses about how that might affect the system. Very curious to see what happens on the production workflows!
Update: Now that I've looked more carefully at the notebook, I don't think the config change there actually took effect on the workers:

```python
In [1]: import dask

In [2]: import distributed

In [3]: client = distributed.Client("tcp://192.168.0.39:8786")

In [4]: client.run(lambda: dask.config.get("distributed.worker.memory.spill"))
Out[4]: {'tcp://192.168.0.39:54908': 0.7}

In [5]: with dask.config.set({"distributed.worker.memory.spill": None}):
   ...:     print(client.run(lambda: dask.config.get("distributed.worker.memory.spill")))
   ...:
{'tcp://192.168.0.39:54908': 0.7}
```

In the case of this particular config setting, because it's referenced right when the worker is created, you'd need to set it before the workers start. Nonetheless, something there made that last run go much better, but I'm even more mystified what it could be. I'm currently trying to rerun your notebook on Pangeo staging, both with and without the spill setting.
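Because the spill threshold is read once at worker startup, the value has to be in place before the workers exist. Here is a minimal sketch of that ordering with a `LocalCluster`, assuming config set in the launching process is forwarded to locally spawned workers; on a dask-gateway deployment like Pangeo you would instead pass it through cluster options or environment variables, as attempted further down in this thread:

```python
import dask
from dask.distributed import Client, LocalCluster

# Set the config *before* the workers are created, because each worker reads
# distributed.worker.memory.spill once when it starts up.
with dask.config.set({"distributed.worker.memory.spill": False}):
    cluster = LocalCluster(n_workers=4, threads_per_worker=2)
    client = Client(cluster)

# Verify what the workers actually picked up.
print(client.run(lambda: dask.config.get("distributed.worker.memory.spill")))
```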
Thanks for trying all this out @jbusecke. Unfortunately I don't think the code in the notebook actually disables spilling: setting the config value from inside the notebook only changes it in the local client process, not on the already-running workers.

To test that out you can run

```python
def get_config_value(key):
    import dask
    return dask.config.get(key)

print(get_config_value("distributed.worker.memory.spill"))
```

inside the notebook to get the current value on the client, and

```python
print(client.run(get_config_value, "distributed.worker.memory.spill"))
print(client.run_on_scheduler(get_config_value, "distributed.worker.memory.spill"))
```

to do the same for the workers and scheduler, respectively.
Whoops, looks like @gjoseph92 beat me to it. FWIW when I ran through the notebook I got very similar performance with and without spilling -- both looked like the performance_report_full @jbusecke posted.
Thank you for looking into this, and for the additional info. That confuses me even more, then. I definitely had some major issues with the full calculation, but it seems that is not a persistent issue? FWIW, these problems still seem to be somewhat reflected in my production workflow (and indeed the workers are still spilling!). The production workflow adds a few masking operations (separating ocean basins, and making sure that the weights have the same missing values as the args before passing them to histogram), which usually add a bunch more tasks. Or do you have any other ideas what I can do to clarify this problem?
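The masking step described here typically looks something like the following in xarray; the field names and mask are made up for illustration and are not taken from the production workflow:

```python
import numpy as np
import xarray as xr

# Hypothetical stand-ins for the production fields.
temp = xr.DataArray(np.random.rand(4, 5), dims=["y", "x"])
salt = xr.DataArray(np.random.rand(4, 5), dims=["y", "x"])
weights = xr.DataArray(np.random.rand(4, 5), dims=["y", "x"])
basin_mask = xr.DataArray(np.random.rand(4, 5) > 0.5, dims=["y", "x"])

# Separate out one basin...
temp = temp.where(basin_mask)
salt = salt.where(basin_mask)
# ...and give the weights the same missing values as the histogram args,
# so masked cells don't contribute to the weighted counts.
weights = weights.where(temp.notnull() & salt.notnull())
# These would then be passed to xhistogram, e.g.
# histogram(temp, salt, bins=[...], weights=weights, dim=[...])
```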
If you have a …

PS: I found this repo from xgcm/xhistogram#49, and assume it's OK to participate since it is public.
Oh, that is actually an excellent idea. I'll try that out! Appreciate the help, @dcherian!
@dcherian: This works really nicely! Saves me a lot of files and fuss.

@jrbourbeau @gjoseph92: I am still trying to disable spilling. I have followed these instructions to set environment variables. This is what I am doing:

```python
from dask_gateway import Gateway

gateway = Gateway()

# set env variables
options = gateway.cluster_options()
env = {
    # "DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING": False,
    # "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": 5,
    "DASK_DISTRIBUTED__WORKER__MEMORY__SPILL": None,
}
options.environment = env

cluster = gateway.new_cluster(options)
# scale up the cluster
# cluster.scale(10)
cluster.adapt(minimum=2, maximum=10)  # or to a fixed size.
cluster

client = cluster.get_client()
client
```

I then follow @jrbourbeau's instructions to check whether that has any effect:

```python
# Proper way to check config values (https://github.com/ocean-transport/coiled_collaboration/issues/9#issuecomment-829765791)
def get_config_value(key):
    import dask
    return dask.config.get(key)

print(client.run(get_config_value, "distributed.worker.memory.spill"))
print(client.run_on_scheduler(get_config_value, "distributed.worker.memory.spill"))
```

I am getting …

When I run my computation I still see spilling (orange tasks). Am I getting this wrong still?
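One quick way to check whether the environment variable reached the worker processes at all, independent of how dask then parses it, is to look at `os.environ` on the workers. A small sketch, reusing the `client` object from the snippet above:

```python
import os

def check_spill_env(key="DASK_DISTRIBUTED__WORKER__MEMORY__SPILL"):
    # Return the raw environment variable as each worker process sees it.
    return os.environ.get(key, "<not set>")

# client is the dask-gateway client created above
print(client.run(check_spill_env))
```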
Would some of this be easier if we switched to Coiled dask clusters? The CMIP6 data are all on AWS now, so the choice of cloud is not a problem. |
Would be happy to check that out. What would be needed from me to get an account? |
Try using the string …

Yes, if you're seeing orange in the task stream …

Potentially. But because Pangeo offers …

Last, here's a really awful hack you could try to forcibly turn spilling off once the cluster has already started, if using the string doesn't do it:

```python
import distributed

def patch_spilling_off(dask_worker: distributed.Worker):
    assert len(dask_worker.data) == 0, "Can only patch spilling when no data is stored on the worker"
    dask_worker.data = dict()
    dask_worker.memory_spill_fraction = None

client.run(patch_spilling_off)
```

I tried running with this on Pangeo, and it would run great (no idle time on the task stream)... until I'd get a …
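Since environment variable values are always strings, the environment-variable route presumably has to spell the setting as a string rather than as Python `None`. A sketch along those lines, where the exact spelling ("False") is an assumption and not confirmed anywhere in this thread:

```python
from dask_gateway import Gateway

gateway = Gateway()
options = gateway.cluster_options()
# Environment variable values must be strings; dask parses them into its
# config when the worker starts. "False" is an assumed spelling here.
options.environment = {"DASK_DISTRIBUTED__WORKER__MEMORY__SPILL": "False"}
cluster = gateway.new_cluster(options)
client = cluster.get_client()
```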
I just tried …

But the workers are still spilling to disk?!? I have just pushed the newest version of my testing notebook, with all the steps I added...

Even with the spilling, it seems that the memory usage just keeps growing, but I don't get why. This whole operation should be very well parallelizable. Is there some way to see if things are not getting released? At least some chunks are actually stored. Maybe this is better handled fresh on Monday, haha.
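One rough way to see whether results are being released is to ask each worker how many task results it is currently holding; `client.run` passes the local worker object to any function that accepts a `dask_worker` argument. A sketch, again reusing the `client` from the snippets above:

```python
def memory_summary(dask_worker):
    # dask_worker is injected by client.run; .data maps task keys to the
    # results currently held by (or spilled from) that worker.
    return {
        "n_keys_held": len(dask_worker.data),
        "sample_keys": list(dask_worker.data)[:5],
    }

# client is the cluster client created earlier in the thread
print(client.run(memory_summary))
```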
You can go to https://cloud.coiled.io to sign up and https://docs.coiled.io for our documentation. I'm also happy to hop on a call at some point if you'd like a guided tour.
Today Dask workers can't distinguish between writing a task result to disk and it simply taking a non-negligible amount of time to put the result into memory (see here for where we determine when an orange rectangle should appear in the task stream). So, while orange in the task stream usually corresponds to spilling to disk, this isn't always the case. I'm getting the sense that it might be worth us jumping on a call; I suspect a few minutes of looking through things together will save a lot of back-and-forth. @jbusecke are there some times next week that work well for you?
That would be awesome. Much appreciate the offer. I could do as early as Monday 11AM EST. Would that work for you by any chance?
I just signed up and will take a look around, but might take you up on that offer some time in the future.
Got it, so I might have misinterpreted this. Either way, I will put in another hour or so of playing around. I'll post results, if only so I can find them for the meeting!
That sounds great -- just sent you / Gabe a calendar invite. Happy to invite others too, just let me know if you'd like to be included
👍
Cool. Excited to see you on Monday. Enjoy the weekend!
@jrbourbeau, @jbusecke, and I met this morning; it was very productive to be able to go back and forth in real time! We came away suspicious of a graph-structure issue, since we observed that even after thousands of array-creation tasks had run, no store tasks had run (and none were ready to run). We think there is some sort of artificial bottleneck that is forcing many (all?) array creations to complete before any stores run. Visualizing the high-level graph, we noticed an odd fan-out/fan-in structure (you wouldn't normally see this in a high-level graph), with many parallel …
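For context, the layer structure being described here can be inspected on any dask collection. A small self-contained sketch on a toy graph (not the actual production workload):

```python
import dask.array as da

x = da.random.random((1_000, 1_000), chunks=(100, 100))
y = x.rechunk((500, 500)).sum(axis=0)

hlg = y.__dask_graph__()  # a HighLevelGraph
for name in hlg.layers:
    # Each named layer (random_sample-*, rechunk-*, sum-*, ...) is one node in
    # the high-level view; the dependencies show the fan-out/fan-in between them.
    print(name, "<-", sorted(hlg.dependencies[name]))
```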
Thank you both for the super informative meeting @gjoseph92 @jrbourbeau. I have committed a slightly cleaned-up notebook, which can be used as a rough 'diff' to see what we saw.
Hey @jbusecke, some weird news: I just can't reproduce the issue. I'm running this notebook (your version, added a missing …).

The computations are running smoothly with 20 workers (40 cores, 171.80 GB memory), both writing to a real zarr store and with our … I also happened to run it locally on my Mac (8 cores, 32 GiB memory) against …

Here are performance reports and screengrabs of the dashboard for each of these runs on Pangeo staging:

The main thing to note is that in all of these, the …
Oh damn. The only small thing I believe I changed was the vertical chunk size.
What was …?
Sorry for that oversight, I just kinda changed it while cleaning up. I will concurrently test on staging and production.
OK, I'm testing with that on staging now.
OK, this is really weird. It works for me on staging (with and without z chunks) AND on production (only tested with z chunks).
This brings me back to @stb2145's issue in #8. If I remember correctly, there was also some sort of 'non-reproducible' element to that, no?
I gotta pack up over here, but I'll get back to this tomorrow. My proposed steps:
I am still puzzled by this... the task graph still shows the weird fanning-in/out behavior, but store/lambda tasks are advancing as expected. Oh wait, I just got it to mess up again. It was working nicely, and then just stopped storing... wow, this is just super weird.
@jbusecke are you sure you had the same number of workers every time? In that dashboard above it looks like there are 5 workers. I'd been trying to wait to run things until all 20 were available. I think ideally it should still be able to work with 5, just a lot slower. But maybe that could explain the "magic" variation?
Ah, good catch. I think you are right and the number was lower.
I would say that, in general, this is a huge pain point for dask users. How many workers does a given computation need? And how much memory per worker? Are there any guidelines? Mostly we find out through trial and error (and burn a lot of compute in the process). In traditional HPC we think a lot about strong / weak scaling. Good strong scaling means that, as the number of workers increases, the runtime decreases proportionally. For dask workflows like this, however, it seems that things just grind to a halt if we don't have enough workers. Is it worth looking systematically at strong / weak scaling for some problems? We did a little bit in this paper, but not with workflows of this complexity.
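A strong-scaling check of the kind described here can be scripted fairly directly: hold the problem size fixed and time the same computation at several worker counts. A sketch with a toy reduction standing in for the real histogram workflow; the cluster sizes and chunking are placeholders:

```python
import time
import dask.array as da
from dask.distributed import Client, LocalCluster

def timed_run(n_workers: int) -> float:
    """Time a fixed-size workload on a cluster with n_workers workers."""
    with LocalCluster(n_workers=n_workers, threads_per_worker=2) as cluster, Client(cluster):
        x = da.random.random((20_000, 20_000), chunks=(1_000, 1_000))
        t0 = time.perf_counter()
        (x * x).sum().compute()
        return time.perf_counter() - t0

# Good strong scaling: doubling the workers should roughly halve the runtime.
for n in (2, 4, 8):
    print(f"{n} workers: {timed_run(n):.1f} s")
```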
Some preliminary tests with xgcm/xhistogram#49 seem to indicate that it will solve my issue here. I will close this for now and reopen if necessary. Thanks for all the help @gjoseph92 @jrbourbeau!
Following @gjoseph92's suggestion, I am opening a separate issue here, since the joint distribution problem seems to be sufficiently distinct from #8.
I have revised my notebook slightly, now storing the output to a zarr store in the pangeo scratch bucket instead of loading it to memory.
I ran two 'experiments':
The shorter run finishes fine (I see a lot of comms, but it seems that this is unavoidable due to the nature of the problem).
The longer run looks ok in the task graph, but the memory does fill up substantially, which leads to some orange disk-spilling, and things slow down a lot. The computation sometimes finishes, but it takes forever.
I do not understand why this would happen. The only abstract way I can make sense of this is that, for a given time slice, the operations that work on one variable have to 'wait' for the other variables to become available?
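For reference, the kind of weighted joint-histogram-to-zarr pipeline being described looks roughly like the sketch below; the variable names, sizes, bins, and output path are placeholders, not the actual notebook:

```python
import numpy as np
import xarray as xr
import dask.array as dsa
from xhistogram.xarray import histogram

# Synthetic stand-ins for two variables and the cell weights.
dims = ["time", "z", "y", "x"]
shape = (12, 35, 180, 360)      # placeholder sizes
chunks = (1, 35, 180, 360)      # one chunk per time step
temp = xr.DataArray(dsa.random.random(shape, chunks=chunks), dims=dims, name="temp")
salt = xr.DataArray(dsa.random.random(shape, chunks=chunks), dims=dims, name="salt")
weights = xr.DataArray(dsa.random.random(shape, chunks=chunks), dims=dims, name="weights")

temp_bins = np.linspace(0, 1, 51)
salt_bins = np.linspace(0, 1, 51)

# Weighted joint (2D) histogram of temp vs salt, reduced over the spatial
# dimensions but kept per time step.
hist = histogram(temp, salt, bins=[temp_bins, salt_bins],
                 weights=weights, dim=["z", "y", "x"])

# Lazily write the result to a zarr store (in the real workflow this would be
# a path in the pangeo scratch bucket); the write triggers the computation.
hist.to_dataset(name="hist_temp_salt").to_zarr("hist_temp_salt.zarr", mode="w")
```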
To troubleshoot, I ran with

```python
dask.config.set({"distributed.worker.memory.spill": None})
```

and that seems to magically fix the problem! 🙀 performance_report_nospill
I have to admit I do not understand why this is happening, but I'll take it!
I will see if that speeds up my production workflow, and report back!