Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: Only return parquet metadata if intending to write #549

Merged
merged 8 commits into from
Nov 20, 2024
Merged
2 changes: 1 addition & 1 deletion .github/workflows/awkward-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
run: |
python3 -m pip install pip wheel -U
python3 -m pip install -q --no-cache-dir -e .[complete,test]
python3 -m pip uninstall -y awkward && pip install git+https://github.com/scikit-hep/awkward.git@main
python3 -m pip uninstall -y awkward && pip install git+https://github.com/scikit-hep/awkward.git@main --no-deps
- name: Run tests
run: |
python3 -m pytest
47 changes: 38 additions & 9 deletions src/dask_awkward/lib/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ def __init__(
npartitions: int,
prefix: str | None = None,
storage_options: dict | None = None,
write_metadata: bool = False,
**kwargs: Any,
):
self.fs = fs
Expand All @@ -496,16 +497,19 @@ def __init__(
if isinstance(self.fs.protocol, str)
else self.fs.protocol[0]
)
self.write_metadata = write_metadata
self.kwargs = kwargs

def __call__(self, data, block_index):
filename = f"part{str(block_index[0]).zfill(self.zfill)}.parquet"
if self.prefix is not None:
filename = f"{self.prefix}-{filename}"
filename = self.fs.unstrip_protocol(f"{self.path}{self.fs.sep}{filename}")
return ak.to_parquet(
out = ak.to_parquet(
data, filename, **self.kwargs, storage_options=self.storage_options
)
if self.write_metadata:
return out


def to_parquet(
Expand Down Expand Up @@ -597,7 +601,10 @@ def to_parquet(
storage_options
Storage options passed to ``fsspec``.
write_metadata
Write Parquet metadata.
Write Parquet metadata. Note that when this is True, all the
metadata pieces will be pulled into a single finalizer task. When
False, the whole write graph can be evaluated as a more efficient
tree reduction.
compute
If ``True``, immediately compute the result (write data to
disk). If ``False`` a Scalar collection will be returned such
Expand Down Expand Up @@ -667,6 +674,7 @@ def to_parquet(
parquet_old_int96_timestamps=parquet_old_int96_timestamps,
parquet_compliant_nested=parquet_compliant_nested,
parquet_extra_options=parquet_extra_options,
write_metadata=write_metadata,
),
array,
BlockIndex((array.npartitions,)),
Expand All @@ -681,17 +689,38 @@ def to_parquet(
dsk[(final_name, 0)] = (_metadata_file_from_metas, fs, path) + tuple(
map_res.__dask_keys__()
)
graph = HighLevelGraph.from_collections(
final_name,
AwkwardMaterializedLayer(dsk, previous_layer_names=[map_res.name]),
dependencies=[map_res],
)
out = new_scalar_object(graph, final_name, dtype="f8")
else:
final_name = name + "-finalize"
dsk[(final_name, 0)] = (lambda *_: None, map_res.__dask_keys__())
graph = HighLevelGraph.from_collections(
final_name,
AwkwardMaterializedLayer(dsk, previous_layer_names=[map_res.name]),
dependencies=[map_res],
)
out = new_scalar_object(graph, final_name, dtype="f8")
from dask_awkward.layers import AwkwardTreeReductionLayer

layer = AwkwardTreeReductionLayer(
name=final_name,
concat_func=none_to_none,
tree_node_func=none_to_none,
name_input=map_res.name,
npartitions_input=map_res.npartitions,
finalize_func=none_to_none,
)
graph = HighLevelGraph.from_collections(
final_name,
layer,
dependencies=[map_res],
)
out = new_scalar_object(graph, final_name, dtype="f8")

if compute:
out.compute()
return None
else:
return out


def none_to_none(*_):
    """Dummy reduction step for writes that yield no metadata.

    Accepts and discards any positional arguments, always returning
    ``None`` so the tree reduction simply propagates nothing.
    """
    return None
Loading