
Dask distributed client memory overruns on Prefect with large time dimensions. #151

Closed
sharkinsspatial opened this issue May 30, 2021 · 40 comments · Fixed by #219

Comments

@sharkinsspatial
Contributor

I performed some successful initial tests processing the noaa-oisst recipe using a smaller time dimension. When extending the recipe to use the full time range pd.date_range("1981-09-01", "2021-01-05", freq="D") I am seeing memory overruns on the container where the Prefect Flow is being executed (the container is killed for exceeding its 30GB hard limit).

To obtain some more details I attempted to run the Flow in a local container directly using flow.run rather than the Prefect Agent's subprocess. I added some tracemalloc instrumentation and found rapidly growing memory utilization from msgpack.
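For reference, a minimal sketch of the kind of instrumentation I mean (the exact code isn't reproduced here; grouping allocations by filename produces lines like the ones below):

import logging
import tracemalloc

logging.basicConfig(level=logging.INFO)
tracemalloc.start(25)  # keep up to 25 frames per allocation traceback

def log_top_allocations(limit=3):
    # Group live allocations by source file and log the largest groups.
    snapshot = tracemalloc.take_snapshot()
    for stat in snapshot.statistics("filename")[:limit]:
        logging.info(stat)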

INFO:root:/srv/conda/envs/notebook/lib/python3.8/site-packages/msgpack/__init__.py:0: size=66.9 MiB, count=6, average=11.2 MiB
INFO:root:/srv/conda/envs/notebook/lib/python3.8/site-packages/msgpack/__init__.py:0: size=1156 MiB, count=120, average=9868 KiB
INFO:root:/srv/conda/envs/notebook/lib/python3.8/site-packages/msgpack/__init__.py:0: size=2408 MiB, count=251, average=9826 KiB
INFO:root:/srv/conda/envs/notebook/lib/python3.8/site-packages/msgpack/__init__.py:0: size=3603 MiB, count=377, average=9787 KiB

Memory grew like this until the process was killed for exceeding the local container's 5GB hard limit.

A traceback of the largest memory block, taken just before the container was killed, points to message serialization in distributed:

INFO:root:34 memory blocks: 1448426.9 KiB
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/threading.py", line 890
INFO:root:    self._bootstrap_inner()
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/threading.py", line 932
INFO:root:    self.run()
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/threading.py", line 870
INFO:root:    self._target(*self._args, **self._kwargs)
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/concurrent/futures/thread.py", line 80
INFO:root:    work_item.run()
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/concurrent/futures/thread.py", line 57
INFO:root:    result = self.fn(*self.args, **self.kwargs)
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py", line 1471
INFO:root:    return await loop.run_in_executor(_offload_executor, lambda: fn(*args, **kwargs))
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/utils.py", line 31
INFO:root:    protocol.dumps(
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/core.py", line 70
INFO:root:    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
INFO:root:  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/msgpack/__init__.py", line 35
INFO:root:    return Packer(**kwargs).pack(o)
Killed

As a note, my scheduler and workers continue operating fine and processing tasks within their smaller memory limits until the Prefect flow runner is killed.

I've engaged with Prefect support on Slack, but this appears to be a potential distributed client issue, and I don't know the depth of their dask expertise now that Jim will be leaving. I'm very inexperienced with dask internals, so any suggestions or advice here would be greatly appreciated 🙇 cc @rabernat @TomAugspurger.

@sharkinsspatial sharkinsspatial changed the title Dask distributed client memory overruns when with large time dimensions. Dask distributed client memory overruns on Prefect with large time dimensions. May 30, 2021
@TomAugspurger
Contributor

Do you know roughly where in the pipeline things are failing? I assume in store_chunk?

@sharkinsspatial
Contributor Author

@TomAugspurger It is difficult to isolate: there appears to be no failure of the recipe code in the workers; rather, the memory increase and failure occur in the container where the Prefect flow execution is running (where tasks are being submitted to the cluster by the Prefect DaskExecutor).

The workers continue processing recipe tasks until the flow execution container is killed. In the example above where I am using a small 5GB limit for the flow execution container, the workers are only able to check a single cached input before the flow execution container is killed and stops submitting jobs to the cluster.

@TomAugspurger
Contributor

Ahh, I missed that it's the process running the flow that's being killed (rather than a worker). Sorry.

There have been some issues in the past around logs filling up memory on the scheduler. Do you know what level the logging is set at, and whether anything is being stored in an in-memory list?
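One way to check both, assuming current dask/distributed config keys, would be roughly:

import dask

# Inspect the logging configuration distributed picks up.
print(dask.config.get("logging", {}))

# Cap the in-memory log deque and quiet distributed's own logger
# (applies to schedulers/workers started after this config is set).
dask.config.set({
    "distributed.admin.log-length": 1000,
    "logging": {"distributed": "warning"},
})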

@rabernat
Contributor

rabernat commented Jun 1, 2021

I am seeing memory overruns on the container where the Prefect Flow is being executed (the container is killed for exceeding its 30GB hard limit).

This sounds exactly like #116.

Edit: not exactly. In #116 we are using the Dask executor, and here we are using Prefect.

@sharkinsspatial
Contributor Author

@rabernat Thanks for seeing the connection here; I hadn't reviewed #116 as I was considering this a purely Prefect issue. My (albeit incomplete) understanding of Prefect's DaskExecutor is that no task graph is constructed for the entire flow; instead, individual tasks are submitted to the dask scheduler and graph dependencies are managed purely by Prefect. So in the Prefect case, improving FilePattern serialization may fix our issue. My next steps are to run a test using #117 and also to migrate our Prefect Flow storage to the script-based model (pangeo-forge/pangeo-forge-prefect#9), which might also alleviate serialization issues.

@sharkinsspatial
Contributor Author

I updated the Prefect environment to use 0.3.4. Rather than seeing OOM errors on the container where the dask client is running and submitting tasks to the scheduler, I'm now seeing OOM errors directly on the scheduler.

@rabernat
Contributor

rabernat commented Jun 1, 2021

Thanks for your persistence @sharkinsspatial! Such a frustrating situation. Do the logs give you any information about where in the code we are at when we run out of memory?

@sharkinsspatial
Contributor Author

I'm adding some more detailed memory instrumentation for the scheduler and workers that will hopefully shed some light, and I'm digging a bit through the Prefect codebase to understand what kind of dask graph might be constructed for Flow execution. I'll hopefully have more details tomorrow, but my gut tells me this is most likely closely related to #116.

@TomAugspurger
Contributor

@sharkinsspatial let me know if you want to do a video call tomorrow to walk through it. I have some free time before 10:00 central and maybe from 3:00 - 4:00 Central.

@sharkinsspatial
Contributor Author

sharkinsspatial commented Jun 3, 2021

@TomAugspurger Based on our discussion yesterday, I ran a simplified flow to try to isolate potential points for memory leaks at the worker level and to determine whether recipe serialization, or something within the recipe class, may be responsible for the increasing memory usage.

# Imports and deployment-specific objects (`name`, `run_config`, `dask_executor`)
# are assumed from the surrounding project; they were not part of the original snippet.
import os

import fsspec
import pandas as pd
from prefect import Flow, Parameter, task, unmapped
from prefect import storage  # Prefect 0.x; provides storage.S3


def source_url(day: str) -> str:
    day = pd.Timestamp(day)
    source_url_pattern = (
        "https://www.ncei.noaa.gov/data/"
        "sea-surface-temperature-optimum-interpolation/v2.1/access/avhrr/"
        "{day:%Y%m}/oisst-avhrr-v02r01.{day:%Y%m%d}.nc"
    )
    return source_url_pattern.format(day=day)


@task
def download(source_url, cache_location):
    #  tracemalloc.start(25)
    #  snapshot = tracemalloc.take_snapshot()
    target_url = os.path.join(cache_location, str(hash(source_url)))

    try:
        fsspec.open(target_url).open()
        return target_url
    except FileNotFoundError:
        pass

    with fsspec.open(source_url, mode="rb") as source:
        with fsspec.open(target_url, mode="wb") as target:
            target.write(source.read())

    #  top_stats = snapshot.statistics('traceback')
    return target_url


with Flow(
    name,
    storage=storage.S3(bucket="pangeo-forge-aws-bakery-flowstoragebucketpangeof-71w6gsnambj9"),
    run_config=run_config,
    executor=dask_executor,
) as flow:
    days = Parameter(
        "days",
        default=pd.date_range("1981-09-01", "2021-01-05", freq="D").strftime("%Y-%m-%d").tolist(),
    )
    sources = list(map(source_url, pd.date_range("1981-09-01", "2021-01-05", freq="D").strftime("%Y-%m-%d").to_list()))
    nc_sources = download.map(
        sources,
        cache_location=unmapped(f"s3://pangeo-forge-aws-bakery-flowcachebucketpangeofor196cpck7y0pbl/{name}/cache"),
    )

Though the rate of memory increase at the worker level is lower, there is still a steady, linear memory increase on the workers. Looking at the top stats from tracemalloc, distributed is increasing its memory allocation with each task that is run:

/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/core.py:105: size=2149 MiB (+557 MiB), count=7616 (+1790), average=289 KiB
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/core.py:105: size=2659 MiB (+443 MiB), count=9261 (+1425), average=294 KiB

The traceback for the highest stat reports:

8882 memory blocks: 2579912.4 KiB
--
File "<string>", line 1
File "/srv/conda/envs/notebook/lib/python3.8/multiprocessing/spawn.py", line 116
exitcode = _main(fd, parent_sentinel)
File "/srv/conda/envs/notebook/lib/python3.8/multiprocessing/spawn.py", line 129
return self._bootstrap(parent_sentinel)
File "/srv/conda/envs/notebook/lib/python3.8/multiprocessing/process.py", line 315
self.run()
File "/srv/conda/envs/notebook/lib/python3.8/multiprocessing/process.py", line 108
self._target(*self._args, **self._kwargs)
File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/process.py", line 192
target(*args, **kwargs)
File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 792
loop.run_sync(run)
File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/ioloop.py", line 524
self.start()
File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/platform/asyncio.py", line 199
self.asyncio_loop.run_forever()
File "/srv/conda/envs/notebook/lib/python3.8/asyncio/base_events.py", line 570
self._run_once()
File "/srv/conda/envs/notebook/lib/python3.8/asyncio/base_events.py", line 1859
handle._run()
File "/srv/conda/envs/notebook/lib/python3.8/asyncio/events.py", line 81
self._context.run(self._callback, *self._args)
File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 999
await self.handle_stream(
File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 555
msgs = await comm.read()
File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 218
msg = await from_frames(
File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/utils.py", line 79
res = _from_frames()
File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/utils.py", line 62
return protocol.loads(
File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/core.py", line 105
return msgpack.loads(

In reference to dask/distributed#4091, I have decreased the profiler frequency using these environment settings on the scheduler and worker containers:

"DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL": "10000ms",
"DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE": "1000000ms",

But this does not seem to alleviate the issue.

@TomAugspurger
Contributor

Interesting, thanks... Do you think it's worth removing fsspec and s3fs from the equation now, and just using requests and boto3 directly?

@rabernat
Contributor

rabernat commented Jun 3, 2021

Could the memory growth be related to fsspec / s3fs caching? If so, that can be turned off.

@TomAugspurger
Contributor

We discussed that briefly yesterday. Looking at the docs, we felt like it should have an upper bound and start purging items from the cache by default, but we didn't test disabling it yet.

@sharkinsspatial
Contributor Author

sharkinsspatial commented Jun 3, 2021

@rabernat I had previously tested using

fs = S3FileSystem(
        anon=False,
        default_cache_type="none",
        default_fill_cache=False,
    )

This was in order to see if s3fs caching could be playing a part, but the results were the same. To drastically simplify things and potentially isolate this to a Dask or Prefect issue, I made another run using

@task
def download(source_url, cache_location):
    time.sleep(0.4)

This showed the same steadily increasing memory usage. To my naive eye, this, coupled with the tracemalloc output showing steadily increasing memory use by distributed, indicates either some issue with how the Prefect dask client is submitting tasks or a lower-level issue in distributed itself. I am quite far out of my depth trying to diagnose any core issue here. I've referenced this issue in the Prefect Slack for communication with their developer team. On the dask side of things, @TomAugspurger, what would be the next steps for us to add further details and escalate dask/distributed#4091? There are several loosely related issues, but that one seems the most relevant to this case.

@sharkinsspatial
Contributor Author

@TomAugspurger A discussion of a similar issue from the [Prefect Slack channel](https://prefect-community.slack.com/archives/CL09KU1K7/p1623226992151500) references this older issue you raised on distributed.

@TomAugspurger
Contributor

TomAugspurger commented Jun 24, 2021

Picking up the thread on worker memory issues: https://gist.github.com/TomAugspurger/f870d87456695cf47bafd5a8a5ae450e is an incomplete start at a worker plugin to periodically snapshot the types and sizes of all the objects in the worker process's memory. That might shed some light on which objects are sticking around.

I don't know if I'll have a chance to get back to that today or tomorrow.
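A rough sketch of the idea (not the gist itself, just the shape of it):

import gc
import logging
import threading
from collections import Counter

from distributed.diagnostics.plugin import WorkerPlugin

logger = logging.getLogger("distributed.worker")


class ObjectCensus(WorkerPlugin):
    """Periodically log a count of live objects by type inside each worker."""

    def __init__(self, interval=30):
        self.interval = interval
        self._stop = threading.Event()

    def setup(self, worker):
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def teardown(self, worker):
        self._stop.set()

    def _loop(self):
        while not self._stop.wait(self.interval):
            counts = Counter(type(o).__name__ for o in gc.get_objects())
            logger.info("object census: %s", counts.most_common(10))


# client.register_worker_plugin(ObjectCensus(interval=30))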

@sharkinsspatial
Contributor Author

sharkinsspatial commented Sep 30, 2021

As noted in #208, there are Prefect-specific task serialization issues that are being tracked by Prefect, with ongoing work here. Given this issue, I have been running recipes directly without Prefect. Testing with a reduced-time-dimension version of the gpm-imerg recipe, I have also encountered potential serialization-related issues. I realize I haven't included easily reproducible examples of problematic recipe serialization for large time dimensions. Using the following recipe code

import aiohttp
import pandas as pd

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.recipes import XarrayZarrRecipe

# TODO: replace with ENV vars
username = password = "[email protected]"


def make_filename(time):
    input_url_pattern = (
        "https://arthurhouhttps.pps.eosdis.nasa.gov/gpmdata/{yyyy}/{mm}/{dd}/"
        "imerg/3B-HHR.MS.MRG.3IMERG.{yyyymmdd}-S{sh}{sm}00-E{eh}{em}59.{MMMM}.V06B.HDF5"
    ).format(
        yyyy=time.strftime("%Y"),
        mm=time.strftime("%m"),
        dd=time.strftime("%d"),
        yyyymmdd=time.strftime("%Y%m%d"),
        sh=time.strftime("%H"),
        sm=time.strftime("%M"),
        eh=time.strftime("%H"),
        em=(time + pd.Timedelta("29 min")).strftime("%M"),
        MMMM=f"{(time.hour*60 + time.minute):04}",
    )
    return input_url_pattern


dates = pd.date_range("2020-05-31T00:00:00", "2021-05-31T23:59:59", freq="30min")
time_concat_dim = ConcatDim("time", dates, nitems_per_file=1)
pattern = FilePattern(
    make_filename,
    time_concat_dim,
    fsspec_open_kwargs={"auth": aiohttp.BasicAuth(username, password)},
)

recipe = XarrayZarrRecipe(
    pattern,
    xarray_open_kwargs={"group": "Grid", "decode_coords": "all"},
    inputs_per_chunk=1,
)

With a LocalCluster

from distributed import Client, LocalCluster
from distributed.worker import Worker
from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget, MetadataTarget
import dask
from pympler import asizeof
import tempfile
from fsspec.implementations.local import LocalFileSystem
from imerg_recipe import recipe

fs_local = LocalFileSystem()
cache_dir = tempfile.TemporaryDirectory()
cache_target = CacheFSSpecTarget(fs_local, cache_dir.name)
target_dir = tempfile.TemporaryDirectory()
target = FSSpecTarget(fs_local, target_dir.name)
metadata_dir = tempfile.TemporaryDirectory()
metadata_target = MetadataTarget(fs_local, metadata_dir.name)

recipe.target = target
recipe.input_cache = cache_target
recipe.metadata_cache = metadata_target
delayed = recipe.to_dask()
print(dask.utils.format_bytes(asizeof.asizeof(delayed)))


import tracemalloc
tracemalloc.start(15)


def memory_logger():
    import time
    while True:
        snapshot = tracemalloc.take_snapshot()
        top_stats = snapshot.statistics("filename")
        if len(top_stats) > 0:
            stat = top_stats[0]
            print("%s memory blocks: %.1f MB" % (stat.count, stat.size / (1024 * 1024)))
            for line in stat.traceback.format():
                print(line)
        time.sleep(5)

import threading
thread = threading.Thread(target=memory_logger)
thread.daemon = True
thread.start()

with LocalCluster(
    n_workers=2,
    worker_class=Worker,
    memory_limit='5GB',
) as cluster, Client(cluster) as client:
    delayed.compute()
    cluster.close()

Our 5GB worker limit is quickly overrun prior to any task being executed. I don't have a strong understanding of task serialization in Dask, but I'm curious how this recipe, which is reported as 21.36 MB, causes such a large memory footprint on both the client and the workers.
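One thing I could try (a sketch, assuming the LocalCluster script above) is to measure the pickled size of the graph itself rather than the in-memory size of the delayed object:

import cloudpickle
import dask

dsk = dict(delayed.__dask_graph__())

# Pickled size of each task; shared objects are counted once per task,
# which is roughly what happens when tasks are shipped to the workers.
sizes = {key: len(cloudpickle.dumps(task)) for key, task in dsk.items()}
print("total serialized:", dask.utils.format_bytes(sum(sizes.values())))
for key, nbytes in sorted(sizes.items(), key=lambda kv: -kv[1])[:5]:
    print(key, dask.utils.format_bytes(nbytes))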

@cisaacstern
Member

#151 (comment) ☝️ cc @alxmrs

@sharkinsspatial
Contributor Author

Interestingly, in our CI-based workflows we don't have access to the recipe module location at runtime and instead interrogate the meta.yaml to dynamically import the recipe module.

import os
import importlib.util  # importlib.util must be imported explicitly before use


name = "recipe"
module_path = os.path.abspath("./imerg_recipe.py")
spec = importlib.util.spec_from_file_location(name, module_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
recipe = getattr(module, name)

This appears to cause Dask to use a different serialization strategy. In this case we see a slow memory expansion (led by cloudpickle_fast.py) on the client thread, and data is transferred to the workers at a much slower rate, but it still results in the same eventual worker memory overrun. I don't have a clear picture of the interplay between the module import method and pickling, or of why this approach results in different behavior.
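A quick way to confirm the difference (a sketch reusing the two import paths above) would be to compare the pickled size of the recipe object under each strategy:

import cloudpickle

from imerg_recipe import recipe as recipe_by_import  # normal module import

# `recipe` here is the object loaded via spec.loader.exec_module above.
print("normal import:", len(cloudpickle.dumps(recipe_by_import)))
print("dynamic import:", len(cloudpickle.dumps(recipe)))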

@TomAugspurger
Contributor

@sharkinsspatial thanks for posting that. Just to clarify, when you run the job do you see the memory spike on the scheduler, workers, or both?

And do you have a sense for what the peak memory usage should be? Something like the size of the largest chunk times the number of chunks being merged?

@sharkinsspatial
Contributor Author

@TomAugspurger In the first case (with a normal module import) there is a rapid spike in memory to ≈5GB on the client followed by almost immediate memory growth on the workers until they begin reporting

distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 4.19 GiB -- Worker memory limit: 4.66 GiB

Scheduler memory increases rapidly in line with worker memory growth. As for peak memory, I would expect your assumption to be correct; however, this memory growth is almost immediate and occurs prior to any tasks being executed (cache_inputs is the first stage, and none of the tasks have even registered in the dashboard before the workers are OOM).

In the second case when obtaining the recipe module via

spec.loader.exec_module(module)

Memory grows slowly on the client, workers, and scheduler until the workers report Unmanaged memory use is high, as if the serialization and Client -> Scheduler data transfer process is much slower when using this module reference strategy. So, same end result, just occurring much more slowly.

@TomAugspurger
Contributor

TomAugspurger commented Oct 1, 2021

I've narrowed the serialization issue down to just FilePattern. I'll post more details later (have to run again), but the gist is that if we replace every element of the Dask graph with "simple" objects, then things are fine.

The code below completes without any issues (it doesn't do anything other than submit a bunch of no-op tasks to dask):

import dataclasses
import functools
import tempfile
import dask.sizeof
import dask.utils
from distributed import Client
from fsspec.implementations.local import LocalFileSystem

import pangeo_forge_recipes.patterns
from imerg_recipe import recipe
from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget, MetadataTarget


# @dask.sizeof.sizeof.register(pangeo_forge_recipes.patterns.FilePattern)
# def sizeof_file_pattern(o: pangeo_forge_recipes.patterns.FilePattern) -> int:
#     return (
#         dask.sizeof.sizeof(o.format_func) +
#         dask.sizeof.sizeof(o.fsspec_open_kwargs) +
#         dask.sizeof.sizeof(o.query_string_secrets) +
#         dask.sizeof.sizeof(o.is_opendap)
#     )


# @dask.sizeof.sizeof.register(pangeo_forge_recipes.patterns.MergeDim)
# @dask.sizeof.sizeof.register(pangeo_forge_recipes.patterns.ConcatDim)
# def sizeof_combinedim(o) -> int:
#     return dask.sizeof.sizeof(dataclasses.asdict(o))


fs_local = LocalFileSystem()
cache_dir = tempfile.TemporaryDirectory()
cache_target = CacheFSSpecTarget(fs_local, cache_dir.name)
target_dir = tempfile.TemporaryDirectory()
target = FSSpecTarget(fs_local, target_dir.name)
metadata_dir = tempfile.TemporaryDirectory()
metadata_target = MetadataTarget(fs_local, metadata_dir.name)

recipe.target = target
recipe.input_cache = cache_target
recipe.metadata_cache = metadata_target
obj = recipe.to_dask()
dsk = obj.dask


def simple_func(*args, **kwargs):
    return None


def simplify_callable(dsk):
    for k, v in dsk.items():
        _, *args = v
        dsk[k] = (simple_func,) + tuple(args)


def simplify_args(dsk):
    simple_types = (int, str, list, pangeo_forge_recipes.patterns.Index)
    for k, v in dsk.items():
        func, *args = v
        new_args = []
        n = len(args)
        for arg in args:
            if isinstance(arg, simple_types):
                new_args.append(arg)
            elif isinstance(arg, functools.partial):
                replace = {"file_pattern"}
                # replace = {}
                new_kwargs = dict()
                for i, (kwarg, val) in enumerate(arg.keywords.items()):
                    if kwarg in replace:
                        new_kwargs[kwarg] = i
                    else:
                        new_kwargs[kwarg] = val

                new_arg = functools.partial(arg.func, **new_kwargs)
                new_args.append(new_arg)
            else:
                raise ValueError

        assert len(new_args) == n
        dsk[k] = (func,) + tuple(new_args)


if __name__ == "__main__":
    print("simplifying")
    simplify_callable(dsk)
    simplify_args(dsk)

    with Client(n_workers=1, threads_per_worker=1, memory_limit="4G", nanny=False) as client:
        print("submitting")
        client.get(dsk, [obj.key])

If we don't replace the file_pattern objects in the values of the partially applied functions in the arguments in the Dask graph (what a sentence), then I see the memory issues (on the worker and scheduler). You can do that by changing replace = {"file_pattern"} to replace = {} and running the program again.

I'll pick this up again this afternoon.

@TomAugspurger
Contributor

#217 might have a fix. @sharkinsspatial are you able to test out the full recipe on a branch?

@sharkinsspatial
Contributor Author

@TomAugspurger This appears to solve our initial memory overrun (though I can't fully test, as the NASA server bounces me with aiohttp.client_exceptions.ServerDisconnectedError almost immediately 🤣). Let me try to test on a k8s cluster as well. My one concern there is that (because of what I assume are network communication limitations between my client and the scheduler in the cluster) I hit OOM errors on the client pod as the graph is buffered during the initial call to compute. Out of curiosity, do we have a sense of why the serialized FilePattern objects are so large?

@TomAugspurger
Contributor

TomAugspurger commented Oct 1, 2021

@sharkinsspatial as I noted in #217 (comment), my "fix" might be off. I'm not sure anymore.

I do have something that I think is a fix on the recipe side, though. Can you try this for imerg_recipe.py? No changes to pangeo-forge-recipes.

import aiohttp
import pandas as pd

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.recipes import XarrayZarrRecipe

# TODO: replace with ENV vars
username = password = "[email protected]"


def make_filename(time):
    input_url_pattern = (
        "https://arthurhouhttps.pps.eosdis.nasa.gov/gpmdata/{yyyy}/{mm}/{dd}/"
        "imerg/3B-HHR.MS.MRG.3IMERG.{yyyymmdd}-S{sh}{sm}00-E{eh}{em}59.{MMMM}.V06B.HDF5"
    ).format(
        yyyy=time.strftime("%Y"),
        mm=time.strftime("%m"),
        dd=time.strftime("%d"),
        yyyymmdd=time.strftime("%Y%m%d"),
        sh=time.strftime("%H"),
        sm=time.strftime("%M"),
        eh=time.strftime("%H"),
        em=(time + pd.Timedelta("29 min")).strftime("%M"),
        MMMM=f"{(time.hour*60 + time.minute):04}",
    )
    return input_url_pattern


dates = pd.date_range("2020-05-31T00:00:00", "2021-05-31T23:59:59", freq="30min")

def func(i):
    return pd.date_range("2020-05-31T00:00:00", "2021-05-31T23:59:59", freq="30min")[i]


class SymbolicKeys:
    def __init__(self, func, n):
        self.func = func
        self.n = n

    def __iter__(self):
        for i in range(self.n):
            yield self.func(i)

    def __len__(self):
        return self.n



keys = SymbolicKeys(func, len(dates))


time_concat_dim = ConcatDim("time", keys, nitems_per_file=1)
pattern = FilePattern(
    make_filename,
    time_concat_dim,
    fsspec_open_kwargs={"auth": aiohttp.BasicAuth(username, password)},
)

recipe = XarrayZarrRecipe(
    pattern,
    xarray_open_kwargs={"group": "Grid", "decode_coords": "all"},
    inputs_per_chunk=1,
)

The difference is that SymbolicKeys bit, which avoids ever putting the large DatetimeIndex of keys in the task graph.
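As a rough sanity check (assuming the definitions above), the lazy keys object should pickle down to almost nothing compared with the full index:

import cloudpickle

# `dates` is the full DatetimeIndex; `keys` is the SymbolicKeys wrapper.
print("DatetimeIndex:", len(cloudpickle.dumps(dates)))
print("SymbolicKeys:", len(cloudpickle.dumps(keys)))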

Happy to jump on https://whereby.com/pangeo to chat through this sometime in the next 30 minutes.

@cisaacstern
Member

The difference is that SymbolicKeys bit, which avoids ever putting the large DatetimeIndex of keys in the task graph.

Cool idea. If this solution sticks, we could consider having ConcatDim.__post_init__ make this transformation for users.

@rabernat
Contributor

rabernat commented Oct 1, 2021

I think a more general solution to this might be to just never ship the FilePattern to the tasks at all. There is no fundamental reason why all tasks need a copy of the index.

@TomAugspurger
Contributor

Yeah, I think that's the right solution if it's doable. Looking now.

@rabernat
Contributor

rabernat commented Oct 1, 2021

It's not a trivial refactor but should be possible.

@rabernat
Contributor

rabernat commented Oct 1, 2021

I can try to make that PR.

@TomAugspurger
Contributor

That'd be great. I'm getting a bit tripped up on how to get fname = file_pattern[input_key] early enough that it doesn't get into the task graph. I started at https://github.com/TomAugspurger/pangeo-forge/pull/new/fix/151-memory-filepattern (currently broken).

And I'm like 90% sure that this SymbolicKeys stuff will fix the real recipe, but maybe we want to wait to hear from @sharkinsspatial before doing another large refactor.

@rabernat
Contributor

rabernat commented Oct 1, 2021

I do think the SymbolicKeys will fix it. But I think that the SymbolicKeys approach adds complexity, while mine removes it. Let's try both. I'm going to add this to #213, which is already about how we open files.

@sharkinsspatial
Contributor Author

@TomAugspurger This SymbolicKeys approach is failing with

Function:  cache_input
args:      ((DimIndex(name='time', index=5958, sequence_len=17568, operation=<CombineOp.CONCAT: 2>),))
kwargs:    {}
Exception: TypeError("'SymbolicKeys' object is not subscriptable")

This happens for each of the time index tasks.

@TomAugspurger
Contributor

TomAugspurger commented Oct 1, 2021

@sharkinsspatial can you try with

class SymbolicKeys:
    def __init__(self, func, n):
        self.func = func
        self.n = n

    def __iter__(self):
        for i in range(self.n):
            yield self.func(i)

    def __len__(self):
        return self.n

    def __getitem__(self, i):
        return self.func(i)

We need to figure out exactly what features of that ConcatDim.keys we were using in the actual functions. I'll take a look, but you can try this in the meantime if it's easy.

According to the type signature, it just needs to be a collection (so __getitem__ and __len__) so maybe that implementation will work.

@sharkinsspatial
Contributor Author

sharkinsspatial commented Oct 1, 2021

@TomAugspurger I had actually just tried that and hit

distributed.worker - WARNING - Compute Failed
Function:  cache_input
args:      ((DimIndex(name='time', index=8785, sequence_len=17568, operation=<CombineOp.CONCAT: 2>),))
kwargs:    {}
Exception: AttributeError("'Timestamp' object has no attribute 'strftie'")

@TomAugspurger
Contributor

Is the error message actually strftie? If so, that error is accurate: it should be strftime, so there's a typo somewhere.

@sharkinsspatial
Contributor Author

@TomAugspurger 👍 Yup, our em pattern above was the culprit: em=(time + pd.Timedelta("29 min")).strftime("%M") had the typo. With that fixed, this runs cleanly with flat memory utilization on the client, scheduler, and workers. I'll make a full-scale run on the k8s cluster to verify.

@TomAugspurger
Contributor

So in terms of root cause, it seems like passing around ConcatDim objects, specifically large ConcatDim.keys, is a bad idea. I'm not 100% sure why, but my guess is that somewhere when deserializing tasks, dask ends up creating n_inputs copies of the DatetimeIndex. In this recipe that's something like:

In [20]: dask.utils.format_bytes(len(dates) * dates.nbytes)
Out[20]: '2.30 GiB'

I think that on the client, we have ~17,000 references to the same dates DatetimeIndex, while on the scheduler / worker we have ~17,000 individual DatetimeIndexes. That's my guess anyway.


Ideally we're able to extract the relevant (small) pieces of data from these FilePattern / ConcatDim objects before passing them to functions so that they don't end up in the task graph. I don't think we want users worrying about SymbolicKeys type things if we can help it.
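For illustration, a hedged sketch of that idea (a hypothetical helper that assumes the pattern can be iterated over its input keys, not the actual pangeo-forge-recipes code): resolve each input's filename while building the graph, so tasks only ever receive small strings:

# Hypothetical graph-building helper: look up filenames eagerly on the client
# so only plain strings (not the FilePattern) land in the task graph.
def build_cache_tasks(file_pattern, cache_input_fn):
    dsk = {}
    for i, input_key in enumerate(file_pattern):
        fname = file_pattern[input_key]  # resolved once, at graph-build time
        dsk[f"cache_input-{i}"] = (cache_input_fn, fname, input_key)
    return dsk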

@rabernat
Contributor

rabernat commented Oct 1, 2021

it seems like passing around ConcatDim objects, specifically large ConcatDim.keys, is a bad idea

This makes sense. I'm working on a PR to address this.

@sharkinsspatial
Contributor Author

@rabernat I'll track the PR work; ping me if there is any integration testing you would like done. On a somewhat related note: though @TomAugspurger's initial sizeof registration in #217 is not the solution here, I do think it would be valuable to the Prefect team in their refactoring work, as this situation of task-size underestimation causing the scheduler to oversubscribe a worker appears similar to their case (though I believe they also have significant work to do in reducing the deserialized size of individual tasks). refs PrefectHQ/prefect#5004

@rabernat rabernat mentioned this issue Oct 2, 2021
TomAugspurger pushed a commit to TomAugspurger/pangeo-forge that referenced this issue Oct 4, 2021
rabernat added a commit that referenced this issue Oct 13, 2021