Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor executors #77

Merged
merged 23 commits into from
Jan 27, 2021
Merged

Conversation

rabernat
Copy link
Member

Overview

This is a major refactor of the internals of rechunker. I have changed the interface between the executors and the rechunking plan by adding some new types. The executors all now accept something called a ParallelPipelines object. The hierarchy of types looks like this

ParallelPipelines = Iterable[MultiStagePipeline]
MultiStagePipeline = Iterable[Stage]
class Stage(NamedTuple):
    func: Callable
    map_args: Optional[Iterable] = None

Stages contain a single function that is mapped across many inputs (e.g. a single copy operation). MultiStagePipelienes contain multiple Stages (e.g. copy source to intermediate, then intermediate to target). ParallelPipelines contain several MultiStagePipelines that can be executed in parallel.

The rechunk function now contains a line called pipelines = specs_to_pipelines(copy_spec) which translates a list of CopySpecs to a ParallelPipelines. This function contains all of the logic about how to execute a copy operation. All the executors needs to do is know how to execute a generic ParallelPipelines.

Motivation

The abstractions we have created in rechunker, which allow many different distributed execution engines to be used for the same computation, are very useful and cool. The underlying motivation for this refactor was to make the executors more general, such that they can be used in other projects (e.g. Pangeo Forge). This is accomplished by decoupling the details of the rechunking operation as currently implemented from the Executor class.

Pros

Beyond the motivation above, this approach has a couple of major benefits:

Cons

But there are some downsides:

  • Additional code complexity (more levels of abstraction)
  • Possible performance impacts (have not done benchmarking comparison yet)
  • Possible impacts on graph size / complexity (have not checked yet)

Todo

  • Python Executor
  • Dask Exectuor
  • Prefect Exectuor
  • Beam Exector
  • PyWren Executor
  • Additional tests?
  • Documentation

At this point I would love to get a preliminary review from anyone who is interested.

@rabernat rabernat marked this pull request as draft December 31, 2020 23:03
@shoyer
Copy link
Collaborator

shoyer commented Jan 1, 2021

I like this general idea! My main concern is that this would preclude the option to avoid using intermediate arrays in favor of executor-native groupby operations, e.g., like what is sketched out in #36 for Beam. In principle, avoiding the intermediate copy could be up to twice as fast, if an executor like Beam or Spark manages to hold all the intermediate values in memory instead of dumping to disk.

@rabernat
Copy link
Member Author

rabernat commented Jan 2, 2021

Good point Stephan. I think there is a simple resolution; we make the CopySpec -> Pipeline translation optional and allow executors to natively work on CopySpecs if they prefer. Stand by for an update to implement this.

@shoyer
Copy link
Collaborator

shoyer commented Jan 2, 2021

Good point Stephan. I think there is a simple resolution; we make the CopySpec -> Pipeline translation optional and allow executors to natively work on CopySpecs if they prefer. Stand by for an update to implement this.

Sounds good to me!

@codecov
Copy link

codecov bot commented Jan 7, 2021

Codecov Report

Merging #77 (a5a3a29) into master (c59f303) will decrease coverage by 0.31%.
The diff coverage is 98.13%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master      #77      +/-   ##
==========================================
- Coverage   97.76%   97.44%   -0.32%     
==========================================
  Files          10       12       +2     
  Lines         447      509      +62     
  Branches       89       93       +4     
==========================================
+ Hits          437      496      +59     
- Misses          5        7       +2     
- Partials        5        6       +1     
Impacted Files Coverage Δ
rechunker/executors/util.py 100.00% <ø> (ø)
rechunker/executors/dask.py 94.54% <94.00%> (-5.46%) ⬇️
rechunker/algorithm.py 82.45% <100.00%> (ø)
rechunker/api.py 100.00% <100.00%> (ø)
rechunker/compat.py 100.00% <100.00%> (ø)
rechunker/executors/__init__.py 100.00% <100.00%> (ø)
rechunker/executors/beam.py 100.00% <100.00%> (ø)
rechunker/executors/prefect.py 100.00% <100.00%> (ø)
rechunker/executors/python.py 100.00% <100.00%> (ø)
rechunker/executors/pywren.py 100.00% <100.00%> (ø)
... and 4 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update c59f303...a5a3a29. Read the comment docs.

@rabernat
Copy link
Member Author

For context, the tests are currently failing due to the pre-commit mypy failure described above. That's the only blocker here.

@rabernat rabernat marked this pull request as ready for review January 18, 2021 14:41
@rabernat
Copy link
Member Author

I just put this through its paces on google cloud, and I'm satisfied it is working ok for real world use cases. I'm going to merge. It would be great if other users (maybe @rsignell-usgs?) could take this for a spin by installing rechunker from github master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants