diff --git a/kerchunk/combine.py b/kerchunk/combine.py
index bbe48370..bbe0644c 100644
--- a/kerchunk/combine.py
+++ b/kerchunk/combine.py
@@ -412,7 +412,6 @@ def store_coords(self):
             elif k in z:
                 # Fall back to existing fill value
                 kw["fill_value"] = z[k].fill_value
-
             arr = group.create_dataset(
                 name=k,
                 data=data,
diff --git a/kerchunk/netCDF3.py b/kerchunk/netCDF3.py
index 7340fb59..d43b6b97 100644
--- a/kerchunk/netCDF3.py
+++ b/kerchunk/netCDF3.py
@@ -1,4 +1,3 @@
-import base64
 from functools import reduce
 from operator import mul
 
@@ -6,7 +5,7 @@
 from fsspec.implementations.reference import LazyReferenceMapper
 import fsspec
 
-from kerchunk.utils import _encode_for_JSON
+from kerchunk.utils import _encode_for_JSON, inline_array
 
 try:
     from scipy.io._netcdf import ZERO, NC_VARIABLE, netcdf_file, netcdf_variable
@@ -202,21 +201,11 @@ def translate(self):
             )
             part = ".".join(["0"] * len(shape)) or "0"
             k = f"{dim}/{part}"
-            if self.threshold and int(self.chunks[dim][1]) < self.threshold:
-                self.fp.seek(int(self.chunks[dim][0]))
-                data = self.fp.read(int(self.chunks[dim][1]))
-                try:
-                    # easiest way to test if data is ascii
-                    data.decode("ascii")
-                except UnicodeDecodeError:
-                    data = b"base64:" + base64.b64encode(data)
-                out[k] = data
-            else:
-                out[k] = [
-                    self.filename,
-                    int(self.chunks[dim][0]),
-                    int(self.chunks[dim][1]),
-                ]
+            out[k] = [
+                self.filename,
+                int(self.chunks[dim][0]),
+                int(self.chunks[dim][1]),
+            ]
             arr.attrs.update(
                 {
                     k: v.decode() if isinstance(v, bytes) else str(v)
@@ -232,7 +221,8 @@ def translate(self):
             # native chunks version (no codec, no options)
             start, size, dt = self.chunks["record_array"][0]
             dt = np.dtype(dt)
-            outer_shape = size // dt.itemsize
+            itemsize = sum(dt[_].itemsize for _ in dt.names)
+            outer_shape = size // itemsize
             offset = start
             for name in dt.names:
                 dtype = dt[name]
@@ -294,6 +284,12 @@ def translate(self):
                 if k != "filename"  # special "attribute"
             }
         )
+        if self.threshold:
+            out = inline_array(
+                out,
+                self.threshold,
+                remote_options=dict(remote_options=self.storage_options),
+            )
 
         if isinstance(out, LazyReferenceMapper):
             out.flush()
diff --git a/kerchunk/tests/test_grib.py b/kerchunk/tests/test_grib.py
index 43752007..f24f2974 100644
--- a/kerchunk/tests/test_grib.py
+++ b/kerchunk/tests/test_grib.py
@@ -123,7 +123,7 @@ def test_grib_tree():
         "atmosphere latitude longitude step time valid_time".split()
     )
     # Assert that the fill value is set correctly
-    assert zg.refc.instant.atmosphere.step.fill_value is np.NaN
+    assert zg.refc.instant.atmosphere.step.fill_value is np.nan
 
 
 # The following two tests use json fixture data generated from calling scan grib
diff --git a/kerchunk/tests/test_utils.py b/kerchunk/tests/test_utils.py
index eaf6c7af..a1bb094d 100644
--- a/kerchunk/tests/test_utils.py
+++ b/kerchunk/tests/test_utils.py
@@ -1,6 +1,7 @@
 import io
 
 import fsspec
+import json
 import kerchunk.utils
 import kerchunk.zarr
 import numpy as np
@@ -72,16 +73,16 @@ def test_inline_array():
         "data/.zattrs": '{"foo": "bar"}',
     }
     fs = fsspec.filesystem("reference", fo=refs)
-    out1 = kerchunk.utils.inline_array(refs, threshold=1000)  # does nothing
+    out1 = kerchunk.utils.inline_array(refs, threshold=1)  # does nothing
     assert out1 == refs
-    out2 = kerchunk.utils.inline_array(refs, threshold=1000, names=["data"])  # explicit
+    out2 = kerchunk.utils.inline_array(refs, threshold=1, names=["data"])  # explicit
     assert "data/1" not in out2
-    assert out2["data/.zattrs"] == refs["data/.zattrs"]
+    assert json.loads(out2["data/.zattrs"]) == json.loads(refs["data/.zattrs"])
     fs = fsspec.filesystem("reference", fo=out2)
     g = zarr.open(fs.get_mapper())
     assert g.data[:].tolist() == [1, 2]
 
-    out3 = kerchunk.utils.inline_array(refs, threshold=1)  # inlines because of size
+    out3 = kerchunk.utils.inline_array(refs, threshold=1000)  # inlines because of size
     assert "data/1" not in out3
     fs = fsspec.filesystem("reference", fo=out3)
     g = zarr.open(fs.get_mapper())
diff --git a/kerchunk/utils.py b/kerchunk/utils.py
index 9133b3f2..838c3cb1 100644
--- a/kerchunk/utils.py
+++ b/kerchunk/utils.py
@@ -182,7 +182,7 @@ def _inline_array(group, threshold, names, prefix=""):
         if isinstance(thing, zarr.Group):
             _inline_array(thing, threshold=threshold, prefix=prefix1, names=names)
         else:
-            cond1 = threshold and thing.nbytes < threshold and thing.nchunks > 1
+            cond1 = threshold and thing.nbytes < threshold
             cond2 = prefix1 in names
             if cond1 or cond2:
                 original_attrs = dict(thing.attrs)
@@ -194,6 +194,7 @@ def _inline_array(group, threshold, names, prefix=""):
                 chunks=thing.shape,
                 compression=None,
                 overwrite=True,
+                fill_value=thing.fill_value,
             )
             arr.attrs.update(original_attrs)
 
@@ -249,18 +250,38 @@ def subchunk(store, variable, factor):
     modified store
     """
     fs = fsspec.filesystem("reference", fo=store)
-    store = copy.deepcopy(store)
+    store = fs.references
     meta_file = f"{variable}/.zarray"
     meta = ujson.loads(fs.cat(meta_file))
     if meta["compressor"] is not None:
         raise ValueError("Can only subchunk an uncompressed array")
     chunks_orig = meta["chunks"]
-    if chunks_orig[0] % factor == 0:
-        chunk_new = [chunks_orig[0] // factor] + chunks_orig[1:]
-    else:
-        raise ValueError("Must subchunk by exact integer factor")
-
+    chunk_new = []
+
+    # plan
+    multi = None
+    for ind, this_chunk in enumerate(chunks_orig):
+        if this_chunk == 1:
+            chunk_new.append(1)
+            continue
+        elif this_chunk % factor == 0:
+            chunk_new.extend([this_chunk // factor] + chunks_orig[ind + 1 :])
+            break
+        elif factor % this_chunk == 0:
+            # if factor // chunks_orig[0] > 1:
+            chunk_new.append(1)
+            if multi is None:
+                multi = this_chunk
+            factor //= this_chunk
+        else:
+            raise ValueError("Must subchunk by exact integer factor")
+    if multi:
+        # TODO: this reloads the referenceFS; *maybe* reuses it
+        return subchunk(store, variable, multi)
+
+    # execute
     meta["chunks"] = chunk_new
+    store = copy.deepcopy(store)
     store[meta_file] = ujson.dumps(meta)
 
     for k, v in store.copy().items():
@@ -268,16 +289,23 @@ def subchunk(store, variable, factor):
             kpart = k[len(variable) + 1 :]
             if kpart.startswith(".z"):
                 continue
-            sep = "." if "." in k else "/"
+            sep = "." if "." in kpart else "/"
            chunk_index = [int(_) for _ in kpart.split(sep)]
+            if isinstance(v, (str, bytes)):
+                # TODO: check this early, as some refs might have been edited already
+                raise ValueError("Refusing to sub-chunk inlined data")
             if len(v) > 1:
                 url, offset, size = v
             else:
                 (url,) = v
                 offset = 0
                 size = fs.size(k)
             for subpart in range(factor):
-                new_index = [chunk_index[0] * factor + subpart] + chunk_index[1:]
+                new_index = (
+                    chunk_index[:ind]
+                    + [chunk_index[ind] * factor + subpart]
+                    + chunk_index[ind + 1 :]
+                )
                 newpart = sep.join(str(_) for _ in new_index)
                 newv = [url, offset + subpart * size // factor, size // factor]
                 store[f"{variable}/{newpart}"] = newv
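
The planning loop added to subchunk above is the densest part of this change, so here is a self-contained Python sketch of the same decomposition; plan_subchunks and the example shapes are illustrative, not part of the patch. Size-1 chunk dimensions pass through, the first dimension whose chunk length the remaining factor divides absorbs the split, and a factor that spans a whole dimension is carried into later dimensions; the patched code handles that last case by recursing with the intermediate factor multi.

# Illustrative restatement of the planning loop added to kerchunk.utils.subchunk;
# plan_subchunks is a hypothetical helper, not part of the patch.
def plan_subchunks(chunks_orig, factor):
    chunk_new = []
    multi = None
    for ind, this_chunk in enumerate(chunks_orig):
        if this_chunk == 1:
            chunk_new.append(1)  # size-1 chunks cannot be split further
            continue
        elif this_chunk % factor == 0:
            # this dimension absorbs the whole remaining factor; done
            chunk_new.extend([this_chunk // factor] + chunks_orig[ind + 1 :])
            break
        elif factor % this_chunk == 0:
            # the factor covers this dimension entirely: reduce it to 1 and
            # carry the leftover factor into the next dimension
            chunk_new.append(1)
            if multi is None:
                multi = this_chunk  # factor for a first splitting pass
            factor //= this_chunk
        else:
            raise ValueError("Must subchunk by exact integer factor")
    return chunk_new, multi

assert plan_subchunks([4, 6, 10], 2) == ([2, 6, 10], None)  # one-pass split
assert plan_subchunks([1, 6, 10], 3) == ([1, 2, 10], None)  # skips size-1 dim
assert plan_subchunks([4, 6, 10], 8) == ([1, 3, 10], 4)  # needs a pass by 4 first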
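Likewise, since NetCDF3ToZarr.translate now defers all inlining to kerchunk.utils.inline_array, a minimal sketch of that post-pass on a hand-built reference set may help; the dict below is illustrative and mirrors the fixture in the updated test_inline_array.

import fsspec
import zarr

import kerchunk.utils

# Hand-built references for a tiny uncompressed int32 array of two 1-element chunks.
refs = {
    ".zgroup": '{"zarr_format": 2}',
    "data/.zarray": (
        '{"chunks": [1], "compressor": null, "dtype": "<i4", "fill_value": null,'
        ' "filters": null, "order": "C", "shape": [2], "zarr_format": 2}'
    ),
    "data/.zattrs": '{"foo": "bar"}',
    "data/0": b"\x01\x00\x00\x00",
    "data/1": b"\x02\x00\x00\x00",
}
# The array is 8 bytes, under the threshold, so it is rewritten as a single
# inlined chunk and the per-chunk key "data/1" disappears.
out = kerchunk.utils.inline_array(refs, threshold=1000)
assert "data/1" not in out
g = zarr.open(fsspec.filesystem("reference", fo=out).get_mapper())
assert g.data[:].tolist() == [1, 2]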