Dask distributed client memory overruns on Prefect with large time dimensions. #151
Do you know roughly where in the pipeline things are failing? I assume in …
@TomAugspurger It is difficult to isolate, as it appears there is no failure of the recipe code on the workers; rather, the memory increase and failure occur in the container where the Prefect flow execution is running (where tasks are submitted to the cluster by the Prefect DaskExecutor). The workers continue processing recipe tasks until the flow execution container is killed. In the example above, where I am using a small 5GB limit for the flow execution container, the workers are only able to check a single cached input before the flow execution container is killed and stops submitting jobs to the cluster.
Ahh, I missed that it's the process running the flow that's being killed (rather than a worker). Sorry. There have been some issues in the past around logs filling up memory on the scheduler. Do you know what level the logging is set at, and whether anything is being stored in an in-memory list?
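For reference, one way to check and lower those levels from the flow-run environment (standard-library logging; the exact configuration in use here isn't shown) is:

```python
import logging

# Print the effective level of the main distributed loggers, then reduce
# their verbosity so log records are less likely to pile up in memory.
for name in ("distributed", "distributed.scheduler", "distributed.worker", "distributed.client"):
    logger = logging.getLogger(name)
    print(name, logging.getLevelName(logger.getEffectiveLevel()))
    logger.setLevel(logging.WARNING)
```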
@rabernat Thanks for seeing the connection here; I hadn't reviewed #116 as I was considering this a purely Prefect issue. My (albeit incomplete) understanding of Prefect's DaskExecutor is that no task graph is constructed for the entire flow, but that individual tasks are submitted to the dask scheduler and graph dependencies are managed purely by Prefect. So in the Prefect case, improving …
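For context, the kind of executor wiring being described looks roughly like this (a sketch using the Prefect 0.14-era API; the scheduler address and task body are placeholders). Prefect submits each task run to the Dask scheduler itself and resolves dependencies on the Prefect side rather than building one dask graph for the whole flow:

```python
from prefect import Flow, task
from prefect.executors import DaskExecutor


@task
def cache_input(url):
    # Stand-in for the real recipe step that caches one input file.
    return url


with Flow("example-recipe-flow") as flow:
    cache_input.map(["https://example.com/a.nc", "https://example.com/b.nc"])

# Each mapped task run is submitted to the existing Dask cluster individually.
flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))
```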
I updated the Prefect environment to use …
Thanks for your persistence @sharkinsspatial! Such a frustrating situation. Do the logs give you any information about where in the code we are when we run out of memory?
I'm including some more detailed memory instrumentation for the …
@sharkinsspatial let me know if you want to do a video call tomorrow to walk through it. I have some free time before 10:00 Central and maybe from 3:00-4:00 Central.
@TomAugspurger Based on our discussion yesterday I ran a simplified flow to try to isolate potential points for memory leaks at the worker level and to determine whether recipe serialization, or something within the recipe class, may be responsible for the increasing memory usage.
Though the rate of memory increase at the worker level is lower, there is still a steady, linear memory increase on workers. Looking at the top stats from …
Looking at the …
In reference to dask/distributed#4091 I have decreased the profiler frequency using these environment settings on the …
But this does not seem to alleviate the issue.
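The exact settings used above were not captured in this thread, but the worker profiler frequency is normally reduced via these dask config keys (a sketch):

```python
import dask

dask.config.set({
    "distributed.worker.profile.interval": "10000ms",  # how often frames are sampled (default 10ms)
    "distributed.worker.profile.cycle": "1000000ms",   # how often profiles are aggregated (default 1000ms)
})
# On a deployed worker image the equivalent environment variables are
# DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL and
# DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE.
```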
Interesting, thanks... Do you think it's worth removing fsspec and s3fs from the equation now, and just using requests and boto3 directly?
Could the memory growth be related to fsspec / s3fs caching? If so, that can be turned off.
We discussed that briefly yesterday. Looking at the docs, we felt like it should have an upper bound and start purging items from the cache by default, but we didn't test out disabling it yet.
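For what it's worth, a sketch of how that caching can be taken out of the picture (the bucket and key below are placeholders, and the recipe's own fsspec wiring may differ):

```python
import s3fs

# Open with the relevant caches disabled: no byte/read-ahead cache on open
# files, no directory-listing cache, and no reuse of cached filesystem instances.
fs = s3fs.S3FileSystem(
    anon=True,                  # placeholder; real recipes would pass credentials
    default_cache_type="none",
    use_listings_cache=False,
    skip_instance_cache=True,
)
with fs.open("s3://hypothetical-bucket/some-file.nc", mode="rb", cache_type="none") as f:
    first_kb = f.read(1024)
```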
@rabernat I had previously tested using … in order to see if s3fs caching could be playing a part, but the results were the same. In order to drastically simplify things and potentially isolate this to a Dask or Prefect issue, I made another run using …, which showed the same steadily increasing memory usage. To my naive eye, this coupled with the …
@TomAugspurger A discussion of a similar issue from the [Prefect Slack channel](https://prefect-community.slack.com/archives/CL09KU1K7/p1623226992151500) references this older issue you raised on distributed.
Picking up the thread on worker memory issues: https://gist.github.com/TomAugspurger/f870d87456695cf47bafd5a8a5ae450e is an incomplete start at a worker plugin to periodically snapshot the types and sizes of all the objects in the worker process's memory. That might shed some light on which objects are sticking around. I don't know if I'll have a chance to get back to that today or tomorrow.
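For readers without access to the gist, a rough sketch of what such a plugin can look like (this is not the gist's actual code):

```python
import gc
import logging
from collections import Counter

from distributed.diagnostics.plugin import WorkerPlugin
from tornado.ioloop import PeriodicCallback

logger = logging.getLogger("memory-snapshot")


class MemorySnapshotPlugin(WorkerPlugin):
    """Periodically log counts of live objects by type on each worker."""

    def __init__(self, interval_ms: int = 30_000):
        self.interval_ms = interval_ms

    def setup(self, worker):
        self._pc = PeriodicCallback(self._snapshot, self.interval_ms)
        self._pc.start()

    def teardown(self, worker):
        self._pc.stop()

    def _snapshot(self):
        counts = Counter(type(o).__name__ for o in gc.get_objects())
        logger.warning("top object counts: %s", counts.most_common(10))


# client.register_worker_plugin(MemorySnapshotPlugin())
```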
As noted in #208 there are Prefect-specific task serialization issues that are being tracked by Prefect, with ongoing work here. Given this issue I have been running recipes directly without Prefect. Testing with a reduced-time-dimension version of the gpm-imerg recipe, I have also encountered potential serialization-related issues. I realize I haven't included easily reproducible examples of problematic recipe serialization for large time dimensions. Using the following recipe code …
With a …
Our 5GB worker limit is quickly overrun prior to any task being executed. I don't have a strong understanding of task serialization in Dask, but I'm curious how this recipe, which is reported as 21.36 MB, causes such a large memory footprint on both the client and workers.
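As an aside, a quick way to compare dask's size estimate with the actual pickled size of an object (a sketch; `recipe` would be the XarrayZarrRecipe instance built by the recipe code referenced above):

```python
import pickle

import dask.sizeof
import dask.utils


def report_size(obj):
    """Compare dask's sizeof estimate with the real pickled size of an object."""
    estimated = dask.sizeof.sizeof(obj)
    pickled = len(pickle.dumps(obj))
    print("dask.sizeof estimate:", dask.utils.format_bytes(estimated))
    print("pickled size:        ", dask.utils.format_bytes(pickled))


# e.g. report_size(recipe)
```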
#151 (comment) ☝️ cc @alxmrs
Interestingly, in our CI-based workflows we don't have access to the recipe module location at runtime and instead interrogate the meta.yaml to dynamically import the recipe module.
This appears to cause Dask to use a different serialization strategy. In this case we see a slow memory expansion (led by …
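A sketch of that style of dynamic import (the meta.yaml layout and paths here are hypothetical; the actual CI wiring may differ):

```python
import importlib.util

import yaml  # PyYAML

# Hypothetical feedstock layout: meta.yaml names the recipe module to load.
with open("feedstock/meta.yaml") as f:
    meta = yaml.safe_load(f)

module_path = f"feedstock/{meta['recipes'][0]['module']}"  # e.g. "feedstock/recipe.py"
spec = importlib.util.spec_from_file_location("recipe_module", module_path)
recipe_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(recipe_module)
recipe = recipe_module.recipe

# Because "recipe_module" is not importable by name on the workers, cloudpickle
# falls back to serializing its contents by value, which is consistent with the
# different serialization behaviour observed here.
```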
@sharkinsspatial thanks for posting that. Just to clarify, when you run the job do you see the memory spike on the scheduler, workers, or both? And do you have a sense for what the peak memory usage should be? Something like the size of the largest chunk times the number of chunks being merged?
@TomAugspurger In the first case (with a normal module …
Scheduler memory increases rapidly in line with worker memory growth. As for peak memory, I would expect your assumption is correct; however, this memory growth is almost immediate and occurs prior to any tasks being executed (…). In the second case, when obtaining the recipe module via …
Memory grows slowly on the client, workers, and scheduler until the workers report …
I've narrowed the serialization issue down to just … The code below completes without any issues (it doesn't do anything other than submit a bunch of no-op tasks to dask):

```python
import dataclasses
import functools
import tempfile

import dask.sizeof
import dask.utils
from distributed import Client
from fsspec.implementations.local import LocalFileSystem

import pangeo_forge_recipes.patterns
from imerg_recipe import recipe
from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget, MetadataTarget


# @dask.sizeof.sizeof.register(pangeo_forge_recipes.patterns.FilePattern)
# def sizeof_file_pattern(o: pangeo_forge_recipes.patterns.FilePattern) -> int:
#     return (
#         dask.sizeof.sizeof(o.format_func) +
#         dask.sizeof.sizeof(o.fsspec_open_kwargs) +
#         dask.sizeof.sizeof(o.query_string_secrets) +
#         dask.sizeof.sizeof(o.is_opendap)
#     )


# @dask.sizeof.sizeof.register(pangeo_forge_recipes.patterns.MergeDim)
# @dask.sizeof.sizeof.register(pangeo_forge_recipes.patterns.ConcatDim)
# def sizeof_combinedim(o) -> int:
#     return dask.sizeof.sizeof(dataclasses.asdict(o))


fs_local = LocalFileSystem()

cache_dir = tempfile.TemporaryDirectory()
cache_target = CacheFSSpecTarget(fs_local, cache_dir.name)

target_dir = tempfile.TemporaryDirectory()
target = FSSpecTarget(fs_local, target_dir.name)

metadata_dir = tempfile.TemporaryDirectory()
metadata_target = MetadataTarget(fs_local, metadata_dir.name)

recipe.target = target
recipe.input_cache = cache_target
recipe.metadata_cache = metadata_target

obj = recipe.to_dask()
dsk = obj.dask


def simple_func(*args, **kwargs):
    return None


def simplify_callable(dsk):
    # Replace every task's callable with a no-op so only argument
    # serialization is exercised.
    for k, v in dsk.items():
        _, *args = v
        dsk[k] = (simple_func,) + tuple(args)


def simplify_args(dsk):
    # Replace heavy kwargs (e.g. file_pattern) captured in functools.partial
    # arguments with small integer placeholders.
    simple_types = (int, str, list, pangeo_forge_recipes.patterns.Index)
    for k, v in dsk.items():
        func, *args = v
        new_args = []
        n = len(args)
        for arg in args:
            if isinstance(arg, simple_types):
                new_args.append(arg)
            elif isinstance(arg, functools.partial):
                replace = {"file_pattern"}
                # replace = {}
                new_kwargs = dict()
                for i, (kwarg, val) in enumerate(arg.keywords.items()):
                    if kwarg in replace:
                        new_kwargs[kwarg] = i
                    else:
                        new_kwargs[kwarg] = val
                new_arg = functools.partial(arg.func, **new_kwargs)
                new_args.append(new_arg)
            else:
                raise ValueError
        assert len(new_args) == n
        dsk[k] = (func,) + tuple(new_args)


if __name__ == "__main__":
    print("simplifying")
    simplify_callable(dsk)
    simplify_args(dsk)

    with Client(n_workers=1, threads_per_worker=1, memory_limit="4G", nanny=False) as client:
        print("submitting")
        client.get(dsk, [obj.key])
```

If we don't replace the … I'll pick this up again this afternoon.
#217 might have a fix. @sharkinsspatial are you able to test out the full recipe on a branch?
@TomAugspurger This appears to solve our initial memory overrun (though I can't fully test, as the NASA server bounces me with …).
@sharkinsspatial as I noted in #217 (comment), my "fix" might be off. I'm not sure anymore. I do have something that I think is a fix on the recipe though. Can you try with this for the …?

```python
import aiohttp
import pandas as pd

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.recipes import XarrayZarrRecipe

# TODO: replace with ENV vars
username = password = "[email protected]"


def make_filename(time):
    input_url_pattern = (
        "https://arthurhouhttps.pps.eosdis.nasa.gov/gpmdata/{yyyy}/{mm}/{dd}/"
        "imerg/3B-HHR.MS.MRG.3IMERG.{yyyymmdd}-S{sh}{sm}00-E{eh}{em}59.{MMMM}.V06B.HDF5"
    ).format(
        yyyy=time.strftime("%Y"),
        mm=time.strftime("%m"),
        dd=time.strftime("%d"),
        yyyymmdd=time.strftime("%Y%m%d"),
        sh=time.strftime("%H"),
        sm=time.strftime("%M"),
        eh=time.strftime("%H"),
        em=(time + pd.Timedelta("29 min")).strftime("%M"),
        MMMM=f"{(time.hour*60 + time.minute):04}",
    )
    return input_url_pattern


dates = pd.date_range("2020-05-31T00:00:00", "2021-05-31T23:59:59", freq="30min")


def func(i):
    return pd.date_range("2020-05-31T00:00:00", "2021-05-31T23:59:59", freq="30min")[i]


class SymbolicKeys:
    def __init__(self, func, n):
        self.func = func
        self.n = n

    def __iter__(self):
        for i in range(self.n):
            yield self.func(i)

    def __len__(self):
        return self.n


keys = SymbolicKeys(func, len(dates))

time_concat_dim = ConcatDim("time", keys, nitems_per_file=1)
pattern = FilePattern(
    make_filename,
    time_concat_dim,
    fsspec_open_kwargs={"auth": aiohttp.BasicAuth(username, password)},
)
recipe = XarrayZarrRecipe(
    pattern,
    xarray_open_kwargs={"group": "Grid", "decode_coords": "all"},
    inputs_per_chunk=1,
)
```

The difference is that … Happy to jump on https://whereby.com/pangeo to chat through this sometime in the next 30 minutes.
Cool idea. If this solution sticks, we could consider having …
I think a more general solution to this might be to just never ship the …
Yeah, I think that's the right solution if it's doable. Looking now.
It's not a trivial refactor but should be possible.
I can try to make that PR.
That'd be great. I'm getting a bit tripped up on how to get … And I'm about 90% sure that this SymbolicKeys stuff will fix the real recipe, but maybe we want to wait to hear from @sharkinsspatial before doing another large refactor.
I do think the SymbolicKeys will fix it. But I think that the SymbolicKeys approach adds complexity, while mine removes it. Let's try both. I'm going to add this to #213, which is already about how we open files.
@TomAugspurger This … for each of the time index tasks.
@sharkinsspatial can you try with:

```python
class SymbolicKeys:
    def __init__(self, func, n):
        self.func = func
        self.n = n

    def __iter__(self):
        for i in range(self.n):
            yield self.func(i)

    def __len__(self):
        return self.n

    def __getitem__(self, i):
        return self.func(i)
```

We need to figure out exactly what features of that … According to the type signature, it just needs to be a collection (so …).
@TomAugspurger I had actually just tried that and hit …
Is the error message actually …?
@TomAugspurger 👍 Yup. Our …
So in terms of "root" cause, it seems like passing around …

```python
In [20]: dask.utils.format_bytes(len(dates) * dates.nbytes)
Out[20]: '2.30 GiB'
```

I think that on the client, we have ~17,000 references to the same … Ideally we're able to extract the relevant (small) pieces of data from these FilePattern / ConcatDim objects before passing them to functions, so that they don't end up in the task graph. I don't think we want users worrying about SymbolicKeys-type things if we can help it.
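A toy illustration of that accounting (the helper names here are hypothetical): if each of the ~17,000 tasks captures the full `dates` index, the index is effectively serialized once per task; extracting the single timestamp a task needs keeps each task tiny.

```python
import functools

import pandas as pd

dates = pd.date_range("2020-05-31T00:00:00", "2021-05-31T23:59:59", freq="30min")


def fetch(timestamp):
    # Stand-in for the real per-input work (e.g. caching one file).
    return str(timestamp)


def fetch_by_index(i, all_dates):
    return fetch(all_dates[i])


# Heavy: every task's partial captures the full index, so serializing the graph
# task-by-task costs roughly the ~2.3 GiB figure above.
heavy = {
    f"fetch-{i}": (functools.partial(fetch_by_index, all_dates=dates), i)
    for i in range(len(dates))
}

# Light: extract the single timestamp up front; each task embeds only a scalar.
light = {f"fetch-{i}": (fetch, dates[i]) for i in range(len(dates))}
```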
This makes sense. I'm working on a PR to address this.
@rabernat I'll track the PR work; ping me if there is any integration testing you would like done. On a somewhat related note, though @TomAugspurger's initial …
I performed some successful initial tests processing the noaa-oisst recipe using a smaller time dimension. When extending the recipe to use the full time range `pd.date_range("1981-09-01", "2021-01-05", freq="D")`, I am seeing memory overruns on the container where the Prefect Flow is being executed (the container is killed for exceeding its 30GB hard limit). To obtain some more details I attempted to run the Flow in a local container directly using `flow.run` rather than the Prefect Agent's subprocess. I added some `tracemalloc` instrumentation and found rapidly growing memory utilization from `msgpack` before the process was killed for exceeding the container's 5GB hard limit. A traceback of the largest block prior to the container being killed points to message serialization in distributed.

As a note, my scheduler and workers continue operating fine and processing tasks within their smaller memory limits until the Prefect flow runner is killed.

I've engaged with Prefect support on Slack, but this appears to be a potential distributed client issue, and I don't know about the depth of their dask expertise now that Jim will be leaving. I'm very inexperienced with dask internals, so any suggestions or advice here would be greatly appreciated 🙇 cc @rabernat @TomAugspurger.
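For reference, the `tracemalloc` instrumentation described above can be reproduced with something along these lines (a sketch, not the exact code used): start tracing early in the flow-run process, then periodically print the largest allocation sites grouped by traceback.

```python
import tracemalloc

tracemalloc.start(25)  # keep up to 25 frames per allocation site


def report_top_allocations(limit=5):
    """Print the largest memory blocks grouped by allocation traceback."""
    snapshot = tracemalloc.take_snapshot()
    for stat in snapshot.statistics("traceback")[:limit]:
        print(stat)
        for line in stat.traceback.format():
            print("   ", line)
```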