Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tweak file opening #213

Merged
merged 7 commits into from
Oct 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 7 additions & 19 deletions pangeo_forge_recipes/recipes/reference_hdf_zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import fsspec
import yaml
from fsspec_reference_maker.combine import MultiZarrToZarr
from fsspec_reference_maker.hdf import SingleHdf5ToZarr

from ..patterns import FilePattern, Index
from ..storage import FSSpecTarget, MetadataTarget
from ..reference import create_hdf5_reference, unstrip_protocol
from ..storage import FSSpecTarget, MetadataTarget, file_opener
from .base import BaseRecipe, FilePatternRecipeMixin

ChunkKey = Index
Expand Down Expand Up @@ -150,10 +150,11 @@ def _one_chunk(
metadata_cache: MetadataTarget,
):
fname = file_pattern[chunk_key]
with fsspec.open(fname, **netcdf_storage_options) as f:
fn = os.path.basename(fname + ".json")
h5chunks = SingleHdf5ToZarr(f, _unstrip_protocol(fname, f.fs), inline_threshold=300)
metadata_cache[fn] = h5chunks.translate()
ref_fname = os.path.basename(fname + ".json")
with file_opener(fname, **netcdf_storage_options) as fp:
protocol = getattr(getattr(fp, "fs", None), "protocol", None) # make mypy happy
target_url = unstrip_protocol(fname, protocol)
metadata_cache[ref_fname] = create_hdf5_reference(fp, target_url, fname)


def _finalize(
Expand Down Expand Up @@ -212,16 +213,3 @@ def _finalize(
}
with out_target.open(output_intake_yaml_fname, mode="wt") as f:
yaml.dump(spec, f, default_flow_style=False)


def _unstrip_protocol(name, fs):
# should be upstreamed into fsspec and maybe also
# be a method on an OpenFile
if isinstance(fs.protocol, str):
if name.startswith(fs.protocol):
return name
return fs.protocol + "://" + name
else:
if name.startswith(tuple(fs.protocol)):
return name
return fs.protocol[0] + "://" + name
28 changes: 28 additions & 0 deletions pangeo_forge_recipes/reference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""
Functions related to creating fsspec references.
"""

from typing import Dict, Tuple, Union

from fsspec_reference_maker.hdf import SingleHdf5ToZarr


def create_hdf5_reference(
    fp, url: str, fname: str, inline_threshold: int = 300, **netcdf_storage_options
) -> Dict:
    """Create an fsspec reference set for a single HDF5 file.

    Parameter order matches the call site
    ``create_hdf5_reference(fp, target_url, fname)``: the previous
    ``(fp, fname, url)`` ordering mis-bound the arguments, passing the bare
    filename to SingleHdf5ToZarr as its url.

    :param fp: An open file-like object for the HDF5 file.
    :param url: Fully-qualified (protocol-prefixed) URL of the file; embedded
        in the generated references so they point at the right target.
    :param fname: Path of the file. Currently unused; kept for interface
        stability with existing callers.
    :param inline_threshold: Chunks smaller than this many bytes are inlined
        directly into the reference set.
    :param netcdf_storage_options: Currently unused — presumably reserved for
        future use; TODO confirm before removing.
    :returns: Reference dict produced by ``SingleHdf5ToZarr.translate()``.
    """
    h5chunks = SingleHdf5ToZarr(fp, url, inline_threshold=inline_threshold)
    return h5chunks.translate()


def unstrip_protocol(name: str, protocol: Union[str, Tuple[str, ...]]) -> str:
    """Return *name* prefixed with *protocol* unless it already carries it.

    :param name: A path or URL.
    :param protocol: A protocol string, or a tuple of protocol aliases
        (e.g. ``("gcs", "gs")``); the first alias is used as the prefix.
    """
    # should be upstreamed into fsspec and maybe also be a method on an OpenFile
    if isinstance(protocol, str):
        return name if name.startswith(protocol) else f"{protocol}://{name}"
    prefixes = tuple(protocol)
    return name if name.startswith(prefixes) else f"{protocol[0]}://{name}"
18 changes: 12 additions & 6 deletions pangeo_forge_recipes/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

# fsspec doesn't provide type hints, so I'm not sure what the write type is for open files
OpenFileType = Any
# https://github.com/pangeo-forge/pangeo-forge-recipes/pull/213#discussion_r717801623
# There is no fool-proof method to tell whether the output of the context was created by fsspec.
# You could check for the few concrete classes that we expect
# like AbstractBufferedFile, LocalFileOpener.


def _get_url_size(fname, secrets, **open_kwargs):
Expand Down Expand Up @@ -116,14 +120,15 @@ def size(self, path: str) -> int:
return self.fs.size(self._full_path(path))

@contextmanager
def open(self, path: str, **kwargs) -> Iterator[OpenFileType]:
    """Open file with a context manager.

    Yields the file-like object returned by ``fs.open`` directly (without
    entering its own context), so callers receive the fsspec object itself.
    """
    full_path = self._full_path(path)
    logger.debug(f"entering fs.open context manager for {full_path}")
    of = self.fs.open(full_path, **kwargs)
    logger.debug(f"FSSpecTarget.open yielding {of}")
    try:
        yield of
    finally:
        # close() must run even if the consumer raises inside the `with`
        # block; a bare `yield` followed by close() would leak the handle.
        logger.debug("FSSpecTarget.open yielded")
        of.close()
Comment on lines +127 to +131
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a subtle but important change. We used to do

with self.fs.open(full_path, **kwargs) as f:
    yield f  # -> _io.BufferedReader

Now we basically do

yield self.fs.open(full_path, **kwargs)  # -> fsspec.core.OpenFile

This passes all our tests, but this is a sensitive area of the code.

@martindurant, can you think of anything that could go wrong here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correction: self.fs.open returns a file-like object (io.IOBase subclass, AbstractBufferedFile for s3, gcs, etc.)

For an AbstractBufferedFile, entering the context is a no-op yielding the same object, and exiting calls .close(). For other file types, it's something else, with the expectation that exiting the context closes. For example, LocalFileSystem.open -> LocalFileOpener ; LocalFileOpener.__enter__ -> _io.BufferedReader (and __exit__ calls the actual file's __exit__, calling close()). You could do with on the file-like.

For fsspec.open, you get OpenFile objects, which can contain more than one file-like layer, for compression and text mode. You must use this with a context, to get the outermost file-like object back. Leaving the context calls close on the file-like objects in reverse order. If you, instead, call .open of the OpenFile, you get the same file-like object, but its .close method has been patched to execute the same chain of close calls, since this object might in this case outlive the OpenFile instance that made it.


def __post_init__(self):
if not self.fs.isdir(self.root_path):
Expand Down Expand Up @@ -189,7 +194,7 @@ def file_opener(
bypass_open: bool = False,
secrets: Optional[dict] = None,
**open_kwargs,
) -> Iterator[Union[OpenFileType, str]]:
) -> Iterator[Union[fsspec.core.OpenFile, str]]:
"""
Context manager for opening files.

Expand All @@ -201,6 +206,7 @@ def file_opener(
before opening. In this case, function yields a path name rather than an open file.
:param bypass_open: If True, skip trying to open the file at all and just
return the filename back directly. (A fancy way of doing nothing!)
:param secrets: Dictionary of secrets to encode into the query string.
"""

if bypass_open:
Expand Down
1 change: 1 addition & 0 deletions tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def do_actual_test():
ds.load()
else:
_ = fp.read()
assert hasattr(fp, "fs") # should be true for fsspec.OpenFile objects

if use_dask:
with Client(dask_cluster) as client:
Expand Down