feat: add to_parquet_dataset function #2898

Merged Mar 20, 2024 · 51 commits
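This PR adds `ak.to_parquet_dataset`, which writes the `_common_metadata` and `_metadata` sidecar files that turn a directory of Parquet files into a single dataset. A minimal sketch of the workflow it enables, assuming pyarrow is installed; the `/tmp/dataset` path and the example arrays are illustrative, not taken from the PR:

import os

import awkward as ak

directory = "/tmp/dataset"  # illustrative path, not from the PR
os.makedirs(directory, exist_ok=True)

# Write two arrays as separate Parquet files into the same directory.
array1 = ak.Array([[1.1, 2.2, 3.3], [], [4.4, 5.5]])
array2 = ak.Array([[6.6], [7.7, 8.8]])
ak.to_parquet(array1, os.path.join(directory, "arr1.parquet"), parquet_compliant_nested=True)
ak.to_parquet(array2, os.path.join(directory, "arr2.parquet"), parquet_compliant_nested=True)

# New in this PR: generate _common_metadata and _metadata for the directory.
ak.to_parquet_dataset(directory)

# The directory can now be read back as one concatenated array.
combined = ak.from_parquet(directory)
assert combined.tolist() == [[1.1, 2.2, 3.3], [], [4.4, 5.5], [6.6], [7.7, 8.8]]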
Changes from 15 commits

Commits
3441004
added to_parquet_dataset, passes simple test
zbilodea Dec 13, 2023
5f48c02
style: pre-commit fixes
pre-commit-ci[bot] Dec 13, 2023
28c5a1a
Merge branch 'main' into feat-add-to-parquet-dataset
agoose77 Dec 19, 2023
7de6817
added more complicated test
zbilodea Jan 22, 2024
8dd30eb
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Jan 22, 2024
b3640b5
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Jan 25, 2024
2e5c7bc
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Jan 25, 2024
2056f30
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Jan 26, 2024
8172c51
fixed test
zbilodea Jan 26, 2024
bf4b66c
style: pre-commit fixes
pre-commit-ci[bot] Jan 26, 2024
cadc4d2
formatting for tests
zbilodea Jan 26, 2024
e264d83
Merge branch 'feat-add-to-parquet-dataset' of https://github.com/scik…
zbilodea Jan 26, 2024
838f156
removed unnecessary check from test
zbilodea Jan 26, 2024
56df512
added to docstrings
zbilodea Jan 26, 2024
ec3a1d7
one additional check
zbilodea Jan 26, 2024
8960609
started changing from os to fsspec
zbilodea Jan 29, 2024
5eb3124
slightly further
zbilodea Jan 29, 2024
8397881
more small changes, removed regularize-path functionality...
zbilodea Jan 29, 2024
b525048
Can't get filenames yet
zbilodea Jan 30, 2024
6077539
writes local correctly still but need to test non-local
zbilodea Jan 30, 2024
9e08ca6
style: pre-commit fixes
pre-commit-ci[bot] Jan 30, 2024
90966dd
Formatting...
zbilodea Jan 30, 2024
3b5d83f
Merge branch 'feat-add-to-parquet-dataset' of https://github.com/scik…
zbilodea Jan 30, 2024
59634c7
removed os completely
zbilodea Jan 30, 2024
d64e2a0
importer skip
zbilodea Jan 30, 2024
9fa8ae2
fixed filenames parameter functionality
zbilodea Jan 31, 2024
16b742f
concatination error
zbilodea Jan 31, 2024
cf7bd7f
still concatination error
zbilodea Jan 31, 2024
9836510
style: pre-commit fixes
pre-commit-ci[bot] Jan 31, 2024
f34e836
Same error
zbilodea Jan 31, 2024
653dd4c
left something in test
zbilodea Jan 31, 2024
1de8916
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Jan 31, 2024
926e62d
still concatination error, trying os.path.join
zbilodea Jan 31, 2024
7179aaf
Merge branch 'feat-add-to-parquet-dataset' of https://github.com/scik…
zbilodea Jan 31, 2024
7a9e5cd
Trying os.path.join
zbilodea Jan 31, 2024
2fc6520
changed write_metadata arguments...
zbilodea Jan 31, 2024
0ac1346
Trying to fix Posixpath error
zbilodea Feb 1, 2024
3813fbe
added an s3 test which passed successfully
zbilodea Feb 5, 2024
61374fe
Added fs.glob and added test to see that wildcarding works.
zbilodea Feb 5, 2024
dc8ba77
made test appropriate for actions
zbilodea Feb 5, 2024
4c8ac95
Changed tests for wildcards
zbilodea Feb 5, 2024
f023205
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Feb 5, 2024
dff39ad
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Feb 7, 2024
5215564
changed tests again to try and solve pytest adjacent error...
zbilodea Feb 7, 2024
9135501
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Feb 7, 2024
98dfc98
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Feb 12, 2024
1ed0b7c
Changed arguments for pyarrow 7.0
zbilodea Feb 12, 2024
89ee769
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Mar 6, 2024
632338d
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Mar 6, 2024
f945366
Merge branch 'main' into feat-add-to-parquet-dataset
jpivarski Mar 20, 2024
ba0101d
Merge branch 'main' into feat-add-to-parquet-dataset
zbilodea Mar 20, 2024
1 change: 1 addition & 0 deletions docs/reference/toctree.txt
@@ -42,6 +42,7 @@
generated/ak.to_numpy
generated/ak.to_packed
generated/ak.to_parquet
generated/ak.to_parquet_dataset
generated/ak.to_rdataframe

.. toctree::
1 change: 1 addition & 0 deletions src/awkward/operations/__init__.py
@@ -90,6 +90,7 @@
from awkward.operations.ak_to_numpy import *
from awkward.operations.ak_to_packed import *
from awkward.operations.ak_to_parquet import *
from awkward.operations.ak_to_parquet_dataset import *
from awkward.operations.ak_to_rdataframe import *
from awkward.operations.ak_to_regular import *
from awkward.operations.ak_transform import *
115 changes: 115 additions & 0 deletions src/awkward/operations/ak_to_parquet_dataset.py
@@ -0,0 +1,115 @@
from __future__ import annotations

__all__ = ("to_parquet_dataset",)


def to_parquet_dataset(directory, filenames=None, filename_extension=".parquet"):
    """
    Args:
        directory (str or Path): A local directory in which to write `_common_metadata`
            and `_metadata`, making the directory of Parquet files into a dataset.
        filenames (None or list of str or Path): If None, the `directory` will be
            recursively searched for files ending in `filename_extension` and
            sorted lexicographically. Otherwise, this explicit list of files is
            taken and row-groups are concatenated in its given order. If any
            filenames are relative, they are interpreted relative to `directory`.
        filename_extension (str): Filename extension (including `.`) to use to
            search for files recursively. Ignored if `filenames` is not None.

    Creates a `_common_metadata` and a `_metadata` in a directory of Parquet files.

    >>> ak.to_parquet(array1, "/directory/arr1.parquet", parquet_compliant_nested=True)
    >>> ak.to_parquet(array2, "/directory/arr2.parquet", parquet_compliant_nested=True)
    >>> ak.to_parquet_dataset("/directory")

    The `_common_metadata` contains the schema that all files share. (If the files
    have different schemas, this function raises an exception.)

    The `_metadata` contains row-group metadata used to seek to specific row-groups
    within the multi-file dataset.
    """

    # Implementation

    from os import fsdecode, path

    import awkward._connect.pyarrow

    pyarrow_parquet = awkward._connect.pyarrow.import_pyarrow_parquet(
        "ak.to_parquet_dataset"
    )

    try:
        directory = fsdecode(directory)
    except TypeError:
        raise TypeError(
            f"'directory' argument of 'ak.to_parquet_dataset' must be a path-like, not {type(directory).__name__}"
        ) from None

    directory = _regularize_path(directory)
    if not path.isdir(directory):
        raise ValueError(f"{directory!r} is not a local filesystem directory")

    if filenames is None:
        import glob

        filenames = sorted(
            glob.glob(directory + f"/**/*{filename_extension}", recursive=True)
        )
    else:
        filenames = [_regularize_path(x) for x in filenames]
        filenames = [
            x if path.isabs(x) else path.join(directory, x) for x in filenames
        ]

    relpaths = [path.relpath(x, directory) for x in filenames]
    schema, metadata_collector = _common_parquet_schema(
        pyarrow_parquet, filenames, relpaths
    )
    pyarrow_parquet.write_metadata(schema, path.join(directory, "_common_metadata"))
    pyarrow_parquet.write_metadata(
        schema,
        path.join(directory, "_metadata"),
        metadata_collector=metadata_collector,
    )


def _regularize_path(path):
    import os

    if isinstance(path, getattr(os, "PathLike", ())):
        path = os.fspath(path)

    elif hasattr(path, "__fspath__"):
        path = os.fspath(path)

    elif path.__class__.__module__ == "pathlib":
        import pathlib

        if isinstance(path, pathlib.Path):
            path = str(path)

    if isinstance(path, str):
        path = os.path.expanduser(path)

    return path


def _common_parquet_schema(pq, filenames, relpaths):
    assert len(filenames) != 0

    schema = None
    metadata_collector = []
    for filename, relpath in zip(filenames, relpaths):
        if schema is None:
            schema = pq.ParquetFile(filename).schema_arrow
            first_filename = filename
        elif not schema.equals(pq.ParquetFile(filename).schema_arrow):
            raise ValueError(
                "schema in {} differs from the first schema (in {})".format(
                    repr(filename), repr(first_filename)
                )
            )
        metadata_collector.append(pq.read_metadata(filename))
        metadata_collector[-1].set_file_path(relpath)
    return schema, metadata_collector
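To make the role of the two sidecar files concrete, here is a short sketch (not part of the diff) that inspects them with pyarrow after the docstring example above has run; `/directory` is the same illustrative path:

import pyarrow.parquet as pq

# _common_metadata holds only the schema that all files share.
schema = pq.read_schema("/directory/_common_metadata")
print(schema)

# _metadata additionally holds row-group metadata for every file, so a
# reader can seek to specific row-groups without opening each data file.
metadata = pq.read_metadata("/directory/_metadata")
print(metadata.num_row_groups, metadata.num_rows)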
101 changes: 101 additions & 0 deletions tests/test_2898_to_parquet_dataset.py
@@ -0,0 +1,101 @@
from __future__ import annotations

import os

import pytest

import awkward as ak

pyarrow = pytest.importorskip("pyarrow")  # these tests require pyarrow to write Parquet


def test_simple(tmp_path):
    array = ak.Array([[1.1, 2.2, 3.3], [], [4.4, 5.5]])
    array1 = ak.Array([[1.1, 2.2, 3.3, 4.4], [4.0], [4.4, 5.5]])
    array2 = ak.Array([[1.0, 3.0, 3.3, 4.4], [4.0], [4.4, 10.0], [11.11]])
    ak.to_parquet(
        array, os.path.join(tmp_path, "arr1.parquet"), parquet_compliant_nested=True
    )
    ak.to_parquet(
        array1, os.path.join(tmp_path, "arr2.parquet"), parquet_compliant_nested=True
    )
    ak.to_parquet(
        array2, os.path.join(tmp_path, "arr3.parquet"), parquet_compliant_nested=True
    )

    ak.to_parquet_dataset(tmp_path, ["arr1.parquet", "arr2.parquet", "arr3.parquet"])

    assert os.path.exists(os.path.join(tmp_path, "_common_metadata"))
    assert os.path.exists(os.path.join(tmp_path, "_metadata"))

    with_metadata = ak.from_parquet(tmp_path)
    assert with_metadata.tolist() == [
        [1.1, 2.2, 3.3],
        [],
        [4.4, 5.5],
        [1.1, 2.2, 3.3, 4.4],
        [4.0],
        [4.4, 5.5],
        [1.0, 3.0, 3.3, 4.4],
        [4.0],
        [4.4, 10.0],
        [11.11],
    ]


def test_complex(tmp_path):
    array = ak.Array(
        [{"x": 1.1, "y": [1]}, {"x": 2.2, "y": [1, 2]}, {"x": 3.3, "y": [1, 2, 3]}]
    )
    array1 = ak.Array([{"x": 1.8, "y": [3, 5, 6]}])
    array2 = ak.Array([{"x": 4.4, "y": [1, 2, 3, 4]}, {"x": 5.5, "y": [1, 2, 3, 4, 5]}])
    ak.to_parquet(
        array, os.path.join(tmp_path, "arr1.parquet"), parquet_compliant_nested=True
    )
    ak.to_parquet(
        array1, os.path.join(tmp_path, "arr2.parquet"), parquet_compliant_nested=True
    )
    ak.to_parquet(
        array2, os.path.join(tmp_path, "arr3.parquet"), parquet_compliant_nested=True
    )

    ak.to_parquet_dataset(tmp_path, ["arr1.parquet", "arr2.parquet", "arr3.parquet"])

    assert os.path.exists(os.path.join(tmp_path, "_common_metadata"))
    assert os.path.exists(os.path.join(tmp_path, "_metadata"))

    with_metadata = ak.from_parquet(tmp_path)
    assert with_metadata.tolist() == [
        {"x": 1.1, "y": [1]},
        {"x": 2.2, "y": [1, 2]},
        {"x": 3.3, "y": [1, 2, 3]},
        {"x": 1.8, "y": [3, 5, 6]},
        {"x": 4.4, "y": [1, 2, 3, 4]},
        {"x": 5.5, "y": [1, 2, 3, 4, 5]},
    ]