Recipe context #219
Conversation
I'll need to take a closer look later, but I'm not 100% sure this will solve the memory issues we're seeing on workers / the scheduler when deserializing. This is partially due to not fully understanding why the memory issues are showing up in the first place, but this feels equivalent to just passing around the context on the
I think there is a third option, which mostly retains the current API, but refactors functions like … The tricky part is that some of these things (like …) … Changing the return value of …
I see what you're saying here. I was hoping this is different because there are no funky partials. My hope was that the (potentially large) config object would just get serialized once, rather than N times for each task. But agreed, it may not actually work. However, even if it doesn't solve the memory issue, it may be worth pursuing for the Beam compatibility. The key question: how do we check this?
I think this is a good idea. I have a branch where I have already started to do this. It's mostly straightforward but there are a few tricky spots. I wrote most of that code so I sort of feel responsible for untangling it. 😬
@TomAugspurger - do you have 15 minutes today to chat about this? I think it would save us a lot of back and forth. I'm free most of the day.
Yeah, I'm free until 12:00 Eastern (including now).
Cool! I'll go on https://whereby.com/pangeo and hang out there for the rest of the hour.
Summary of the brief call: We're going to test out this change. Ryan has a small change to make to the Dask task graph construction to ensure that the config is serialized exactly once, and then I'll update my testing code from #151 to see if memory usage still looks good on this branch.
Actually @rabernat I have a small additional test to perform. I'm going to try serializing each of the partially applied functions using the same idea (embed them once in the task graph, rather than per task). I don't know if it'll behave similarly to serializing the object context, but I'd guess it would. We'll likely still want to pursue the refactor here regardless of the outcome.
Just pushed the dask tokenization thing. If you want to see passing tests, run
Here's the diff for that small test in #219 (comment):
diff --git a/pangeo_forge_recipes/recipes/base.py b/pangeo_forge_recipes/recipes/base.py
index 3bee9f5..a47f106 100644
--- a/pangeo_forge_recipes/recipes/base.py
+++ b/pangeo_forge_recipes/recipes/base.py
@@ -125,14 +125,16 @@ class BaseRecipe(ABC):
import dask
from dask.delayed import Delayed
- # TODO: HighlevelGraph layers for each of these mapped inputs.
- # Cache Input --------------------------------------------------------
dsk = {}
token = dask.base.tokenize(self)
+ # TODO: HighlevelGraph layers for each of these mapped inputs.
+ # Cache Input --------------------------------------------------------
# TODO: allow recipes to customize which stages to run
+ cache_input_token = f"cache_input-{token}"
+ dsk[cache_input_token] = self.cache_input
for i, input_key in enumerate(self.iter_inputs()):
- dsk[(f"cache_input-{token}", i)] = (self.cache_input, input_key)
+ dsk[(f"cache_input-{token}", i)] = (cache_input_token, input_key)
# Prepare Target ------------------------------------------------------
dsk[f"checkpoint_0-{token}"] = (lambda *args: None, list(dsk))
@@ -143,10 +145,13 @@ class BaseRecipe(ABC):
)
# Store Chunk --------------------------------------------------------
+ store_chunk_token = f"cache_chunk-{token}"
+ dsk[store_chunk_token] = self.store_chunk
+
keys = []
for i, chunk_key in enumerate(self.iter_chunks()):
k = (f"store_chunk-{token}", i)
- dsk[k] = (_store_chunk, f"prepare_target-{token}", self.store_chunk, chunk_key)
+ dsk[k] = (_store_chunk, f"prepare_target-{token}", store_chunk_token, chunk_key)
keys.append(k)
# Finalize Target -----------------------------------------------------

This fixes the memory issues on the workers I saw in my stripped-down version of the IMERG recipe. So I'm pretty confident that Ryan's change in 65a7d57 will also fix the memory issues (testing that now). So I think we can judge this PR solely on how it affects the user API and maintenance burden. I hope that it won't have an effect on memory, negatively or positively (relative to the diff I posted above). @rabernat LMK if you want to do another quick check-in to talk through next steps? I dunno if it's worth making a PR with the diff I posted fixing the old way, since it'll just create conflicts here.
https://gist.github.com/TomAugspurger/792f521bf2c086fd820324d18ef349ce has my manual tests for memory usage on this branch with the updated IMERG recipe (to use this config). Things seem OK, so I think we can consider the dask memory issue fixed by including the large objects in the task graph exactly once (either the partially applied functions in #219 (comment), or the Config in this PR).
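To make that pattern concrete for readers skimming the thread, here is a minimal, hypothetical sketch (illustrative only, not project code) of putting a large object into a dask graph once, under its own key, and referencing that key from each task rather than embedding the object in every task tuple:

import dask

def process(cfg, i):
    # do the per-task work using the shared config
    return None

big_config = {"file_pattern": "..."}   # stand-in for a large FilePattern/config object
token = dask.base.tokenize(big_config)

dsk = {}
dsk[f"config-{token}"] = big_config    # the large object appears in the graph exactly once
for i in range(3):
    # each task refers to the config by its graph key, so it is serialized once
    dsk[(f"work-{token}", i)] = (process, f"config-{token}", i)

results = dask.get(dsk, [(f"work-{token}", i) for i in range(3)])  # synchronous scheduler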
xarray_zarr.py is so much more readable without the kwargs passing; impressive!
Just break the API? This would mean rewriting all our tutorials, examples, intro guide, etc.
IIUC, this is the direction we're heading?
🎉 How would we do the equivalent check for prefect?
It would be so helpful to somehow get this check into our test suite. What are the manual steps involved? Do you just watch your memory usage via …?

Regarding the overall refactor, I'm inclined to move forward with this PR. Beyond the memory issue, it has the following benefits
Having thought about it a bit more, I think we may be able to do it without an API change. I'll keep working on that.
This would be incredible. API change was my only reservation here.
I think there are no manual steps involved. I wait to see if Dask's memory monitor kills the worker for exceeding its budget. I'll put up a PR with a (failing) test on main.
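For context, a rough sketch of what such a check could look like (the cluster settings and the recipe.to_dask() call here are assumptions for illustration, not the actual test from #220): give a worker a small memory budget and let the memory monitor kill it if graph construction or deserialization blows up.

from distributed import Client, LocalCluster

# small per-worker budget so excessive graph/deserialization memory is caught quickly
cluster = LocalCluster(n_workers=1, threads_per_worker=1, memory_limit="1GB")
client = Client(cluster)

delayed = recipe.to_dask()   # assumed API: build the recipe's dask graph
delayed.compute()            # the worker is killed if it exceeds its memory budget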
Here are some general thoughts about the PR; I will respond to the main discussion in a little bit.
delete_input_encoding=self.delete_input_encoding,
process_input=self.process_input,
) as ds:
with open_chunk(self.config, chunk_key,) as ds:
nit: remove the trailing comma.
Yes, I think this is the main benefit of this PR; it makes a great improvement here.
TL;DR: I think that either way, we'll be able to have Beam support working. I think that this change helps make this much easier.

Let me summarize the takeaways from a meeting that @TomAugspurger, @cisaacstern, and I had last week: I was primarily focused on changes in Rechunker, specifically with MultiStagePipelines -- my argument was that these should have a config/context object. Before the meeting, I thought that state was often passed between workers via closures, an assumption I made having only read unit tests in that project. In the meeting, I found out that the unit test in question was pretty much the only case where this pattern was used -- mostly, to fit with pytest fixtures. We found out that the …

That said, this PR makes supporting Beam much simpler. We can basically translate the …
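For concreteness, here is a hypothetical sketch (not code from this PR) of how a single stage of the new Pipeline model could be translated to Beam, assuming a stage exposes a plain function, an iterable of mappable items, and a shared config object:

import apache_beam as beam

def run_stage(stage_function, mappable, config):
    # one PCollection element per mappable item; the shared config is passed as a
    # keyword argument on every call, mirroring stage_function(item, config=config)
    with beam.Pipeline() as p:
        (
            p
            | "Create items" >> beam.Create(list(mappable))
            | "Apply stage" >> beam.Map(stage_function, config=config)
        )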
Force-pushed from 6c46d82 to 637bc4a.
# https://stackoverflow.com/questions/57837609/python-typing-signature-typing-callable-for-function-with-kwargs
class NoArgumentStageFunction(Protocol):
    def __call__(*, config: Optional[Config] = None) -> None:
        ...


class SingleArgumentStageFunction(Protocol):
    def __call__(__a: Hashable, *, config: Optional[Config] = None) -> None:
        ...


# For some reason, mypy does not like this
StageFunction = Union[NoArgumentStageFunction, SingleArgumentStageFunction]
# pangeo_forge_recipes/recipes/xarray_zarr.py:525: error:
# Argument "function" to "Stage" has incompatible type
# "Callable[[Index, NamedArg(XarrayZarrRecipe, 'config')], None]";
# expected "NoArgumentStageFunction" [arg-type]
I would really appreciate some type-hinting advice here. I am trying to define a specific type for these stage functions using a protocol. But mypy is not recognizing my functions as valid implementations of the protocol, instead identifying them as Callable[[Index, NamedArg(XarrayZarrRecipe, 'config')], None].
This looks like it could be a mypy bug: python/mypy#8317
Maybe, instead of Hashable, you want to use a bounded TypeVar? e.g.
H = TypeVar('H', bound=Hashable)
# ...
class SingleArgumentStageFunction(Protocol):
    def __call__(__a: H, *, config: Optional[Config] = None) -> None:
        ...
See these Generics docs in MyPy: https://mypy.readthedocs.io/en/latest/generics.html#type-variables-with-upper-bounds
Thought about it more, and I think that Hashable may be fine. It could be the Config = Any that's causing the type check issue. TypeVars (bounded, value restricted, or standard) could be helpful here, too.
Hi Folks! I have just pushed a big update that (I hope) fully resolves all remaining issues. I have updated my top-level comment in #219 (comment) and am looking for reviews again. Also note the mypy question just above.
Looks good to me at a quick glance. Calling …

I don't have any immediate guesses on the typing issue.
The answer is an unqualified "yes"
@@ -21,8 +23,83 @@ def no_op(*_, **__) -> None:
    return None


def scan_file(chunk_key: ChunkKey, config: HDFReferenceRecipe):
    assert config.metadata_cache is not None, "metadata_cache is required"
I have been told that assert should not be used outside of tests. Since we should verify values before starting a recipe, it seems superfluous to check again in these work functions.
These are required to make mypy pass.
When mypy requires you to write extra lines of code, I stop using mypy... :)
It may be possible to configure execution to not include any assert statements via an -O flag. Maybe, after investigating this option, we can document a pathway for more performant runs?
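A quick illustration of that flag (standard Python behavior, nothing project-specific): running with python -O sets __debug__ to False and strips assert statements entirely.

# demo.py -- compare `python demo.py` with `python -O demo.py`
print("__debug__ is", __debug__)
assert False, "this assert only fires without -O"
print("reached only under -O, where the assert was stripped")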
@@ -58,6 +135,8 @@ class HDFReferenceRecipe(BaseRecipe, FilePatternRecipeMixin):
    Only used if `file_pattern` has more than one file.
    """

    _compiler = hdf_reference_recipe_compiler
Surprised that this is a function; it feels like you could more simply include the pipeline stages right here.
I think that would result in circular calls, since generating the pipeline requires iterating over items in the Recipe itself. The function takes the Recipe as an input and returns a pipeline.
Sorry, I was not suggesting a change here, only a vague misgiving that it might be confusing to others. Perhaps a comment simply saying that this is a function might help.
We can do much better than comments! We have detailed type hints.
pangeo-forge-recipes/pangeo_forge_recipes/recipes/base.py
Lines 11 to 13 in 3877eb1
@dataclass
class BaseRecipe(ABC):
    _compiler: ClassVar[RecipeCompiler]

RecipeCompiler = Callable[[BaseRecipe], Pipeline]
🥳
I left a couple of guesses to address the typing issue. I'm working on a tightish deadline right now, but if I find some time, I'll pull this branch locally and see if I can find a solution.
Note that now this PR does not touch the API! 🎉
Awesome!
Is the refactored pangeo_forge_recipes/recipes/xarray_zarr.py more understandable, readable and maintainable?
Yes, this PR represents a considerable improvement in all three categories. Re: understandability, it took me a moment to grok how the stages are actually added (via _compiler + BaseRecipe methods), but once I did, I really appreciated the clarity of it, in particular the Pipeline + Stage objects and their relationship.
This PR makes me wonder if we'll want to rewrite the Manual Execution docs to be more generalized. I played around a bit with what that might look like in this Gist (which shares a lot with FunctionPipelineExecutor, of course).
The limitation of the approach in that Gist is that it accesses the inner _compiler attribute. Maybe this means we'll want to have a convenience function somewhere to surface the stages for manual execution? (Which I do believe is good to support for learning and debugging purposes.) Perhaps this would be via a ManualPipelineExecutor class and corresponding BaseRecipe.to_manual_stages method?
My curiosity here is wondering how someone wanting to manually debug an arbitrary recipe class can easily access the individual stages of that recipe.
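To make the question concrete, here is roughly what manual, serial execution could look like today. This is a sketch against assumed internals (_compiler, stage.function, stage.mappable, pipeline.config), not a supported API:

pipeline = recipe._compiler(recipe)          # compile the recipe into a Pipeline of Stages
for stage in pipeline.stages:
    if stage.mappable is not None:
        for item in stage.mappable:          # e.g. input keys or chunk keys
            stage.function(item, config=pipeline.config)
    else:
        stage.function(config=pipeline.config)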
This is a very important point, Charles. I'm going to think about the best way to bring back that convenient introspection.
I ended up reimplementing the methods (e.g. …). I also solved the mypy issues by using the (deprecated) …:

SingleArgumentStageFunction = Callable[[Any, NamedArg(type=Any, name="config")], None]

Merging this now.
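For reference, a sketch of how that alias reads with its imports (NamedArg comes from mypy_extensions; this is illustrative, not the verbatim project code):

from typing import Any, Callable
from mypy_extensions import NamedArg

# one positional argument plus a required `config` argument passed by keyword
SingleArgumentStageFunction = Callable[[Any, NamedArg(type=Any, name="config")], None]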
hopefully...
Edited on 13 Oct - Note that now this PR does not touch the API! 🎉 As a result, all existing recipe tests pass as-is. It's a purely internal change in implementation.
How things used to be
Recipes were implemented as dataclasses, with each stage of execution as an attribute on the dataclass which returned a partial-enclosed function. To generate these partials, we had to pass tons of keyword arguments around. This created lots of room for error. And passing the FilePattern object via partials to each task in a large graph created major memory problems.
The new way
Recipes are still defined as dataclasses. However, this dataclass doubles as a config parameter which is passed to the actual functions that do the work for the recipe. These functions have a simple signature, like this:
pangeo-forge-recipes/pangeo_forge_recipes/recipes/xarray_zarr.py
Lines 141 to 144 in 3877eb1
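The referenced snippet is not reproduced above; as a hedged sketch (names assumed rather than copied from the file), such a stage function looks roughly like this, with the recipe instance itself passed in as config:

from typing import Hashable
from pangeo_forge_recipes.recipes.xarray_zarr import XarrayZarrRecipe

def cache_input(input_key: Hashable, config: XarrayZarrRecipe) -> None:
    # fetch one source file and write it to the recipe's cache,
    # reading all options directly off the config (the recipe instance)
    ...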
In order to translate the recipe into something executable, recipes now must assign a function to the _compiler class variable. This is what the compiler function looks like for XarrayZarrRecipe:
pangeo-forge-recipes/pangeo_forge_recipes/recipes/xarray_zarr.py
Lines 523 to 530 in 3877eb1
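Again, the embedded snippet is not captured above. In outline (a sketch with assumed names, following the Pipeline and Stage objects discussed in the review comments), a compiler is just a function from a recipe to a Pipeline:

from pangeo_forge_recipes.executors.base import Pipeline, Stage

def xarray_zarr_recipe_compiler(recipe: XarrayZarrRecipe) -> Pipeline:
    # each stage pairs a plain function with the items it maps over;
    # the recipe itself travels along as the shared config
    stages = [
        Stage(name="cache_input", function=cache_input, mappable=list(recipe.iter_inputs())),
        Stage(name="prepare_target", function=prepare_target),
        Stage(name="store_chunk", function=store_chunk, mappable=list(recipe.iter_chunks())),
        Stage(name="finalize_target", function=finalize_target),
    ]
    return Pipeline(stages=stages, config=recipe)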
In the process of making this work, I have also re-implemented the Pipelines execution model from rechunker from scratch, incorporating lessons learned. So this PR subsumes #192.
This PR also includes the new memory test from #220, verifying that this new implementation does not blow up the memory of dask graphs.
How to review
This is a huge refactoring, which makes it intimidating to review. Rather than just saying "everyone review everything", here are some specific review questions for specific people:
- Is pangeo_forge_recipes/executors/base.py sensible and well designed? @alxmrs
- Is pangeo_forge_recipes/executors/dask.py aligned with best practices? Do you see anything that could go wrong? @TomAugspurger
- Is the refactored pangeo_forge_recipes/recipes/reference_hdf_zarr.py more understandable, readable and maintainable? @martindurant
- Is the refactored pangeo_forge_recipes/recipes/xarray_zarr.py more understandable, readable and maintainable? @cisaacstern