Introducing FilePatternToChunks: IO with Pangeo-Forge's FilePattern interface. #31
Conversation
This is the first of a few changes that will let users read in datasets using Pangeo-Forge's `FilePattern` interface [0]. Here, users can describe how data is stored along concat and merge dimensions. This transform will read the datasets in as chunks (and optionally, smaller `sub_chunks`). This module can be leveraged in pipelines to convert natively formatted datasets to Zarr. To make use of this transform, the user will need to install `pangeo-forge-recipes` separately; this dependency is included in the test dependencies. As of now, this transform is not exposed to the user (i.e., not included in the primary `__init__.py`). I plan to do this (and update the docs) once the module is tested and feature complete (google#29). [0]: https://pangeo-forge.readthedocs.io/en/latest/file_patterns.html
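Conceptually, a `FilePattern` is a mapping from positions along concat and merge dimensions to the file that stores that piece of the dataset. The sketch below illustrates that idea with the standard library only; the dimension names, keys, and `make_path` function are illustrative, not the actual `pangeo-forge-recipes` API.

```python
from itertools import product

# Illustrative dimensions (not the real pangeo-forge-recipes API): one
# concat dimension ("time") and one merge dimension ("variable").
concat_keys = ["2020-01", "2020-02", "2020-03"]
merge_keys = ["temperature", "humidity"]

def make_path(time: str, variable: str) -> str:
    """Map one (time, variable) position to the file that stores it."""
    return f"gs://my-bucket/{variable}/{time}.nc"

# A FilePattern-style mapping enumerates every combination of dimension
# keys; a transform can then iterate over it to open each file as a chunk.
pattern = {
    (t, v): make_path(time=t, variable=v)
    for t, v in product(concat_keys, merge_keys)
}

for index, path in sorted(pattern.items()):
    print(index, path)
```

In the real interface, the indices additionally carry offsets that let the reader place each file's data at the right position in the combined (concatenated and merged) dataset.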
xarray_beam/_src/pangeo.py
Outdated
```python
) -> Iterator[Tuple[core.Key, xarray.Dataset]]:
  """Open datasets into chunks with XArray."""
  path = self.pattern[index]
  with FileSystems().open(path) as file:
```
Do Beam's filesystems really all work out of the box with Xarray? If so, that's awesome!
Can you verify that it works with both netCDF3 and netCDF4 files? These would be using different underlying storage backends (scipy vs h5netcdf).
To be honest, I'm a little skeptical that this will work well. I suspect we'll end up needing to copy temporary files to local disk (but I'd love to be proven wrong!)
Let me experiment and see how this works. In my tests in the previous iteration of this change, this worked well with GCS's IO objects.
I experimented a bit more with this based on @mjwillson's suggestion.
Amazingly, it seems that using file-like objects with Xarray actually does work as used here, though making a local copy might still have better performance.
What doesn't work yet -- but hopefully could with small upstream changes to Xarray -- is passing xarray datasets opened with these file-like objects into a Beam pipeline. That could let us do the actual data loading from netCDF in separate workers, which could be quite a win!
It's a bit unclear to me how this would not work in a Beam pipeline (or, what needs to be done to get this win). Can you explain a bit more?
Is this a correct understanding: With the change you're referring to, we could pickle the XArray open command (with the file-like object) as PCollections, which would allow us to split the open across workers?
> Is this a correct understanding: With the change you're referring to, we could pickle the XArray open command (with the file-like object) as PCollections, which would allow us to split the open across workers?
With this change, we could pickle lazy `xarray.Dataset` objects corresponding to open netCDF files and pass them between stages in a Beam pipeline.
Some data would still need to be loaded on the worker on which `xarray.open_dataset()` is called, but this could be much less data than the entire file (e.g., only the "metadata" part of the file). The bulk of the loading work could be split across multiple workers, which could be quite useful for processing large (GB+) netCDF files.
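The pickling obstacle being discussed can be sketched without xarray: an object that holds an open file handle cannot be pickled as-is, but it can if it stores only the recipe for reopening the file and lets the receiving worker open it on demand. The `LazySource` class below is a hypothetical illustration of that idea, not xarray's actual mechanism.

```python
import pickle

class LazySource:
    """Hypothetical lazy handle: stores how to open data, not the open file.

    Open file objects are not picklable, so __getstate__ drops the handle
    and __setstate__ leaves it unopened -- each worker reopens the file on
    first access, analogous to shipping lazy datasets between Beam stages.
    """

    def __init__(self, path):
        self.path = path
        self._file = None  # opened lazily, never pickled

    def read(self):
        if self._file is None:
            self._file = open(self.path, "rb")
        return self._file.read()

    def __getstate__(self):
        # Keep only the recipe for reopening; drop the unpicklable handle.
        return {"path": self.path}

    def __setstate__(self, state):
        self.path = state["path"]
        self._file = None

# Round-trip through pickle, as Beam does when shipping work to a worker:
# src = LazySource("data.nc")
# clone = pickle.loads(pickle.dumps(src))
# clone.read()  # reopens and reads on the receiving side
```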
Code is passing tests; cleanup and e2e testing is still needed.
Added comment to explain early return.
…nks. This transform will now only open file pattern datasets as whole chunks. Re-chunking (i.e., "sub_chunks") can be delegated to a SplitChunk() transform layered after this one.
As a backup to the `FileSystems().open(...)` method, we use fsspec to create a local copy of the data for opening with `xr.open_dataset(...)`.
Looks great, thanks Alex!
xarray_beam/_src/pangeo_forge.py
Outdated
```python
try:
  yield xarray.open_dataset(file, **self.xarray_open_kwargs)
except (TypeError, OSError) as e:

  if not self.local_copy:
    raise ValueError(f'cannot open {path!r} with buffering.') from e

  # The cfgrib engine (and others) may fail with the FileSystems method of
  # opening with BufferedReaders. Here, we open the data locally to make
  # it easier to work with XArray.
  with fsspec.open_local(
      f"simplecache::{path}",
      simplecache={'cache_storage': '/tmp/files'}
  ) as fs_file:
    yield xarray.open_dataset(fs_file, **self.xarray_open_kwargs)
```
Rather than using `local_copy` as a fall-back, can we just use an `if` statement?
```diff
-try:
-  yield xarray.open_dataset(file, **self.xarray_open_kwargs)
-except (TypeError, OSError) as e:
-  if not self.local_copy:
-    raise ValueError(f'cannot open {path!r} with buffering.') from e
-  # The cfgrib engine (and others) may fail with the FileSystems method of
-  # opening with BufferedReaders. Here, we open the data locally to make
-  # it easier to work with XArray.
-  with fsspec.open_local(
-      f"simplecache::{path}",
-      simplecache={'cache_storage': '/tmp/files'}
-  ) as fs_file:
-    yield xarray.open_dataset(fs_file, **self.xarray_open_kwargs)
+if self.local_copy:
+  # The cfgrib engine (and others) may fail with the FileSystems method of
+  # opening with BufferedReaders. Here, we open the data locally to make
+  # it easier to work with XArray.
+  with fsspec.open_local(
+      f"simplecache::{path}",
+      simplecache={'cache_storage': '/tmp/files'}
+  ) as fs_file:
+    yield xarray.open_dataset(fs_file, **self.xarray_open_kwargs)
+else:
+  yield xarray.open_dataset(file, **self.xarray_open_kwargs)
```
The old contextmanager approach wasn't applicable, since `open_local` returns a string (the path to the local copy of the file), not a file object.
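The point about `open_local` returning a string can be demonstrated with a purely local file standing in for remote data; the temp file and cache directory below are illustrative, not the transform's actual paths.

```python
import os
import tempfile

import fsspec

# Create a small local file to stand in for remote data.
tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, "data.bin")
with open(path, "wb") as f:
    f.write(b"hello")

# "simplecache::" copies the target into a local cache and returns the
# cached *path* as a plain string -- not a file-like context manager.
local_path = fsspec.open_local(
    f"simplecache::file://{path}",
    simplecache={"cache_storage": os.path.join(tmpdir, "cache")},
)
print(type(local_path).__name__)
with open(local_path, "rb") as f:
    print(f.read())
```

Because the return value is just a path, it is handed directly to `xarray.open_dataset(...)` rather than wrapped in a `with` statement.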