diff --git a/docs/reference/toctree.txt b/docs/reference/toctree.txt
index ceac4be5d8..9e025f47c3 100644
--- a/docs/reference/toctree.txt
+++ b/docs/reference/toctree.txt
@@ -42,6 +42,7 @@
     generated/ak.to_numpy
     generated/ak.to_packed
     generated/ak.to_parquet
+    generated/ak.to_parquet_dataset
     generated/ak.to_parquet_row_groups
     generated/ak.to_rdataframe
diff --git a/src/awkward/operations/__init__.py b/src/awkward/operations/__init__.py
index c5d086f795..133ea8a53a 100644
--- a/src/awkward/operations/__init__.py
+++ b/src/awkward/operations/__init__.py
@@ -90,6 +90,7 @@
 from awkward.operations.ak_to_numpy import *
 from awkward.operations.ak_to_packed import *
 from awkward.operations.ak_to_parquet import *
+from awkward.operations.ak_to_parquet_dataset import *
 from awkward.operations.ak_to_parquet_row_groups import *
 from awkward.operations.ak_to_rdataframe import *
diff --git a/src/awkward/operations/ak_to_parquet_dataset.py b/src/awkward/operations/ak_to_parquet_dataset.py
new file mode 100644
index 0000000000..b61c580590
--- /dev/null
+++ b/src/awkward/operations/ak_to_parquet_dataset.py
@@ -0,0 +1,111 @@
+from __future__ import annotations
+
+from os import fsdecode, path
+
+from awkward._dispatch import high_level_function
+
+__all__ = ("to_parquet_dataset",)
+
+
+@high_level_function()
+def to_parquet_dataset(
+    directory,
+    filenames=None,
+    storage_options=None,
+):
+    """
+    Args:
+        directory (str or Path): A directory in which to write `_common_metadata`
+            and `_metadata`, making the directory of Parquet files into a dataset.
+        filenames (None, str, or list of str): If None, the `directory` is
+            recursively searched for files ending in `.parquet` or `.parq`,
+            sorted lexicographically. Otherwise, a glob pattern or an explicit
+            list of file names/patterns, interpreted relative to `directory`;
+            row-groups are concatenated in the resulting order.
+        storage_options (None or dict): Options passed to fsspec when resolving
+            `directory`, e.g. credentials for a remote filesystem.
+
+    Creates a `_common_metadata` and a `_metadata` in a directory of Parquet files.
+
+    >>> ak.to_parquet(array1, "/directory/arr1.parquet", parquet_compliant_nested=True)
+    >>> ak.to_parquet(array2, "/directory/arr2.parquet", parquet_compliant_nested=True)
+    >>> ak.to_parquet_dataset("/directory")
+
+    The `_common_metadata` contains the schema that all files share. (If the files
+    have different schemas, this function raises an exception.)
+
+    The `_metadata` contains row-group metadata used to seek to specific row-groups
+    within the multi-file dataset.
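+
+    Once these metadata files exist, `ak.from_parquet` can read the directory
+    back as one concatenated dataset, as the tests for this change do (a usage
+    sketch; `combined` is just an illustrative name):
+
+    >>> combined = ak.from_parquet("/directory")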
+ """ + + return _impl(directory, filenames, storage_options) + + +def _impl(directory, filenames, storage_options): + # Implementation + import awkward._connect.pyarrow + + pyarrow_parquet = awkward._connect.pyarrow.import_pyarrow_parquet( + "ak.to_parquet_dataset" + ) + import fsspec.parquet + + try: + directory = fsdecode(directory) + except TypeError: + raise TypeError( + f"'directory' argument of 'ak.to_parquet_dataset' must be a path-like, not {type(directory).__name__} ('array' argument is first; 'destination' second)" + ) from None + fs, destination = fsspec.core.url_to_fs(directory, **(storage_options or {})) + if not fs.isdir(destination): + raise ValueError(f"{destination!r} is not a directory" + {__file__}) + + filepaths = get_filepaths(filenames, fs, destination) + + if len(filepaths) == 0: + raise ValueError(f"no *.parquet or *.parq matches for path {destination!r}") + + schema = None + metadata_collector = [] + for filepath in filepaths: + with fs.open(filepath, mode="rb") as f: + if schema is None: + schema = pyarrow_parquet.ParquetFile(f).schema_arrow + first_filepath = filepath + elif not schema.equals(pyarrow_parquet.ParquetFile(f).schema_arrow): + raise ValueError( + "schema in {} differs from the first schema (in {})".format( + repr(filepath), repr(first_filepath) + ) + ) + metadata_collector.append(pyarrow_parquet.ParquetFile(f).metadata) + metadata_collector[-1].set_file_path(filepath) + + _common_metadata_path = path.join(destination, "_common_metadata") + pyarrow_parquet.write_metadata(schema, _common_metadata_path, filesystem=fs) + + _metadata_path = path.join(destination, "_metadata") + pyarrow_parquet.write_metadata( + schema, _metadata_path, metadata_collector=metadata_collector, filesystem=fs + ) + return _common_metadata_path, _metadata_path + + +def get_filepaths(filenames, fs, destination): + filepaths = [] + if filenames is not None: + if isinstance(filenames, str): + for f in fs.glob(path.join(destination, filenames)): + if f.endswith((".parq", ".parquet")): + filepaths.append(f) + else: + for filename in filenames: + for f in fs.glob(path.join(destination, filename)): + if f.endswith((".parq", ".parquet")): + filepaths.append(f) + else: + for f, fdata in fs.find(destination, detail=True).items(): + if f.endswith((".parq", ".parquet")): + if fdata["type"] == "file": + filepaths.append(f) + return filepaths diff --git a/tests/test_2898_to_parquet_dataset.py b/tests/test_2898_to_parquet_dataset.py new file mode 100644 index 0000000000..c524b89665 --- /dev/null +++ b/tests/test_2898_to_parquet_dataset.py @@ -0,0 +1,140 @@ +from __future__ import annotations + +import os + +import pytest + +import awkward as ak + +pyarrow = pytest.importorskip("pyarrow") + + +def simple_test(tmp_path): + array = ak.Array([[1.1, 2.2, 3.3], [], [4.4, 5.5]]) + array1 = ak.Array([[1.1, 2.2, 3.3, 4.4], [4.0], [4.4, 5.5]]) + array2 = ak.Array([[1.0, 3.0, 3.3, 4.4], [4.0], [4.4, 10.0], [11.11]]) + ak.to_parquet( + array, os.path.join(tmp_path, "arr1.parquet"), parquet_compliant_nested=True + ) + ak.to_parquet( + array1, os.path.join(tmp_path, "arr2.parquet"), parquet_compliant_nested=True + ) + ak.to_parquet( + array2, os.path.join(tmp_path, "arr3.parquet"), parquet_compliant_nested=True + ) + + ak.to_parquet_dataset(tmp_path, filenames="arr[1-3].parquet") + assert os.path.exists(os.path.join(tmp_path, "_common_metadata")) + assert os.path.exists(os.path.join(tmp_path, "_metadata")) + + with_metadata = ak.from_parquet(tmp_path) + print(with_metadata.to_list()) + assert 
+        [1.1, 2.2, 3.3],
+        [],
+        [4.4, 5.5],
+        [1.1, 2.2, 3.3, 4.4],
+        [4.0],
+        [4.4, 5.5],
+        [1.0, 3.0, 3.3, 4.4],
+        [4.0],
+        [4.4, 10.0],
+        [11.11],
+    ]
+
+
+def test_complex(tmp_path):
+    array1 = ak.Array(
+        [{"x": 1.1, "y": [1]}, {"x": 2.2, "y": [1, 2]}, {"x": 3.3, "y": [1, 2, 3]}]
+    )
+    array2 = ak.Array([{"x": 1.8, "y": [3, 5, 6]}])
+    array3 = ak.Array([{"x": 4.4, "y": [1, 2, 3, 4]}, {"x": 5.5, "y": [1, 2, 3, 4, 5]}])
+    ak.to_parquet(
+        array1, os.path.join(tmp_path, "arr1.parquet"), parquet_compliant_nested=True
+    )
+    ak.to_parquet(
+        array2, os.path.join(tmp_path, "arr2.parquet"), parquet_compliant_nested=True
+    )
+    ak.to_parquet(
+        array3, os.path.join(tmp_path, "arr3.parquet"), parquet_compliant_nested=True
+    )
+
+    ak.to_parquet_dataset(tmp_path)
+
+    assert os.path.exists(os.path.join(tmp_path, "_common_metadata"))
+    assert os.path.exists(os.path.join(tmp_path, "_metadata"))
+
+    with_metadata = ak.from_parquet(tmp_path)
+    assert with_metadata.to_list() == [
+        {"x": 1.1, "y": [1]},
+        {"x": 2.2, "y": [1, 2]},
+        {"x": 3.3, "y": [1, 2, 3]},
+        {"x": 1.8, "y": [3, 5, 6]},
+        {"x": 4.4, "y": [1, 2, 3, 4]},
+        {"x": 5.5, "y": [1, 2, 3, 4, 5]},
+    ]
+
+
+def test_filenames(tmp_path):
+    array = ak.Array(
+        [{"x": 1.1, "y": [1]}, {"x": 2.2, "y": [1, 2]}, {"x": 3.3, "y": [1, 2, 3]}]
+    )
+    array1 = ak.Array([{"x": 1.8, "y": [3, 5, 6]}])
+    array2 = ak.Array([{"x": 4.4, "y": [1, 2, 3, 4]}, {"x": 5.5, "y": [1, 2, 3, 4, 5]}])
+    ak.to_parquet(
+        array, os.path.join(tmp_path, "arr1.parquet"), parquet_compliant_nested=True
+    )
+    ak.to_parquet(
+        array1, os.path.join(tmp_path, "arr2.parquet"), parquet_compliant_nested=True
+    )
+    ak.to_parquet(
+        array2, os.path.join(tmp_path, "arr3.parquet"), parquet_compliant_nested=True
+    )
+
+    ak.to_parquet_dataset(
+        tmp_path, filenames=["arr1.parquet", "arr2.parquet", "arr3.parquet"]
+    )
+
+    assert os.path.exists(os.path.join(tmp_path, "_common_metadata"))
+    assert os.path.exists(os.path.join(tmp_path, "_metadata"))
+
+    with_metadata = ak.from_parquet(tmp_path)
+    assert with_metadata.to_list() == [
+        {"x": 1.1, "y": [1]},
+        {"x": 2.2, "y": [1, 2]},
+        {"x": 3.3, "y": [1, 2, 3]},
+        {"x": 1.8, "y": [3, 5, 6]},
+        {"x": 4.4, "y": [1, 2, 3, 4]},
+        {"x": 5.5, "y": [1, 2, 3, 4, 5]},
+    ]
+
+
+def test_wildcard(tmp_path):
+    array = ak.Array([[1.1, 2.2, 3.3], [], [4.4, 5.5]])
+    array1 = ak.Array([[1.1, 2.2, 3.3, 4.4], [4.0], [4.4, 5.5]])
+    array2 = ak.Array([[1.0, 3.0, 3.3, 4.4], [4.0], [4.4, 10.0], [11.11]])
+    ak.to_parquet(
+        array, os.path.join(tmp_path, "arr1.parquet"), parquet_compliant_nested=True
+    )
+    ak.to_parquet(
+        array1, os.path.join(tmp_path, "arr2.parquet"), parquet_compliant_nested=True
+    )
+    ak.to_parquet(
+        array2, os.path.join(tmp_path, "arr3.parquet"), parquet_compliant_nested=True
+    )
+
+    ak.to_parquet_dataset(tmp_path, filenames="arr?.parquet")
+
+    with_metadata = ak.from_parquet(tmp_path)
+    assert with_metadata.to_list() == [
+        [1.1, 2.2, 3.3],
+        [],
+        [4.4, 5.5],
+        [1.1, 2.2, 3.3, 4.4],
+        [4.0],
+        [4.4, 5.5],
+        [1.0, 3.0, 3.3, 4.4],
+        [4.0],
+        [4.4, 10.0],
+        [11.11],
+    ]