Allow fsspec/zarr/mfdataset #4461

Closed
wants to merge 25 commits
59 changes: 57 additions & 2 deletions doc/io.rst
@@ -874,6 +874,38 @@ store is already present at that path, an error will be raised, preventing it
from being overwritten. To override this behavior and overwrite an existing
store, add ``mode='w'`` when invoking :py:meth:`~Dataset.to_zarr`.
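
For example (a minimal sketch; ``ds`` stands for a dataset created earlier on this page, and the target path is a placeholder):

.. code:: python

    # overwrite whatever is already stored at the target path
    ds.to_zarr("path/to/output.zarr", mode="w")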

It is also possible to append to an existing store. For that, set
``append_dim`` to the name of the dimension along which to append. ``mode``
can be omitted as it will internally be set to ``'a'``.

.. ipython:: python
    :suppress:

    ! rm -rf path/to/directory.zarr

.. ipython:: python
    :okexcept:

    ds1 = xr.Dataset(
Collaborator
bad merge? This makes the docs build fail with a SyntaxError

Contributor Author
Hm, interesting. Correcting...

{"foo": (("x", "y", "t"), np.random.rand(4, 5, 2))},
coords={
"x": [10, 20, 30, 40],
"y": [1, 2, 3, 4, 5],
"t": pd.date_range("2001-01-01", periods=2),
},
)
ds1.to_zarr("path/to/directory.zarr")
ds2 = xr.Dataset(
{"foo": (("x", "y", "t"), np.random.rand(4, 5, 2))},
coords={
"x": [10, 20, 30, 40],
"y": [1, 2, 3, 4, 5],
"t": pd.date_range("2001-01-03", periods=2),
},
)
ds2.to_zarr("path/to/directory.zarr", append_dim="t")


To store variable length strings, convert them to object arrays first with
``dtype=object``.
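
A minimal sketch (the store path here is a placeholder):

.. code:: python

    ds_str = xr.Dataset({"name": ("x", np.array(["a", "bc", "def"], dtype=object))})
    ds_str.to_zarr("path/to/strings.zarr", mode="w")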

@@ -890,8 +922,28 @@ Cloud Storage Buckets

It is possible to read and write xarray datasets directly from / to cloud
storage buckets using zarr. This example uses the `gcsfs`_ package to provide
a ``MutableMapping`` interface to `Google Cloud Storage`_, which we can then
pass to xarray::
an interface to `Google Cloud Storage`_.

From v0.16.2: general `fsspec`_ URLs are parsed and the store set up for you
automatically when reading, such that you can open a dataset in a single
call. Any arguments for the storage backend should be supplied under the
key ``storage_options`` within ``backend_kwargs``.

.. code:: python

    ds_gcs = xr.open_dataset(
        "gcs://<bucket-name>/path.zarr",
        backend_kwargs={"storage_options": {"project": "<project-name>", "token": None}},
        engine="zarr",
    )

This also works with ``open_mfdataset``, allowing you to pass a list of paths or
a URL to be interpreted as a glob string.
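
For example (a sketch; the bucket layout, project name, and paths are placeholders):

.. code:: python

    ds_all = xr.open_mfdataset(
        "gcs://<bucket-name>/collection/*.zarr",
        engine="zarr",
        backend_kwargs={"storage_options": {"project": "<project-name>", "token": None}},
        combine="by_coords",
    )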

For older versions, and for writing, you must explicitly set up a ``MutableMapping``
instance and pass this, as follows:

.. code:: python

    import gcsfs
    fs = gcsfs.GCSFileSystem(project='<project-name>', token=None)
@@ -901,6 +953,9 @@ pass to xarray::
    # read it back
    ds_gcs = xr.open_zarr(gcsmap)

(or use the utility function ``fsspec.get_mapper()``).
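
For example, a sketch using ``fsspec.get_mapper()`` directly (bucket and project names are placeholders):

.. code:: python

    import fsspec

    gcsmap = fsspec.get_mapper(
        "gcs://<bucket-name>/path.zarr", project="<project-name>", token=None
    )
    ds_gcs = xr.open_zarr(gcsmap)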

.. _fsspec: https://filesystem-spec.readthedocs.io/en/latest/
.. _Zarr: http://zarr.readthedocs.io/
.. _Amazon S3: https://aws.amazon.com/s3/
.. _Google Cloud Storage: https://cloud.google.com/storage/
5 changes: 5 additions & 0 deletions doc/whats-new.rst
@@ -59,6 +59,11 @@ New Features
``CFTimeIndex.repr``. (:issue:`2416`, :pull:`4597`)
By `Aaron Spring <https://github.com/aaronspring>`_.

- :py:func:`open_dataset` and :py:func:`open_mfdataset` now accept ``fsspec`` URLs
(including globs for the latter) for ``engine="zarr"``, and so allow reading from
many remote and other file systems (:pull:`4461`)
By `Martin Durant <https://github.com/martindurant>`_.

Bug fixes
~~~~~~~~~

46 changes: 37 additions & 9 deletions xarray/backends/api.py
@@ -340,6 +340,7 @@ def open_dataset(
ends with .gz, in which case the file is gunzipped and opened with
scipy.io.netcdf (only netCDF3 supported). Byte-strings or file-like
objects are opened by scipy.io.netcdf (netCDF3) or h5py (netCDF4/HDF).
Also supports arbitrary ``fsspec`` URLs, but only with the "zarr" backend.
group : str, optional
Path to the netCDF4 group in the given file to open (only works for
netCDF4 files).
@@ -400,7 +401,10 @@ def open_dataset(
backend_kwargs: dict, optional
A dictionary of keyword arguments to pass on to the backend. This
may be useful when backend options would improve performance or
allow user control of dataset processing.
allow user control of dataset processing. When using an ``fsspec``
path for the filename, the key ``storage_options`` can be used
here to configure the backend storage instance. Alternatively, a
pre-configured filesystem instance can be supplied with the key ``fs``.
use_cftime: bool, optional
Only relevant if encoded dates come from a standard calendar
(e.g. "gregorian", "proleptic_gregorian", "standard", or not
@@ -548,11 +552,14 @@ def maybe_decode_store(store, chunks):
ds2._file_obj = ds._file_obj
return ds2

filename_or_obj = _normalize_path(filename_or_obj)
fs = backend_kwargs.get("fs", None)
if fs is None:
filename_or_obj = _normalize_path(filename_or_obj)

if isinstance(filename_or_obj, AbstractDataStore):
store = filename_or_obj
else:
backend_kwargs = backend_kwargs.copy()
if engine is None:
engine = _autodetect_engine(filename_or_obj)

@@ -564,9 +571,15 @@ def maybe_decode_store(store, chunks):

if engine == "zarr":
backend_kwargs = backend_kwargs.copy()
backend_kwargs.pop("fs", None)
overwrite_encoded_chunks = backend_kwargs.pop(
"overwrite_encoded_chunks", None
)
extra_kwargs["mode"] = "r"
extra_kwargs["group"] = group
if fs is not None:
filename_or_obj = fs.get_mapper(filename_or_obj)
Comment on lines +580 to +581

Member
Rather than adding the fs keyword argument, why not just encourage passing in an appropriate mapping for filename_or_obj?

Contributor Author
That works already, and will continue to work. However, the whole point of this PR is to allow working out those details in a single call to open_dataset, which turns out very convenient for encoding in an Intake catalog, for instance, or indeed for the open_mfdataset implementation in here.

Collaborator (@alexamici, Oct 1, 2020)
Note that we are working on a refactor of the backend API that, among other things, aims at removing all knowledge of what backends can or can't do from open_dataset. Adding logic inside if engine == "zarr" will probably result in merge conflicts.

I would suggest moving the call to fs.get_mapper(filename_or_obj) inside the zarr backend.

Contributor Author
Thanks for the heads-up. I already did one slightly complex conflict resolution.

It isn't totally clear, though, that the logic can be buried in the zarr engine, for two reasons:

  • when using open_mfdataset, the globbing of remote files/directories happens early, before the individual zarr instances are established
  • the file instances that fsspec makes from URLs can actually be used by some other backends; that just happens not to be the emphasis here

Happy to go whichever way is most convenient for the library.

Contributor
We need to resolve this discussion in order to decide what to do about this PR. Any more thoughts from other devs?

In my view, some of the fsspec logic introduced in the PR should eventually move to the generic open_mfdataset function, as it is not really specific to Zarr. However, I don't see a strong downside to adding it to open_zarr right now. Eventually open_zarr will be deprecated. But the pattern used here could be copied and incorporated into the backend refactor.
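
A sketch of the two usage patterns being discussed (bucket and project names are placeholders, not taken from this diff):

    # passing an explicit mapping, which already works
    import fsspec
    mapper = fsspec.get_mapper("gcs://<bucket-name>/store.zarr", project="<project-name>")
    ds = xr.open_dataset(mapper, engine="zarr")

    # the single-call form this PR enables; everything is plain strings/dicts,
    # which suits declarative configuration such as an Intake catalog
    ds = xr.open_dataset(
        "gcs://<bucket-name>/store.zarr",
        engine="zarr",
        backend_kwargs={"storage_options": {"project": "<project-name>"}},
    )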

backend_kwargs.pop("storage_options", None)

opener = _get_backend_cls(engine)
store = opener(filename_or_obj, **extra_kwargs, **backend_kwargs)
@@ -786,7 +799,8 @@ def open_mfdataset(
files to open. Paths can be given as strings or as pathlib Paths. If
concatenation along more than one dimension is desired, then ``paths`` must be a
nested list-of-lists (see ``combine_nested`` for details). (A string glob will
be expanded to a 1-dimensional list.)
be expanded to a 1-dimensional list.) When ``engine="zarr"``, the path(s) can
be of any type understood by ``fsspec``.
chunks : int or dict, optional
Dictionary with keys given by dimension names and values given by chunk sizes.
In general, these should divide the dimensions of each dataset. If int, chunk
@@ -907,13 +921,27 @@ def open_mfdataset(
"""
if isinstance(paths, str):
if is_remote_uri(paths):
raise ValueError(
"cannot do wild-card matching for paths that are remote URLs: "
"{!r}. Instead, supply paths as an explicit list of strings.".format(
paths
if engine != "zarr":
raise ValueError(
"cannot do wild-card matching for paths that are remote URLs: "
"{!r}. Instead, supply paths as an explicit list of strings.".format(
paths
)
)
)
paths = sorted(glob(paths))
else:
import fsspec # type: ignore

backend_kwargs = kwargs.get("backend_kwargs", {})
storage_options = backend_kwargs.get("storage_options", None)

fs, _, _ = fsspec.core.get_fs_token_paths(
paths, storage_options=storage_options
)
paths = fs.expand_path(paths)
backend_kwargs["fs"] = fs
kwargs["backend_kwargs"] = backend_kwargs
else:
paths = sorted(glob(paths))
else:
paths = [str(p) if isinstance(p, Path) else p for p in paths]

4 changes: 3 additions & 1 deletion xarray/backends/apiv2.py
@@ -214,7 +214,9 @@ def open_dataset(
if backend_kwargs is None:
backend_kwargs = {}

filename_or_obj = _normalize_path(filename_or_obj)
if "fs" not in backend_kwargs:
# do *not* mangle paths meant for a specific file system, as created in open_mfdataset
filename_or_obj = _normalize_path(filename_or_obj)

if engine is None:
engine = _autodetect_engine(filename_or_obj)
7 changes: 7 additions & 0 deletions xarray/backends/zarr.py
@@ -280,6 +280,7 @@ def open_group(
consolidated=False,
consolidate_on_close=False,
chunk_store=None,
storage_options=None,
append_dim=None,
write_region=None,
):
@@ -289,6 +290,8 @@
if chunk_store:
open_kwargs["chunk_store"] = chunk_store

if storage_options:
open_kwargs["storage_options"] = storage_options
if consolidated:
# TODO: an option to pass the metadata_key keyword
zarr_group = zarr.open_consolidated(store, **open_kwargs)
@@ -706,8 +709,12 @@ def open_backend_dataset_zarr(
consolidated=False,
consolidate_on_close=False,
chunk_store=None,
fs=None,
):

if fs is not None:
filename_or_obj = fs.get_mapper(filename_or_obj)

store = ZarrStore.open_group(
filename_or_obj,
group=group,
2 changes: 1 addition & 1 deletion xarray/core/utils.py
@@ -608,7 +608,7 @@ def close_on_error(f):


def is_remote_uri(path: str) -> bool:
return bool(re.search(r"^https?\://", path))
return bool(re.search(r"^[a-z][a-z0-9]*(\://|\:\:)", path))
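# Illustrative behaviour of the broadened pattern (example values, not from this diff):
#   is_remote_uri("http://example.com/data.nc")        -> True   (as before)
#   is_remote_uri("gcs://bucket/path.zarr")            -> True   (any fsspec protocol)
#   is_remote_uri("simplecache::s3://bucket/file.nc")  -> True   (chained "::" URLs)
#   is_remote_uri("/local/path/file.nc")               -> False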


def is_grib_path(path: str) -> bool:
1 change: 1 addition & 0 deletions xarray/tests/__init__.py
@@ -73,6 +73,7 @@ def LooseVersion(vstring):
has_nc_time_axis, requires_nc_time_axis = _importorskip("nc_time_axis")
has_rasterio, requires_rasterio = _importorskip("rasterio")
has_zarr, requires_zarr = _importorskip("zarr")
has_fsspec, requires_fsspec = _importorskip("fsspec")
has_iris, requires_iris = _importorskip("iris")
has_cfgrib, requires_cfgrib = _importorskip("cfgrib")
has_numbagg, requires_numbagg = _importorskip("numbagg")
31 changes: 31 additions & 0 deletions xarray/tests/test_backends.py
@@ -54,6 +54,7 @@
requires_cfgrib,
requires_cftime,
requires_dask,
requires_fsspec,
requires_h5netcdf,
requires_netCDF4,
requires_pseudonetcdf,
@@ -4784,6 +4785,36 @@ def test_extract_zarr_variable_encoding():
)


@requires_zarr
@requires_fsspec
def test_open_fsspec():
    import fsspec  # type: ignore
    import zarr

    if not hasattr(zarr.storage, "FSStore") or not hasattr(
        zarr.storage.FSStore, "getitems"
    ):
        pytest.skip("zarr too old")

    ds = open_dataset(os.path.join(os.path.dirname(__file__), "data", "example_1.nc"))

    m = fsspec.filesystem("memory")
    mm = m.get_mapper("out1.zarr")
    ds.to_zarr(mm)  # old interface
    ds0 = ds.copy()
    ds0["time"] = ds.time + pd.to_timedelta("1 day")
    mm = m.get_mapper("out2.zarr")
    ds0.to_zarr(mm)  # old interface

    url = "memory://out2.zarr"
    ds2 = open_dataset(url, engine="zarr")
    assert ds0 == ds2

    url = "memory://out*.zarr"
    ds2 = open_mfdataset(url, engine="zarr")
    assert xr.concat([ds, ds0], dim="time") == ds2


@requires_h5netcdf
def test_load_single_value_h5netcdf(tmp_path):
"""Test that numeric single-element vector attributes are handled fine.
Expand Down