feat: add to_parquet_dataset function #2898

Merged: 51 commits, Mar 20, 2024

Commits (51)
3441004
added to_parquet_dataset, passes simple test
zbilodea Dec 13, 2023
5f48c02
style: pre-commit fixes
pre-commit-ci[bot] Dec 13, 2023
28c5a1a
Merge branch 'main' into feat-add-to-parquet-dataset
agoose77 Dec 19, 2023
7de6817
added more complicated test
zbilodea Jan 22, 2024
8dd30eb
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Jan 22, 2024
b3640b5
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Jan 25, 2024
2e5c7bc
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Jan 25, 2024
2056f30
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Jan 26, 2024
8172c51
fixed test
zbilodea Jan 26, 2024
bf4b66c
style: pre-commit fixes
pre-commit-ci[bot] Jan 26, 2024
cadc4d2
formatting for tests
zbilodea Jan 26, 2024
e264d83
Merge branch 'feat-add-to-parquet-dataset' of https://github.com/scik…
zbilodea Jan 26, 2024
838f156
removed unnecessary check from test
zbilodea Jan 26, 2024
56df512
added to docstrings
zbilodea Jan 26, 2024
ec3a1d7
one additional check
zbilodea Jan 26, 2024
8960609
started changing from os to fsspec
zbilodea Jan 29, 2024
5eb3124
slightly further
zbilodea Jan 29, 2024
8397881
more small changes, removed regularize-path functionality...
zbilodea Jan 29, 2024
b525048
Can't get filenames yet
zbilodea Jan 30, 2024
6077539
writes local correctly still but need to test non-local
zbilodea Jan 30, 2024
9e08ca6
style: pre-commit fixes
pre-commit-ci[bot] Jan 30, 2024
90966dd
Formatting...
zbilodea Jan 30, 2024
3b5d83f
Merge branch 'feat-add-to-parquet-dataset' of https://github.com/scik…
zbilodea Jan 30, 2024
59634c7
removed os completely
zbilodea Jan 30, 2024
d64e2a0
importer skip
zbilodea Jan 30, 2024
9fa8ae2
fixed filenames parameter functionality
zbilodea Jan 31, 2024
16b742f
concatination error
zbilodea Jan 31, 2024
cf7bd7f
still concatination error
zbilodea Jan 31, 2024
9836510
style: pre-commit fixes
pre-commit-ci[bot] Jan 31, 2024
f34e836
Same error
zbilodea Jan 31, 2024
653dd4c
left something in test
zbilodea Jan 31, 2024
1de8916
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Jan 31, 2024
926e62d
still concatination error, trying os.path.join
zbilodea Jan 31, 2024
7179aaf
Merge branch 'feat-add-to-parquet-dataset' of https://github.com/scik…
zbilodea Jan 31, 2024
7a9e5cd
Trying os.path.join
zbilodea Jan 31, 2024
2fc6520
changed write_metadata arguments...
zbilodea Jan 31, 2024
0ac1346
Trying to fix Posixpath error
zbilodea Feb 1, 2024
3813fbe
added an s3 test which passed successfully
zbilodea Feb 5, 2024
61374fe
Added fs.glob and added test to see that wildcarding works.
zbilodea Feb 5, 2024
dc8ba77
made test appropriate for actions
zbilodea Feb 5, 2024
4c8ac95
Changed tests for wildcards
zbilodea Feb 5, 2024
f023205
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Feb 5, 2024
dff39ad
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Feb 7, 2024
5215564
changed tests again to try and solve pytest adjacent error...
zbilodea Feb 7, 2024
9135501
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Feb 7, 2024
98dfc98
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Feb 12, 2024
1ed0b7c
Changed arguments for pyarrow 7.0
zbilodea Feb 12, 2024
89ee769
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Mar 6, 2024
632338d
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Mar 6, 2024
f945366
Merge branch 'main' into feat-add-to-parquet-dataset
jpivarski Mar 20, 2024
ba0101d
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Mar 20, 2024
Binary file added .DS_Store
Binary file not shown.
1 change: 1 addition & 0 deletions docs/reference/toctree.txt
@@ -42,6 +42,7 @@
generated/ak.to_numpy
generated/ak.to_packed
generated/ak.to_parquet
generated/ak.to_parquet_dataset
generated/ak.to_parquet_row_groups
generated/ak.to_rdataframe

1 change: 1 addition & 0 deletions src/awkward/operations/__init__.py
@@ -90,6 +90,7 @@
from awkward.operations.ak_to_numpy import *
from awkward.operations.ak_to_packed import *
from awkward.operations.ak_to_parquet import *
from awkward.operations.ak_to_parquet_dataset import *
from awkward.operations.ak_to_parquet_row_groups import *
from awkward.operations.ak_to_rdataframe import *
from awkward.operations.ak_to_regular import *
111 changes: 111 additions & 0 deletions src/awkward/operations/ak_to_parquet_dataset.py
@@ -0,0 +1,111 @@
from __future__ import annotations

from os import fsdecode, path

from awkward._dispatch import high_level_function

__all__ = ("to_parquet_dataset",)


@high_level_function()
def to_parquet_dataset(
    directory,
    filenames=None,
    storage_options=None,
):
    """
    Args:
        directory (str or Path): A directory in which to write `_common_metadata`
            and `_metadata`, making the directory of Parquet files into a dataset.
        filenames (None, str, or list of str or Path): If None, the `directory` is
            recursively searched for files ending in `.parquet` or `.parq` and
            sorted lexicographically. A string is interpreted as a glob pattern
            relative to `directory`. Otherwise, this explicit list of files is
            taken and row-groups are concatenated in its given order; relative
            filenames are interpreted relative to `directory`.
        storage_options (None or dict): Options passed to fsspec to resolve and
            open `directory` and the files within it (for example, credentials
            for a remote filesystem).

    Creates a `_common_metadata` and a `_metadata` file in a directory of Parquet
    files.

    >>> ak.to_parquet(array1, "/directory/arr1.parquet", parquet_compliant_nested=True)
    >>> ak.to_parquet(array2, "/directory/arr2.parquet", parquet_compliant_nested=True)
    >>> ak.to_parquet_dataset("/directory")

    The `_common_metadata` contains the schema that all files share. (If the files
    have different schemas, this function raises an exception.)

    The `_metadata` contains row-group metadata used to seek to specific row-groups
    within the multi-file dataset.
    """

    return _impl(directory, filenames, storage_options)


def _impl(directory, filenames, storage_options):
    # Implementation
    import awkward._connect.pyarrow

    pyarrow_parquet = awkward._connect.pyarrow.import_pyarrow_parquet(
        "ak.to_parquet_dataset"
    )
    import fsspec.parquet

    try:
        directory = fsdecode(directory)
    except TypeError:
        raise TypeError(
            f"'directory' argument of 'ak.to_parquet_dataset' must be a path-like, not {type(directory).__name__}"
        ) from None
    fs, destination = fsspec.core.url_to_fs(directory, **(storage_options or {}))
    if not fs.isdir(destination):
        raise ValueError(f"{destination!r} is not a directory")

    filepaths = get_filepaths(filenames, fs, destination)

    if len(filepaths) == 0:
        raise ValueError(f"no *.parquet or *.parq matches for path {destination!r}")

    schema = None
    metadata_collector = []
    for filepath in filepaths:
        with fs.open(filepath, mode="rb") as f:
            parquet_file = pyarrow_parquet.ParquetFile(f)
            if schema is None:
                schema = parquet_file.schema_arrow
                first_filepath = filepath
            elif not schema.equals(parquet_file.schema_arrow):
                raise ValueError(
                    "schema in {} differs from the first schema (in {})".format(
                        repr(filepath), repr(first_filepath)
                    )
                )
            metadata_collector.append(parquet_file.metadata)
            metadata_collector[-1].set_file_path(filepath)

    _common_metadata_path = path.join(destination, "_common_metadata")
    pyarrow_parquet.write_metadata(schema, _common_metadata_path, filesystem=fs)

    _metadata_path = path.join(destination, "_metadata")
    pyarrow_parquet.write_metadata(
        schema, _metadata_path, metadata_collector=metadata_collector, filesystem=fs
    )
    return _common_metadata_path, _metadata_path


def get_filepaths(filenames, fs, destination):
    filepaths = []
    if filenames is not None:
        if isinstance(filenames, str):
            for f in fs.glob(path.join(destination, filenames)):
                if f.endswith((".parq", ".parquet")):
                    filepaths.append(f)
        else:
            for filename in filenames:
                for f in fs.glob(path.join(destination, filename)):
                    if f.endswith((".parq", ".parquet")):
                        filepaths.append(f)
    else:
        for f, fdata in fs.find(destination, detail=True).items():
            if f.endswith((".parq", ".parquet")):
                if fdata["type"] == "file":
                    filepaths.append(f)
    return filepaths
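
For context, the workflow this new function enables (and that the tests below exercise) is: write several Parquet files with the same schema into one directory using ak.to_parquet, call ak.to_parquet_dataset on that directory, then read everything back as a single array with ak.from_parquet. A minimal sketch under those assumptions; the directory path and array values here are illustrative only, not part of this PR:

import os

import awkward as ak

directory = "/tmp/example_dataset"  # hypothetical scratch directory
os.makedirs(directory, exist_ok=True)

# Write two Parquet files that share a schema.
ak.to_parquet(
    ak.Array([[1.1, 2.2], [3.3]]),
    os.path.join(directory, "part1.parquet"),
    parquet_compliant_nested=True,
)
ak.to_parquet(
    ak.Array([[4.4], [5.5, 6.6]]),
    os.path.join(directory, "part2.parquet"),
    parquet_compliant_nested=True,
)

# Stamp the directory with _common_metadata and _metadata, making it a dataset.
ak.to_parquet_dataset(directory)

# The directory now reads back as one logical array, files concatenated in order.
assert ak.from_parquet(directory).to_list() == [[1.1, 2.2], [3.3], [4.4], [5.5, 6.6]]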
140 changes: 140 additions & 0 deletions tests/test_2898_to_parquet_dataset.py
@@ -0,0 +1,140 @@
from __future__ import annotations

import os

import pytest

import awkward as ak

pyarrow = pytest.importorskip("pyarrow")


def test_simple(tmp_path):
    array = ak.Array([[1.1, 2.2, 3.3], [], [4.4, 5.5]])
    array1 = ak.Array([[1.1, 2.2, 3.3, 4.4], [4.0], [4.4, 5.5]])
    array2 = ak.Array([[1.0, 3.0, 3.3, 4.4], [4.0], [4.4, 10.0], [11.11]])
    ak.to_parquet(
        array, os.path.join(tmp_path, "arr1.parquet"), parquet_compliant_nested=True
    )
    ak.to_parquet(
        array1, os.path.join(tmp_path, "arr2.parquet"), parquet_compliant_nested=True
    )
    ak.to_parquet(
        array2, os.path.join(tmp_path, "arr3.parquet"), parquet_compliant_nested=True
    )

    ak.to_parquet_dataset(tmp_path, filenames="arr[1-3].parquet")
    assert os.path.exists(os.path.join(tmp_path, "_common_metadata"))
    assert os.path.exists(os.path.join(tmp_path, "_metadata"))

    with_metadata = ak.from_parquet(tmp_path)
    assert with_metadata.tolist() == [
        [1.1, 2.2, 3.3],
        [],
        [4.4, 5.5],
        [1.1, 2.2, 3.3, 4.4],
        [4.0],
        [4.4, 5.5],
        [1.0, 3.0, 3.3, 4.4],
        [4.0],
        [4.4, 10.0],
        [11.11],
    ]


def test_complex(tmp_path):
    array1 = ak.Array(
        [{"x": 1.1, "y": [1]}, {"x": 2.2, "y": [1, 2]}, {"x": 3.3, "y": [1, 2, 3]}]
    )
    array2 = ak.Array([{"x": 1.8, "y": [3, 5, 6]}])
    array3 = ak.Array([{"x": 4.4, "y": [1, 2, 3, 4]}, {"x": 5.5, "y": [1, 2, 3, 4, 5]}])
    ak.to_parquet(
        array1, os.path.join(tmp_path, "arr1.parquet"), parquet_compliant_nested=True
    )
    ak.to_parquet(
        array2, os.path.join(tmp_path, "arr2.parquet"), parquet_compliant_nested=True
    )
    ak.to_parquet(
        array3, os.path.join(tmp_path, "arr3.parquet"), parquet_compliant_nested=True
    )

    ak.to_parquet_dataset(tmp_path)

    assert os.path.exists(os.path.join(tmp_path, "_common_metadata"))
    assert os.path.exists(os.path.join(tmp_path, "_metadata"))

    with_metadata = ak.from_parquet(tmp_path)
    assert with_metadata.tolist() == [
        {"x": 1.1, "y": [1]},
        {"x": 2.2, "y": [1, 2]},
        {"x": 3.3, "y": [1, 2, 3]},
        {"x": 1.8, "y": [3, 5, 6]},
        {"x": 4.4, "y": [1, 2, 3, 4]},
        {"x": 5.5, "y": [1, 2, 3, 4, 5]},
    ]


def test_filenames(tmp_path):
    array = ak.Array(
        [{"x": 1.1, "y": [1]}, {"x": 2.2, "y": [1, 2]}, {"x": 3.3, "y": [1, 2, 3]}]
    )
    array1 = ak.Array([{"x": 1.8, "y": [3, 5, 6]}])
    array2 = ak.Array([{"x": 4.4, "y": [1, 2, 3, 4]}, {"x": 5.5, "y": [1, 2, 3, 4, 5]}])
    ak.to_parquet(
        array, os.path.join(tmp_path, "arr1.parquet"), parquet_compliant_nested=True
    )
    ak.to_parquet(
        array1, os.path.join(tmp_path, "arr2.parquet"), parquet_compliant_nested=True
    )
    ak.to_parquet(
        array2, os.path.join(tmp_path, "arr3.parquet"), parquet_compliant_nested=True
    )

    ak.to_parquet_dataset(
        tmp_path, filenames=["arr1.parquet", "arr2.parquet", "arr3.parquet"]
    )

    assert os.path.exists(os.path.join(tmp_path, "_common_metadata"))
    assert os.path.exists(os.path.join(tmp_path, "_metadata"))

    with_metadata = ak.from_parquet(tmp_path)
    assert with_metadata.tolist() == [
        {"x": 1.1, "y": [1]},
        {"x": 2.2, "y": [1, 2]},
        {"x": 3.3, "y": [1, 2, 3]},
        {"x": 1.8, "y": [3, 5, 6]},
        {"x": 4.4, "y": [1, 2, 3, 4]},
        {"x": 5.5, "y": [1, 2, 3, 4, 5]},
    ]


def test_wildcard(tmp_path):
    array = ak.Array([[1.1, 2.2, 3.3], [], [4.4, 5.5]])
    array1 = ak.Array([[1.1, 2.2, 3.3, 4.4], [4.0], [4.4, 5.5]])
    array2 = ak.Array([[1.0, 3.0, 3.3, 4.4], [4.0], [4.4, 10.0], [11.11]])
    ak.to_parquet(
        array, os.path.join(tmp_path, "arr1.parquet"), parquet_compliant_nested=True
    )
    ak.to_parquet(
        array1, os.path.join(tmp_path, "arr2.parquet"), parquet_compliant_nested=True
    )
    ak.to_parquet(
        array2, os.path.join(tmp_path, "arr3.parquet"), parquet_compliant_nested=True
    )

    ak.to_parquet_dataset(tmp_path, filenames="arr?.parquet")

    with_metadata = ak.from_parquet(tmp_path)
    assert with_metadata.tolist() == [
        [1.1, 2.2, 3.3],
        [],
        [4.4, 5.5],
        [1.1, 2.2, 3.3, 4.4],
        [4.0],
        [4.4, 5.5],
        [1.0, 3.0, 3.3, 4.4],
        [4.0],
        [4.4, 10.0],
        [11.11],
    ]
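
As a usage note (not part of this PR's diff): the `_metadata` file written by ak.to_parquet_dataset is what lets a reader seek to individual row-groups of the multi-file dataset without opening every file. A hedged sketch, assuming a directory already stamped by ak.to_parquet_dataset and relying on ak.from_parquet's existing row_groups argument:

import awkward as ak

# Hypothetical directory already containing _common_metadata and _metadata.
directory = "/tmp/example_dataset"

# Read only the first row-group of the whole dataset; the _metadata footer
# tells the reader which file and byte range to fetch.
first_rows = ak.from_parquet(directory, row_groups=[0])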