feat: add to_parquet_dataset function (#2898)
* added to_parquet_dataset, passes simple test

* style: pre-commit fixes

* added more complicated test

* added to docstrings

* one additional check

* changing from os to fsspec

* Can't get filenames yet

* fixed filenames parameter functionality

* added an s3 test which passed successfully

* Added fs.glob and added test to see that wildcarding works.

* Changed tests for wildcards

* changed tests again to try and solve pytest adjacent error...

* Changed arguments for pyarrow 7.0

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Angus Hollands <[email protected]>
Co-authored-by: Jim Pivarski <[email protected]>
4 people authored Mar 20, 2024
1 parent 51f452e commit 3270642
Showing 5 changed files with 253 additions and 0 deletions.
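In short, this commit adds `ak.to_parquet_dataset`, which writes a `_common_metadata` and a `_metadata` file so that a directory of separately written Parquet files can be read back as one dataset. A minimal usage sketch, assuming a hypothetical local directory (the paths and array values below are illustrative, not part of the commit):

import awkward as ak

# Write two small arrays as separate Parquet files in the same directory.
array1 = ak.Array([[1.1, 2.2, 3.3], []])
array2 = ak.Array([[4.4, 5.5]])
ak.to_parquet(array1, "/tmp/dataset/arr1.parquet", parquet_compliant_nested=True)
ak.to_parquet(array2, "/tmp/dataset/arr2.parquet", parquet_compliant_nested=True)

# Write _common_metadata and _metadata, turning the directory into a dataset.
ak.to_parquet_dataset("/tmp/dataset")

# The directory can now be read back as a single concatenated array.
combined = ak.from_parquet("/tmp/dataset")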
Binary file added .DS_Store
Binary file not shown.
1 change: 1 addition & 0 deletions docs/reference/toctree.txt
@@ -42,6 +42,7 @@
generated/ak.to_numpy
generated/ak.to_packed
generated/ak.to_parquet
generated/ak.to_parquet_dataset
generated/ak.to_parquet_row_groups
generated/ak.to_rdataframe

1 change: 1 addition & 0 deletions src/awkward/operations/__init__.py
@@ -90,6 +90,7 @@
from awkward.operations.ak_to_numpy import *
from awkward.operations.ak_to_packed import *
from awkward.operations.ak_to_parquet import *
from awkward.operations.ak_to_parquet_dataset import *
from awkward.operations.ak_to_parquet_row_groups import *
from awkward.operations.ak_to_rdataframe import *
from awkward.operations.ak_to_regular import *
111 changes: 111 additions & 0 deletions src/awkward/operations/ak_to_parquet_dataset.py
@@ -0,0 +1,111 @@
from __future__ import annotations

from os import fsdecode, path

from awkward._dispatch import high_level_function

__all__ = ("to_parquet_dataset",)


@high_level_function()
def to_parquet_dataset(
directory,
filenames=None,
storage_options=None,
):
"""
Args:
directory (str or Path): A directory in which to write `_common_metadata`
and `_metadata`, making the directory of Parquet files into a dataset.
filenames (None or list of str or Path): If None, the `directory` will be
recursively searched for files ending in `filename_extension` and
sorted lexicographically. Otherwise, this explicit list of files is
taken and row-groups are concatenated in its given order. If any
filenames are relative, they are interpreted relative to `directory`.
filename_extension (str): Filename extension (including `.`) to use to
search for files recursively. Ignored if `filenames` is None.
Creates a `_common_metadata` and a `_metadata` in a directory of Parquet files.
>>> ak.to_parquet(array1, "/directory/arr1.parquet", parquet_compliant_nested=True)
>>> ak.to_parquet(array2, "/directory/arr2.parquet", parquet_compliant_nested=True)
>>> ak.to_parquet_dataset("/directory")
The `_common_metadata` contains the schema that all files share. (If the files
have different schemas, this function raises an exception.)
The `_metadata` contains row-group metadata used to seek to specific row-groups
within the multi-file dataset.
"""

return _impl(directory, filenames, storage_options)


def _impl(directory, filenames, storage_options):
    # pyarrow is an optional dependency; import it through the helper that
    # raises a clear error message if it is not installed.
import awkward._connect.pyarrow

pyarrow_parquet = awkward._connect.pyarrow.import_pyarrow_parquet(
"ak.to_parquet_dataset"
)
import fsspec.parquet

try:
directory = fsdecode(directory)
except TypeError:
        raise TypeError(
            f"'directory' argument of 'ak.to_parquet_dataset' must be a path-like, "
            f"not {type(directory).__name__}"
        ) from None
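    # Resolve the directory through fsspec so that local paths and remote URLs
    # (e.g. s3://bucket/prefix) are handled uniformly.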
fs, destination = fsspec.core.url_to_fs(directory, **(storage_options or {}))
if not fs.isdir(destination):
raise ValueError(f"{destination!r} is not a directory" + {__file__})

filepaths = get_filepaths(filenames, fs, destination)

if len(filepaths) == 0:
raise ValueError(f"no *.parquet or *.parq matches for path {destination!r}")

schema = None
metadata_collector = []
    # Check that every file shares the first file's schema while collecting its
    # row-group metadata (tagged with the file path) for the _metadata file.
    for filepath in filepaths:
        with fs.open(filepath, mode="rb") as f:
            parquet_file = pyarrow_parquet.ParquetFile(f)
            if schema is None:
                schema = parquet_file.schema_arrow
                first_filepath = filepath
            elif not schema.equals(parquet_file.schema_arrow):
                raise ValueError(
                    f"schema in {filepath!r} differs from the first schema (in {first_filepath!r})"
                )
            metadata_collector.append(parquet_file.metadata)
            metadata_collector[-1].set_file_path(filepath)

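    # Write the shared schema to _common_metadata and the collected row-group
    # metadata to _metadata, both alongside the data files.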
_common_metadata_path = path.join(destination, "_common_metadata")
pyarrow_parquet.write_metadata(schema, _common_metadata_path, filesystem=fs)

_metadata_path = path.join(destination, "_metadata")
pyarrow_parquet.write_metadata(
schema, _metadata_path, metadata_collector=metadata_collector, filesystem=fs
)
return _common_metadata_path, _metadata_path


def get_filepaths(filenames, fs, destination):
    # Resolve `filenames` (a glob pattern, a list of names/patterns, or None)
    # to the concrete *.parquet / *.parq paths that make up the dataset.
    filepaths = []
    if filenames is not None:
        if isinstance(filenames, str):
            filenames = [filenames]
        for filename in filenames:
            for f in fs.glob(path.join(destination, filename)):
                if f.endswith((".parq", ".parquet")):
                    filepaths.append(f)
    else:
        for f, fdata in fs.find(destination, detail=True).items():
            if fdata["type"] == "file" and f.endswith((".parq", ".parquet")):
                filepaths.append(f)
    return filepaths
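The payoff of the `_metadata` file comes when reading the dataset back: a reader can discover every file and row-group without opening each Parquet footer. A minimal sketch of reading back the hypothetical directory from the earlier sketch (the `row_groups` argument of `ak.from_parquet` is an assumption here, for illustration):

import awkward as ak

# Read the whole dataset in one call; _metadata supplies the file list and
# row-group layout.
everything = ak.from_parquet("/tmp/dataset")

# Assumed for illustration: seek directly to selected row-groups across the
# multi-file dataset via the row_groups argument.
subset = ak.from_parquet("/tmp/dataset", row_groups=[0])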
140 changes: 140 additions & 0 deletions tests/test_2898_to_parquet_dataset.py
@@ -0,0 +1,140 @@
from __future__ import annotations

import os

import pytest

import awkward as ak

pyarrow = pytest.importorskip("pyarrow")


def test_simple(tmp_path):
array = ak.Array([[1.1, 2.2, 3.3], [], [4.4, 5.5]])
array1 = ak.Array([[1.1, 2.2, 3.3, 4.4], [4.0], [4.4, 5.5]])
array2 = ak.Array([[1.0, 3.0, 3.3, 4.4], [4.0], [4.4, 10.0], [11.11]])
ak.to_parquet(
array, os.path.join(tmp_path, "arr1.parquet"), parquet_compliant_nested=True
)
ak.to_parquet(
array1, os.path.join(tmp_path, "arr2.parquet"), parquet_compliant_nested=True
)
ak.to_parquet(
array2, os.path.join(tmp_path, "arr3.parquet"), parquet_compliant_nested=True
)

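    # The character-class pattern matches arr1.parquet, arr2.parquet, and arr3.parquet.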
ak.to_parquet_dataset(tmp_path, filenames="arr[1-3].parquet")
assert os.path.exists(os.path.join(tmp_path, "_common_metadata"))
assert os.path.exists(os.path.join(tmp_path, "_metadata"))

with_metadata = ak.from_parquet(tmp_path)
assert with_metadata.tolist() == [
[1.1, 2.2, 3.3],
[],
[4.4, 5.5],
[1.1, 2.2, 3.3, 4.4],
[4.0],
[4.4, 5.5],
[1.0, 3.0, 3.3, 4.4],
[4.0],
[4.4, 10.0],
[11.11],
]


def test_complex(tmp_path):
array1 = ak.Array(
[{"x": 1.1, "y": [1]}, {"x": 2.2, "y": [1, 2]}, {"x": 3.3, "y": [1, 2, 3]}]
)
array2 = ak.Array([{"x": 1.8, "y": [3, 5, 6]}])
array3 = ak.Array([{"x": 4.4, "y": [1, 2, 3, 4]}, {"x": 5.5, "y": [1, 2, 3, 4, 5]}])
ak.to_parquet(
array1, os.path.join(tmp_path, "arr1.parquet"), parquet_compliant_nested=True
)
ak.to_parquet(
array2, os.path.join(tmp_path, "arr2.parquet"), parquet_compliant_nested=True
)
ak.to_parquet(
array3, os.path.join(tmp_path, "arr3.parquet"), parquet_compliant_nested=True
)

ak.to_parquet_dataset(tmp_path)

assert os.path.exists(os.path.join(tmp_path, "_common_metadata"))
assert os.path.exists(os.path.join(tmp_path, "_metadata"))

with_metadata = ak.from_parquet(tmp_path)
assert with_metadata.tolist() == [
{"x": 1.1, "y": [1]},
{"x": 2.2, "y": [1, 2]},
{"x": 3.3, "y": [1, 2, 3]},
{"x": 1.8, "y": [3, 5, 6]},
{"x": 4.4, "y": [1, 2, 3, 4]},
{"x": 5.5, "y": [1, 2, 3, 4, 5]},
]


def test_filenames(tmp_path):
array = ak.Array(
[{"x": 1.1, "y": [1]}, {"x": 2.2, "y": [1, 2]}, {"x": 3.3, "y": [1, 2, 3]}]
)
array1 = ak.Array([{"x": 1.8, "y": [3, 5, 6]}])
array2 = ak.Array([{"x": 4.4, "y": [1, 2, 3, 4]}, {"x": 5.5, "y": [1, 2, 3, 4, 5]}])
ak.to_parquet(
array, os.path.join(tmp_path, "arr1.parquet"), parquet_compliant_nested=True
)
ak.to_parquet(
array1, os.path.join(tmp_path, "arr2.parquet"), parquet_compliant_nested=True
)
ak.to_parquet(
array2, os.path.join(tmp_path, "arr3.parquet"), parquet_compliant_nested=True
)

ak.to_parquet_dataset(
tmp_path, filenames=["arr1.parquet", "arr2.parquet", "arr3.parquet"]
)

assert os.path.exists(os.path.join(tmp_path, "_common_metadata"))
assert os.path.exists(os.path.join(tmp_path, "_metadata"))

with_metadata = ak.from_parquet(tmp_path)
assert with_metadata.tolist() == [
{"x": 1.1, "y": [1]},
{"x": 2.2, "y": [1, 2]},
{"x": 3.3, "y": [1, 2, 3]},
{"x": 1.8, "y": [3, 5, 6]},
{"x": 4.4, "y": [1, 2, 3, 4]},
{"x": 5.5, "y": [1, 2, 3, 4, 5]},
]


def test_wildcard(tmp_path):
array = ak.Array([[1.1, 2.2, 3.3], [], [4.4, 5.5]])
array1 = ak.Array([[1.1, 2.2, 3.3, 4.4], [4.0], [4.4, 5.5]])
array2 = ak.Array([[1.0, 3.0, 3.3, 4.4], [4.0], [4.4, 10.0], [11.11]])
ak.to_parquet(
array, os.path.join(tmp_path, "arr1.parquet"), parquet_compliant_nested=True
)
ak.to_parquet(
array1, os.path.join(tmp_path, "arr2.parquet"), parquet_compliant_nested=True
)
ak.to_parquet(
array2, os.path.join(tmp_path, "arr3.parquet"), parquet_compliant_nested=True
)

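    # The single-character wildcard matches arr1.parquet, arr2.parquet, and arr3.parquet.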
ak.to_parquet_dataset(tmp_path, filenames="arr?.parquet")

with_metadata = ak.from_parquet(tmp_path)
assert with_metadata.tolist() == [
[1.1, 2.2, 3.3],
[],
[4.4, 5.5],
[1.1, 2.2, 3.3, 4.4],
[4.0],
[4.4, 5.5],
[1.0, 3.0, 3.3, 4.4],
[4.0],
[4.4, 10.0],
[11.11],
]
