
Recipe context #219

Merged: 27 commits, Oct 25, 2021
Conversation

@rabernat (Contributor) commented Oct 2, 2021

hopefully...

Edited on 13 Oct - Note that now this PR does not touch the API! 🎉 As a result, all existing recipe tests pass as-is. It's a purely internal change in implementation.

How things used to be

Recipes were implemented as dataclasses, with each stage of execution exposed as an attribute on the dataclass that returned a function wrapped in functools.partial. To generate these partials, we had to pass tons of keyword arguments around, which created lots of room for error. And passing the FilePattern object via partials to each task in a large graph created major memory problems.
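For context, here is a hedged sketch of roughly what that pattern looked like (the names and attributes are illustrative, not the actual pangeo-forge-recipes code):

from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Optional


def _cache_input(input_key, *, file_pattern, input_cache, cache_inputs, secrets):
    # Worker function: every attribute it needed arrived as a keyword argument.
    ...


@dataclass
class OldStyleRecipe:
    file_pattern: Any
    input_cache: Any = None
    cache_inputs: bool = True
    secrets: Optional[dict] = None

    @property
    def cache_input(self) -> Callable:
        # The partial (including the potentially large file_pattern) ended up
        # embedded in every task of the execution graph.
        return partial(
            _cache_input,
            file_pattern=self.file_pattern,
            input_cache=self.input_cache,
            cache_inputs=self.cache_inputs,
            secrets=self.secrets,
        )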

The new way

Recipes are still defined as dataclasses. However, the dataclass now doubles as a config parameter which is passed to the actual functions that do the work for the recipe. These functions have a simple signature, like this:

def cache_input(input_key: InputKey, *, config: XarrayZarrRecipe) -> None:
    if config.cache_inputs:
        if config.file_pattern.is_opendap:
            raise ValueError("Can't cache opendap inputs")

In order to translate the recipe into something executable, each recipe class must now assign a function to the _compiler class variable. This is what the compiler function looks like for XarrayZarrRecipe:

def xarray_zarr_recipe_compiler(recipe: XarrayZarrRecipe) -> Pipeline:
    stages = [
        Stage(name="cache_input", function=cache_input, mappable=list(recipe.iter_inputs())),
        Stage(name="prepare_target", function=prepare_target),
        Stage(name="store_chunk", function=store_chunk, mappable=list(recipe.iter_chunks())),
        Stage(name="finalize_target", function=finalize_target),
    ]
    return Pipeline(stages=stages, config=recipe)

In the process of making this work, I have also re-implemented rechunker's Pipelines execution model from scratch, incorporating lessons learned. So this PR subsumes #192.
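To make the new model concrete, here is a hedged sketch of how a simple serial executor could walk a Pipeline (simplified for illustration; the real executors in pangeo_forge_recipes/executors/ may differ in details):

def execute_pipeline_serially(pipeline) -> None:
    # Each Stage is either mapped over an iterable of keys or called once;
    # every call receives the recipe itself via the config keyword argument.
    for stage in pipeline.stages:
        if stage.mappable is not None:
            for key in stage.mappable:
                stage.function(key, config=pipeline.config)
        else:
            stage.function(config=pipeline.config)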

This PR also includes the new memory test from #220, verifying that this new implementation does not blow up the memory of dask graphs.

How to review

This is a huge refactoring, which makes it intimidating to review. Rather than just saying "everyone review everything", here are some specific review questions for specific people:

  • Are the base types in pangeo_forge_recipes/executors/base.py sensible and well designed? @alxmrs
  • Is the way I am building the dask HighLevelGraph in pangeo_forge_recipes/executors/dask.py aligned with best practices? Do you see anything that could go wrong? @TomAugspurger
  • Is the refactored pangeo_forge_recipes/recipes/reference_hdf_zarr.py more understandable, readable and maintainable? @martindurant
  • Is the refactored pangeo_forge_recipes/recipes/xarray_zarr.py more understandable, readable and maintainable? @cisaacstern

@TomAugspurger (Contributor):

I'll need to take a closer look later, but I'm not 100% sure this will solve the memory issues we're seeing on workers / the scheduler when deserializing. This is partially due to not fully understanding why the memory issues are showing up in the first place, but this feels equivalent to just passing around the context on the self object like we did earlier.

The choices are:

I think there is a third option, which mostly retains the current API, but refactors functions like pangeo_forge_recipes.recipes.xarray_zarr.cache_input that currently take file_pattern and instead pass the things they extract out of file_pattern (specifically the ConcatDim.keys in the case of IMERG).

The tricky part is that some of these things (like fname) depend on things like input_key or chunk_key, which we don't know until the function is actually called. So I think that one path forward is to update iter_inputs and iter_chunks to yield these additional pieces of metadata (sketched below).

Changing the return value of iter_inputs is technically an API breaking change, but I'm less concerned with that at this stage of the project.
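For illustration, a hedged sketch of that third option (hypothetical signatures, not code from this PR), where iter_inputs yields the extra metadata alongside the key so the worker function never needs the full FilePattern:

def iter_inputs(self):
    # hypothetical: yield (key, fname) pairs instead of bare keys
    for input_key in self.file_pattern:
        yield input_key, self.file_pattern[input_key]


def cache_input(input_key, fname, *, input_cache, secrets=None):
    # operates only on the small pieces it actually needs,
    # rather than the whole FilePattern object
    ...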

@rabernat (Contributor, Author) commented Oct 4, 2021

but this feels equivalent to just passing around the context on the self object like we did earlier.

I see what you're saying here. I was hoping this is different because there are no funky partials. My hope was that the (potentially large) config object would get serialized just once, rather than once per task. But agreed, it may not actually work.

However, even if it doesn't solve the memory issue, it may be worth pursuing for Beam compatibility.

The key question: how do we check this?

there is a third option, which mostly retains the current API, but refactors functions like pangeo_forge_recipes.recipes.xarray_zarr.cache_input that currently take file_pattern and instead pass the things they extract out of file_pattern

I think this is a good idea. I have a branch where I have already started to do this. It's mostly straightforward but there are a few tricky spots. I wrote most of that code so I sort of feel responsible for untangling it. 😬

@rabernat (Contributor, Author) commented Oct 4, 2021

@TomAugspurger - do you have 15 minutes today to chat about this? I think it would save us a lot of back-and-forth. I'm free most of the day.

@TomAugspurger (Contributor):

Yeah, I'm free until 12:00 Eastern (including now).

@rabernat (Contributor, Author) commented Oct 4, 2021

Cool! I'll go on https://whereby.com/pangeo and hang out there for the rest of the hour.

@TomAugspurger (Contributor):

Summary of the brief call:

We're going to test out this change. Ryan has a small change to make to the Dask task graph construction to ensure that the config is serialized exactly once, and then I'll update my testing code from #151 to see if memory usage still looks good on this branch.

@TomAugspurger (Contributor):

Actually @rabernat I have a small additional test to perform. I'm going to try serializing each of the partially applied functions using the same idea (embed them once in the task graph, rather than per task). I don't know if it'll behave similarly to serializing the object context, but I'd guess it would.

We'll likely still want to pursue the refactor here regardless of the outcome.

@rabernat (Contributor, Author) commented Oct 4, 2021

Just pushed the dask tokenization thing. If you want to see passing tests, run

pytest -vx tests/recipe_tests/test_XarrayZarrRecipe.py::test_recipe

@TomAugspurger (Contributor) commented Oct 4, 2021

Here's the diff for that small test in #219 (comment)

diff --git a/pangeo_forge_recipes/recipes/base.py b/pangeo_forge_recipes/recipes/base.py
index 3bee9f5..a47f106 100644
--- a/pangeo_forge_recipes/recipes/base.py
+++ b/pangeo_forge_recipes/recipes/base.py
@@ -125,14 +125,16 @@ class BaseRecipe(ABC):
         import dask
         from dask.delayed import Delayed
 
-        # TODO: HighlevelGraph layers for each of these mapped inputs.
-        # Cache Input --------------------------------------------------------
         dsk = {}
         token = dask.base.tokenize(self)
 
+        # TODO: HighlevelGraph layers for each of these mapped inputs.
+        # Cache Input --------------------------------------------------------
         # TODO: allow recipes to customize which stages to run
+        cache_input_token = f"cache_input-{token}"
+        dsk[cache_input_token] = self.cache_input
         for i, input_key in enumerate(self.iter_inputs()):
-            dsk[(f"cache_input-{token}", i)] = (self.cache_input, input_key)
+            dsk[(f"cache_input-{token}", i)] = (cache_input_token, input_key)
 
         # Prepare Target ------------------------------------------------------
         dsk[f"checkpoint_0-{token}"] = (lambda *args: None, list(dsk))
@@ -143,10 +145,13 @@ class BaseRecipe(ABC):
         )
 
         # Store Chunk --------------------------------------------------------
+        store_chunk_token = f"cache_chunk-{token}"
+        dsk[store_chunk_token] = self.store_chunk
+
         keys = []
         for i, chunk_key in enumerate(self.iter_chunks()):
             k = (f"store_chunk-{token}", i)
-            dsk[k] = (_store_chunk, f"prepare_target-{token}", self.store_chunk, chunk_key)
+            dsk[k] = (_store_chunk, f"prepare_target-{token}", store_chunk_token, chunk_key)
             keys.append(k)
 
         # Finalize Target -----------------------------------------------------

This fixes the memory issues on the workers I saw in my stripped down version of the IMERG recipe. So I'm pretty confident that Ryan's change in 65a7d57 will also fix the memory issues (testing that now).


So I think we can judge this PR solely on how it affects the user API and maintenance burden. I expect it won't affect memory either negatively or positively (relative to the diff I posted above).

@rabernat LMK if you want to do another quick check-in to talk through next steps. I don't know if it's worth making a PR with the diff I posted fixing the old way, since it'll just create conflicts here.

@TomAugspurger (Contributor):

https://gist.github.com/TomAugspurger/792f521bf2c086fd820324d18ef349ce has my manual tests for memory usage on this branch with the updated IMERG recipe (using this config). Things seem OK, so I think we can consider the dask memory issue fixed by including the large objects in the task graph exactly once (either the partially applied functions in #219 (comment), or the Config in this PR).

@cisaacstern (Member) left a comment

xarray_zarr.py is so much more readable without the kwargs passing; impressive!

Just break the API? This would mean rewriting all our tutorials, examples, intro guide, etc.

IIUC, this is the direction we're heading?

@rabernat (Contributor, Author) commented Oct 4, 2021

Things seem OK, so I think we can consider the dask memory issue fixed by including the large objects in the task graph exactly once

🎉

How would we do the equivalent check for Prefect?

https://gist.github.com/TomAugspurger/792f521bf2c086fd820324d18ef349ce has my manual tests for memory usage on this branch with the updated IMERG recipe

It would be so helpful to somehow get this check into our test suite. What are the manual steps involved? Do you just watch your memory usage via top or something like that? If so, could we automate this with a memory profiler?


Regarding the overall refactor, I'm inclined to move forward with this PR. Beyond the memory issue, it has the following benefits:

  • Simpler recipe classes, with tons of boilerplate removed
  • Beam compatibility down the line (@alxmrs, would love your feedback here eventually on whether we are on the right track)

Having thought about it a bit more, I think we may be able to do it without an API change. I'll keep working on that.

@cisaacstern (Member):

Having thought about it a bit more, I think we may be able to do it without an API change. I'll keep working on that.

This would be incredible. The API change was my only reservation here.

@TomAugspurger (Contributor):

It would be so helpful to somehow get this check into our test suite. What are the manual steps involved?

I think there are no manual steps involved: I just wait to see whether Dask's memory monitor kills the worker for exceeding its budget. I'll put up a PR with a (failing) test on main.
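For reference, a hedged sketch of a cheap complementary check (illustrative only; the actual memory test is the one in #220): confirm that the serialized dask graph stays small even when the config object is large, since the config should now appear in the graph exactly once.

import pickle


def pickled_graph_size(recipe) -> int:
    # Build the dask graph for the recipe and measure its pickled size.
    # If the config were embedded once per task, this would grow roughly
    # linearly with the number of inputs/chunks.
    delayed = recipe.to_dask()
    return len(pickle.dumps(dict(delayed.__dask_graph__())))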

@alxmrs (Contributor) left a comment

Here are some general thoughts about the PR; I will respond to the main discussion in a little bit.

In pangeo_forge_recipes/recipes/xarray_zarr.py:
delete_input_encoding=self.delete_input_encoding,
process_input=self.process_input,
) as ds:
with open_chunk(self.config, chunk_key,) as ds:
Contributor:
nit: remove last ,.

@alxmrs (Contributor) commented Oct 4, 2021

So I think we can judge this PR solely on how it affects the user API and maintenance burden.

Yes, I think this is the main benefit of this PR; it makes a great improvement here.

Beam compatibility down the line (@alxmrs, would love your feedback here eventually on whether we are on the right track)

TL;DR: I think that either way, we'll be able to have Beam support working. I think that this change helps make this much easier.

Let me summarize the takeaways from a meeting that @TomAugspurger, @cisaacstern, and I had last week: I was primarily focused on changes in Rechunker, specifically with MultiStagePipeline -- my argument was that these should have a config/context object. Before the meeting, I thought that state was often passed between workers via closures, an assumption I made having only read unit tests in that project. In the meeting, I found out that the unit test in question was pretty much the only case where this pattern was used -- mostly, to fit with pytest fixtures. We found out that the functools.partial approach should also work with Beam (it does in fact serialize well with pickle).

That said, this PR makes supporting Beam much simpler. We can basically translate the to_python version of the pipeline into Beam idioms. It would be something like:

def to_beam(self):
    # ...
    pcoll |= beam.Create(self.iter_inputs()) | beam.Map(self.cache_input, config=self.config)
    pcoll |= beam.Create(self.iter_chunks()) | beam.Map(self.store_chunk, config=self.config)
    pcoll |= beam.ParDo(finalize_target, config=self.config)
    return pcoll

Comment on lines 10 to 26
# https://stackoverflow.com/questions/57837609/python-typing-signature-typing-callable-for-function-with-kwargs
class NoArgumentStageFunction(Protocol):
    def __call__(*, config: Optional[Config] = None) -> None:
        ...


class SingleArgumentStageFunction(Protocol):
    def __call__(__a: Hashable, *, config: Optional[Config] = None) -> None:
        ...


# For some reason, mypy does not like this
StageFunction = Union[NoArgumentStageFunction, SingleArgumentStageFunction]
# pangeo_forge_recipes/recipes/xarray_zarr.py:525: error:
# Argument "function" to "Stage" has incompatible type
# "Callable[[Index, NamedArg(XarrayZarrRecipe, 'config')], None]";
# expected "NoArgumentStageFunction" [arg-type]
@rabernat (Contributor, Author):

I would really appreciate some type-hinting advice here. I am trying to define a specific type for these stage functions using a protocol. But mypy is not recognizing my functions as valid implementations of the protocol, instead identifying them as Callable[[Index, NamedArg(XarrayZarrRecipe, 'config')], None].

This looks like it could be a mypy bug: python/mypy#8317

Contributor:

Maybe, instead of Hashable, you want to use a bounded TypeVar? e.g.

H = TypeVar('H', bound=Hashable)
# ...
class SingleArgumentStageFunction(Protocol):
    def __call__(__a: H, *, config: Optional[Config] = None) -> None:
        ...

See these Generics docs in MyPy: https://mypy.readthedocs.io/en/latest/generics.html#type-variables-with-upper-bounds

Contributor:

Thought about it more, and I think that Hashable may be fine. It could be the Config=Any that's causing the type check issue. TypeVars (bounded, value restricted, or standard) could be helpful here, too.

@rabernat (Contributor, Author):

Hi Folks! I have just pushed a big update that (I hope) fully resolves all remaining issues. I have updated my top-level comment in #219 (comment) and am looking for reviews again.

Also note the mypy question just above.

@rabernat marked this pull request as ready for review on October 13, 2021 at 15:28.
@TomAugspurger (Contributor):

Is the way I am building the dask HighLevelGraph in pangeo_forge_recipes/executors/dask.py aligned with best practices? Do you see anything that could go wrong? @TomAugspurger

Looks good to me at a quick glance. Calling recipe.to_dask().visualize() on the terraclimate recipe shows that things look reasonable.

I don't have any immediate guesses on the typing issue.

@martindurant (Contributor):

Is the refactored pangeo_forge_recipes/recipes/reference_hdf_zarr.py more understandable, readable and maintainable?

The answer is an unqualified "yes".

@@ -21,8 +23,83 @@ def no_op(*_, **__) -> None:
    return None


def scan_file(chunk_key: ChunkKey, config: HDFReferenceRecipe):
    assert config.metadata_cache is not None, "metadata_cache is required"
Contributor:

I have been told that assert should not be used outside of tests. Since we should verify values before starting a recipe, it seems superfluous to check again in these work functions.

@rabernat (Contributor, Author):

These are required to make mypy pass.

Contributor:

When mypy requires you to write extra lines of code, I stop using mypy... :)

Contributor:

It may be possible to configure execution to not include any assert statements via Python's -O flag. Maybe, after investigating this option, we can document a pathway for more performant runs?

https://stackoverflow.com/a/43668496
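For what it's worth, a small illustration of the behavior behind the -O suggestion: running Python with -O sets __debug__ to False and compiles out assert statements entirely.

# Save as check_asserts.py, then compare:
#   python check_asserts.py
#   python -O check_asserts.py
if __debug__:
    print("asserts are active (normal run)")
else:
    print("asserts were stripped (run with python -O)")

assert True, "this statement disappears entirely under -O"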

@@ -58,6 +135,8 @@ class HDFReferenceRecipe(BaseRecipe, FilePatternRecipeMixin):
        Only used if `file_pattern` has more than one file.
    """

    _compiler = hdf_reference_recipe_compiler
Contributor:

Surprised that this is a function; it feels like you could more simply include the pipeline stages right here.

@rabernat (Contributor, Author):

I think that would result in circular calls, since generating the pipeline requires iterating over items in the Recipe itself. The function takes the Recipe as an input and returns a pipeline.

Contributor:

Sorry, I was not suggesting a change here, only a vague misgiving that it might be confusing to others. Perhaps a comment simply saying that this is a function might help.

@rabernat (Contributor, Author):

We can do much better than comments! We have detailed type hints.

@dataclass
class BaseRecipe(ABC):
    _compiler: ClassVar[RecipeCompiler]

RecipeCompiler = Callable[[BaseRecipe], Pipeline]

🥳

@alxmrs (Contributor) commented Oct 13, 2021

I left a couple of suggestions to address the typing issue. I'm working on a tightish deadline right now, but if I find some time, I'll pull this branch locally and see if I can find a solution.

@cisaacstern (Member) left a comment

Note that now this PR does not touch the API! 🎉

Awesome!

Is the refactored pangeo_forge_recipes/recipes/xarray_zarr.py more understandable, readable and maintainable?

Yes, this PR represents a considerable improvement in all three categories. Re: understandability, it took me a moment to grok how the stages are actually added (via _compiler + BaseRecipe methods), but once I did, I really appreciated the clarity of it, in particular the Pipeline + Stage objects and their relationship.

This PR makes me wonder if we'll want to rewrite Manual Execution docs to be more generalized. I played around a bit with what that might look like in this Gist (which shares a lot with FunctionPipelineExecutor, of course).

The limitation of the approach in that Gist is that it accesses the inner _compiler attribute. Maybe this means we'll want to have a convenience function somewhere to surface the stages for manual execution? (Which I do believe is good to support for learning and debugging purposes.) Perhaps this would be via a ManualPipelineExecutor class and corresponding BaseRecipe.to_manual_stages method?

My curiosity here is how someone wanting to manually debug an arbitrary recipe class can easily access the individual stages of that recipe.
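For concreteness, here is a hedged sketch of what that manual debugging flow could look like today, assuming the compiler function shown in the top comment is importable (it leans on internals, which is exactly the limitation noted above):

from pangeo_forge_recipes.recipes.xarray_zarr import XarrayZarrRecipe, xarray_zarr_recipe_compiler

recipe = XarrayZarrRecipe(file_pattern)  # assume file_pattern is already defined
pipeline = xarray_zarr_recipe_compiler(recipe)  # the same Pipeline that _compiler produces

for stage in pipeline.stages:
    print(f"running stage: {stage.name}")
    if stage.mappable is not None:
        for key in stage.mappable:
            stage.function(key, config=pipeline.config)
    else:
        stage.function(config=pipeline.config)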

@rabernat (Contributor, Author):

Maybe this means we'll want to have a convenience function somewhere to surface the stages for manual execution? (Which I do believe is good to support for learning and debugging purposes.)

This is a very important point Charles. I'm going to think about the best way to bring back that convenient introspection.

@rabernat (Contributor, Author):

I'm going to think about the best way to bring back that convenient introspection.

I ended up reimplementing the methods (e.g. recipe.prepare_target()) along with a deprecation warning. Going forward, I think we will want to just expose the functions directly from the recipe module (prepare_target(config=recipe)) and have the documentation for each class explain how to manually run the recipe.
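A hedged sketch of what such a deprecated convenience method might look like (illustrative only, not necessarily the exact code merged here):

import warnings


def prepare_target(*, config) -> None:
    # Module-level stage function (stands in for the real one).
    ...


class XarrayZarrRecipe:
    def prepare_target(self) -> None:
        # Deprecated shim: delegate to the module-level stage function,
        # passing the recipe itself as config.
        warnings.warn(
            "recipe.prepare_target() is deprecated; use "
            "prepare_target(config=recipe) instead.",
            DeprecationWarning,
        )
        prepare_target(config=self)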

I also solved the mypy issues by using the (deprecated) NamedArg type:

from mypy_extensions import NamedArg

SingleArgumentStageFunction = Callable[[Any, NamedArg(type=Any, name="config")], None]

Merging this now.
