diff --git a/pangeo_forge_recipes/recipes/reference_hdf_zarr.py b/pangeo_forge_recipes/recipes/reference_hdf_zarr.py index f1ef7c5c..c8842238 100644 --- a/pangeo_forge_recipes/recipes/reference_hdf_zarr.py +++ b/pangeo_forge_recipes/recipes/reference_hdf_zarr.py @@ -7,10 +7,10 @@ import fsspec import yaml from fsspec_reference_maker.combine import MultiZarrToZarr -from fsspec_reference_maker.hdf import SingleHdf5ToZarr from ..patterns import FilePattern, Index -from ..storage import FSSpecTarget, MetadataTarget +from ..reference import create_hdf5_reference, unstrip_protocol +from ..storage import FSSpecTarget, MetadataTarget, file_opener from .base import BaseRecipe, FilePatternRecipeMixin ChunkKey = Index @@ -150,10 +150,11 @@ def _one_chunk( metadata_cache: MetadataTarget, ): fname = file_pattern[chunk_key] - with fsspec.open(fname, **netcdf_storage_options) as f: - fn = os.path.basename(fname + ".json") - h5chunks = SingleHdf5ToZarr(f, _unstrip_protocol(fname, f.fs), inline_threshold=300) - metadata_cache[fn] = h5chunks.translate() + ref_fname = os.path.basename(fname + ".json") + with file_opener(fname, **netcdf_storage_options) as fp: + protocol = getattr(getattr(fp, "fs", None), "protocol", None) # make mypy happy + target_url = unstrip_protocol(fname, protocol) + metadata_cache[ref_fname] = create_hdf5_reference(fp, target_url, fname) def _finalize( @@ -212,16 +213,3 @@ def _finalize( } with out_target.open(output_intake_yaml_fname, mode="wt") as f: yaml.dump(spec, f, default_flow_style=False) - - -def _unstrip_protocol(name, fs): - # should be upstreamed into fsspec and maybe also - # be a method on an OpenFile - if isinstance(fs.protocol, str): - if name.startswith(fs.protocol): - return name - return fs.protocol + "://" + name - else: - if name.startswith(tuple(fs.protocol)): - return name - return fs.protocol[0] + "://" + name diff --git a/pangeo_forge_recipes/reference.py b/pangeo_forge_recipes/reference.py new file mode 100644 index 00000000..168359a4 --- /dev/null +++ b/pangeo_forge_recipes/reference.py @@ -0,0 +1,28 @@ +""" +Functions related to creating fsspec references. +""" + +from typing import Dict, Tuple, Union + +from fsspec_reference_maker.hdf import SingleHdf5ToZarr + + +def create_hdf5_reference( + fp, fname: str, url: str, inline_threshold: int = 300, **netcdf_storage_options +) -> Dict: + h5chunks = SingleHdf5ToZarr(fp, url, inline_threshold=inline_threshold) + reference_data = h5chunks.translate() + return reference_data + + +def unstrip_protocol(name: str, protocol: Union[str, Tuple[str, ...]]) -> str: + # should be upstreamed into fsspec and maybe also + # be a method on an OpenFile + if isinstance(protocol, str): + if name.startswith(protocol): + return name + return protocol + "://" + name + else: + if name.startswith(tuple(protocol)): + return name + return protocol[0] + "://" + name diff --git a/pangeo_forge_recipes/storage.py b/pangeo_forge_recipes/storage.py index 90ae0532..4b6e4359 100644 --- a/pangeo_forge_recipes/storage.py +++ b/pangeo_forge_recipes/storage.py @@ -19,6 +19,10 @@ # fsspec doesn't provide type hints, so I'm not sure what the write type is for open files OpenFileType = Any +# https://github.com/pangeo-forge/pangeo-forge-recipes/pull/213#discussion_r717801623 +# There is no fool-proof method to tell whether the output of the context was created by fsspec. +# You could check for the few concrete classes that we expect +# like AbstractBufferedFile, LocalFileOpener. def _get_url_size(fname, secrets, **open_kwargs): @@ -116,14 +120,15 @@ def size(self, path: str) -> int: return self.fs.size(self._full_path(path)) @contextmanager - def open(self, path: str, **kwargs) -> Iterator[None]: + def open(self, path: str, **kwargs) -> Iterator[OpenFileType]: """Open file with a context manager.""" full_path = self._full_path(path) logger.debug(f"entering fs.open context manager for {full_path}") - with self.fs.open(full_path, **kwargs) as f: - logger.debug(f"FSSpecTarget.open yielding {f}") - yield f - logger.debug("FSSpecTarget.open yielded") + of = self.fs.open(full_path, **kwargs) + logger.debug(f"FSSpecTarget.open yielding {of}") + yield of + logger.debug("FSSpecTarget.open yielded") + of.close() def __post_init__(self): if not self.fs.isdir(self.root_path): @@ -189,7 +194,7 @@ def file_opener( bypass_open: bool = False, secrets: Optional[dict] = None, **open_kwargs, -) -> Iterator[Union[OpenFileType, str]]: +) -> Iterator[Union[fsspec.core.OpenFile, str]]: """ Context manager for opening files. @@ -201,6 +206,7 @@ def file_opener( before opening. In this case, function yields a path name rather than an open file. :param bypass_open: If True, skip trying to open the file at all and just return the filename back directly. (A fancy way of doing nothing!) + :param secrets: Dictionary of secrets to encode into the query string. """ if bypass_open: diff --git a/tests/test_storage.py b/tests/test_storage.py index c620615a..e1548405 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -98,6 +98,7 @@ def do_actual_test(): ds.load() else: _ = fp.read() + assert hasattr(fp, "fs") # should be true for fsspec.OpenFile objects if use_dask: with Client(dask_cluster) as client: