Skip to content

Commit

Permalink
tokenize config in dask executor
Browse files Browse the repository at this point in the history
  • Loading branch information
rabernat committed Oct 4, 2021
1 parent 40a5fa9 commit 65a7d57
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions pangeo_forge_recipes/recipes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,20 @@ def to_dask(self):
dsk = {}
token = dask.base.tokenize(self)

config_ = f"config-{token}"
dsk[config_] = self.config

# TODO: allow recipes to customize which stages to run
for i, input_key in enumerate(self.iter_inputs()):
dsk[(f"cache_input-{token}", i)] = (self.cache_input, self.config, input_key)
dsk[(f"cache_input-{token}", i)] = (self.cache_input, config_, input_key)

# Prepare Target ------------------------------------------------------
dsk[f"checkpoint_0-{token}"] = (lambda *args: None, list(dsk))
dsk[f"prepare_target-{token}"] = (
_checkpoint,
f"checkpoint_0-{token}",
self.prepare_target,
self.config,
config_,
)

# Store Chunk --------------------------------------------------------
Expand All @@ -133,15 +136,15 @@ def to_dask(self):
_checkpoint,
f"prepare_target-{token}",
self.store_chunk,
self.config,
config_,
chunk_key,
)
keys.append(k)

# Finalize Target -----------------------------------------------------
dsk[f"checkpoint_1-{token}"] = (lambda *args: None, keys)
key = f"finalize_target-{token}"
dsk[key] = (_checkpoint, f"checkpoint_1-{token}", self.finalize_target, self.config)
dsk[key] = (_checkpoint, f"checkpoint_1-{token}", self.finalize_target, config_)

return Delayed(key, dsk)

Expand Down

0 comments on commit 65a7d57

Please sign in to comment.