Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dask awkward support for uproot.dask #652

Merged
merged 26 commits into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b34350a
Changed everything we _think_ we need to change.
jpivarski Jul 5, 2022
5957e0e
All tests pass.
jpivarski Jul 5, 2022
506e2fe
Change test 0034
kkothari2001 Jul 7, 2022
83a3953
Alter instances of from_datashape
kkothari2001 Jul 7, 2022
2547a04
Removed 'v1/v2 insensitivity' checks. We're all-in for v2 now.
jpivarski Jul 7, 2022
d9890b2
Added basic dask-awkward support
kkothari2001 Jul 17, 2022
4a8f59e
Delay file open feature
kkothari2001 Jul 18, 2022
a476d47
Merge remote-tracking branch 'origin/main' into dask-awkward-support
kkothari2001 Jul 18, 2022
d46481f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 18, 2022
c5ed2c0
Complete dask-awkward using from_map
kkothari2001 Aug 1, 2022
42afafa
Added tests for dask_awkward
kkothari2001 Aug 1, 2022
c5c5c15
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 1, 2022
2df67c5
Added dask-awkward to testing dependencies
kkothari2001 Aug 1, 2022
18c6647
Add dask-awk to dependencies in github workflow
kkothari2001 Aug 1, 2022
2b904f8
Skip dask_awkward tests for all except python version == 3.8
kkothari2001 Aug 1, 2022
d38ac6a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 1, 2022
86501e2
Make suggested changes
kkothari2001 Aug 10, 2022
d012d70
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 10, 2022
d2fe1a6
Add back the noqa's which are being removed for some reason
kkothari2001 Aug 10, 2022
50b16f2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 10, 2022
4ab1564
Refactored uproot.dask as suggested
kkothari2001 Aug 11, 2022
da3c949
Complete tests for dask_awkward
kkothari2001 Aug 11, 2022
1e54bb1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 11, 2022
c040121
Make awkward the default library for uproot.dask
kkothari2001 Aug 11, 2022
5c0e8ef
Merge branch 'dask-awkward-support' of github.com:scikit-hep/uproot5 …
kkothari2001 Aug 11, 2022
ac585a1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 11, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ jobs:
run: |
conda env list
conda install root
pip install dask-awkward
conda list

- name: "Install XRootD"
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def get_version():
"xxhash",
"requests",
"dask[array]",
"dask-awkward",
],
}
extras["all"] = sum(extras.values(), [])
Expand Down
262 changes: 236 additions & 26 deletions src/uproot/behaviors/TBranch.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,8 @@ def dask(
"""
files = uproot._util.regularize_files(files)
library = uproot.interpretation.library._regularize_library(library)
if library.name != "np":
raise NotImplementedError()
# if library.name != "np":
# raise NotImplementedError()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a committed PR, old code should be removed, not commented out.

But I think you still want to complain about Pandas.

Suggested change
# if library.name != "np":
# raise NotImplementedError()
if library.name == "pd":
raise NotImplementedError()


real_options = dict(options)
if "num_workers" not in real_options:
Expand All @@ -609,31 +609,60 @@ def dask(

filter_branch = uproot._util.regularize_filter(filter_branch)

if open_files:
return _get_dask_array(
files,
filter_name,
filter_typename,
filter_branch,
recursive,
full_paths,
step_size,
custom_classes,
allow_missing,
real_options,
)
if library.name == "np":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want the code in TBranch.py to avoid special cases that depend on the value of library; that's what library.* methods are for. Can this be refactored to call library.something here, and then have the different library implementations do the different things?

if open_files:
return _get_dask_array(
files,
filter_name,
filter_typename,
filter_branch,
recursive,
full_paths,
step_size,
custom_classes,
allow_missing,
real_options,
)
else:
return _get_dask_array_delay_open(
files,
filter_name,
filter_typename,
filter_branch,
recursive,
full_paths,
custom_classes,
allow_missing,
real_options,
)
elif library.name == "ak":
if open_files:
return _get_dak_array(
files,
filter_name,
filter_typename,
filter_branch,
recursive,
full_paths,
step_size,
custom_classes,
allow_missing,
real_options,
)
else:
return _get_dak_array_delay_open(
files,
filter_name,
filter_typename,
filter_branch,
recursive,
full_paths,
custom_classes,
allow_missing,
real_options,
)
else:
return _get_dask_array_delay_open(
files,
filter_name,
filter_typename,
filter_branch,
recursive,
full_paths,
custom_classes,
allow_missing,
real_options,
)
raise NotImplementedError()


class Report:
Expand Down Expand Up @@ -3549,6 +3578,187 @@ def delayed_get_array(ttree, key):
return dask_dict


def _get_dak_array(
    files,
    filter_name=no_filter,
    filter_typename=no_filter,
    filter_branch=no_filter,
    recursive=True,
    full_paths=False,
    step_size="100 MB",
    custom_classes=None,
    allow_missing=False,
    real_options=None,
):
    """Eagerly open every input and build a dask-awkward collection.

    Each ``(file_path, object_path)`` pair in *files* is opened up front so
    that the set of TBranch names common to all trees can be computed and so
    that each tree can be split into entry ranges of roughly *step_size*
    (an entry count if integral, otherwise a memory-size string).  One dask
    partition is produced per ``(tree index, entry_start, entry_stop)``
    triple via ``dask_awkward.from_map``.

    Raises ValueError when no TTrees are found, or when the trees have no
    TBranches in common.
    """
    import dask_awkward

    hasbranches = []    # one HasBranches object per successfully opened input
    common_keys = None  # intersection of branch names across all inputs
    is_self = []        # True where the input itself was a single leaf TBranch

    count = 0
    for file_path, object_path in files:
        # May return None when allow_missing permits it -- TODO confirm.
        obj = uproot._util.regularize_object_path(
            file_path, object_path, custom_classes, allow_missing, real_options
        )

        if obj is not None:
            count += 1

            # A leaf TBranch (no sub-branches) is read through its parent,
            # with the filter narrowed to exactly that branch.
            if isinstance(obj, TBranch) and len(obj.keys(recursive=True)) == 0:
                original = obj
                obj = obj.parent
                is_self.append(True)

                def real_filter_branch(branch):
                    return branch is original and filter_branch(branch)

            else:
                is_self.append(False)
                real_filter_branch = filter_branch

            hasbranches.append(obj)

            new_keys = obj.keys(
                recursive=recursive,
                filter_name=filter_name,
                filter_typename=filter_typename,
                filter_branch=real_filter_branch,
                full_paths=full_paths,
            )

            # Keep only keys present in every file, preserving first-file order.
            if common_keys is None:
                common_keys = new_keys
            else:
                new_keys = set(new_keys)
                common_keys = [key for key in common_keys if key in new_keys]

    if count == 0:
        raise ValueError(
            "allow_missing=True and no TTrees found in\n\n {}".format(
                "\n ".join(
                    "{"
                    + "{}: {}".format(
                        repr(f.file_path if isinstance(f, HasBranches) else f),
                        repr(f.object_path if isinstance(f, HasBranches) else o),
                    )
                    + "}"
                    for f, o in files
                )
            )
        )

    # Also rejects a mixture of leaf-TBranch inputs and whole-TTree inputs.
    if len(common_keys) == 0 or not (all(is_self) or not any(is_self)):
        raise ValueError(
            "TTrees in\n\n {}\n\nhave no TBranches in common".format(
                "\n ".join(
                    "{"
                    + "{}: {}".format(
                        repr(f.file_path if isinstance(f, HasBranches) else f),
                        repr(f.object_path if isinstance(f, HasBranches) else o),
                    )
                    + "}"
                    for f, o in files
                )
            )
        )

    class _UprootRead:
        # Dask task: read `branches` of the i-th tree over [start, stop).
        def __init__(self, hasbranches, branches) -> None:
            self.hasbranches = hasbranches
            self.branches = branches

        def __call__(self, i_start_stop):
            i, start, stop = i_start_stop
            return self.hasbranches[i].arrays(
                self.branches, entry_start=start, entry_stop=stop
            )
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this class can be defined at top-level, outside of this function. That might be necessary to serialize it and send it to a remote machine. (In general, nested classes and functions can depend on local variables, which can prevent them from being pickleable, which Dask needs to send data to remote processors.)

Even if that's not necessary for this helper function, having a common practice of making functions and classes top-level by default would make it so that we don't have to ask ourselves this questions whenever we see a nested function/class.

If you want to introduce a new src/uproot/_dask.py to have a place to put all of these helpers, that would be good. TBranch.py is getting very full (and it's always easier to merge files than split them).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this class can be defined at top-level, outside of this function.

Yes! it's definitely best to have dask callables as standalone classes or functions.


    # One (i, start, stop) triple per partition of the output collection.
    partition_args = []
    for i, ttree in enumerate(hasbranches):
        entry_start, entry_stop = _regularize_entries_start_stop(
            ttree.num_entries, None, None
        )
        entry_step = 0
        if uproot._util.isint(step_size):
            entry_step = step_size
        else:
            # step_size is a memory-size string; convert it to an entry count.
            entry_step = ttree.num_entries_for(step_size, expressions=common_keys)

        def foreach(start):
            stop = min(start + entry_step, entry_stop)
            partition_args.append((i, start, stop))

        for start in range(entry_start, entry_stop, entry_step):
            foreach(start)

    # Small eager read used only to derive the array layout; dask-awkward
    # receives its typetracer (lengthless metadata) form as `meta`.
    first5 = hasbranches[0].arrays(common_keys, entry_start=0, entry_stop=5)
    meta = dask_awkward.core.typetracer_array(first5)

    return dask_awkward.from_map(
        _UprootRead(hasbranches, common_keys),
        partition_args,
        label="from-uproot",
        meta=meta,
    )
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Relevant dask part cc @douglasdavis

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notes: First I tried using the same code to create the typetracer array with ak.Array(first5.layout.typetracer.forget_length()) but it was throwing an error that made me realise it probably wasn't the correct way to get the tytracer array.

Going through the code for dask-awkward I found this function dask_awkward.core.typetracer_array, it works, but please confirm if I'm using it right.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What motivated the number 5? Is it possible for these 5 entries to not have something that you're looking for, and in that case, does it fail?

Since reading happens at a per-TBasket level, getting the whole first TBasket might make more sense. TBranch.basket_entry_start_stop would give you the entry_start and entry_stop numbers for a given TBasket (such as number 0).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number 5 was from my first PoC; I think it would make sense to have this as something that is configurable with a sensible default



def _get_dak_array_delay_open(
    files,
    filter_name=no_filter,
    filter_typename=no_filter,
    filter_branch=no_filter,
    recursive=True,
    full_paths=False,
    custom_classes=None,
    allow_missing=False,
    real_options=None,  # NOTE: a comma after **options breaks Python 2
):
    """Build a dask-awkward collection without opening every file up front.

    Only the first ``(file_path, object_path)`` pair is opened, to discover
    the branch names; those keys are assumed to exist in all remaining files
    (NOTE(review): unlike _get_dak_array, no per-file key intersection is
    performed here -- confirm that all files share these branches).  One dask
    partition is produced per input file, and each task re-opens its own
    file when executed.
    """
    import dask_awkward

    # Open only the first file; its keys stand in for every file's keys.
    ffile_path, fobject_path = files[0]
    obj = uproot._util.regularize_object_path(
        ffile_path, fobject_path, custom_classes, allow_missing, real_options
    )
    common_keys = obj.keys(
        recursive=recursive,
        filter_name=filter_name,
        filter_typename=filter_typename,
        filter_branch=filter_branch,
        full_paths=full_paths,
    )

    class _UprootOpenAndRead:
        # Dask task: open one (file_path, object_path) pair and read all
        # common_keys from it; each call yields one whole-file partition.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another nested class that might not need to be nested.

        def __init__(
            self, custom_classes, allow_missing, real_options, common_keys
        ) -> None:
            self.custom_classes = custom_classes
            self.allow_missing = allow_missing
            self.real_options = real_options
            self.common_keys = common_keys

        def __call__(self, file_path_object_path):
            file_path, object_path = file_path_object_path
            ttree = uproot._util.regularize_object_path(
                file_path,
                object_path,
                self.custom_classes,
                self.allow_missing,
                self.real_options,
            )
            return ttree.arrays(self.common_keys)

    # Small eager read used only to derive the array layout; dask-awkward
    # receives its typetracer (lengthless metadata) form as `meta`.
    first5 = obj.arrays(common_keys, entry_start=0, entry_stop=5)
    meta = dask_awkward.core.typetracer_array(first5)

    return dask_awkward.from_map(
        _UprootOpenAndRead(custom_classes, allow_missing, real_options, common_keys),
        files,
        label="from-uproot",
        meta=meta,
    )
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Relevant dask part cc @douglasdavis



class _WrapDict(MutableMapping):
def __init__(self, dict):
self.dict = dict
Expand Down
96 changes: 96 additions & 0 deletions tests/test_0652_dask-for-awkward.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# BSD 3-Clause License; see https://github.com/scikit-hep/uproot4/blob/main/LICENSE

import numpy
import pytest
import skhep_testdata

import uproot

dask = pytest.importorskip("dask")
dask_awkward = pytest.importorskip("dask_awkward")


def test_single_dask_awkward_array():
    """A lazily-read single file must match the eagerly-read arrays."""
    path = skhep_testdata.data_path("uproot-Zmumu.root") + ":events"
    events = uproot.open(path)

    concrete = events.arrays()
    lazy = uproot.dask(path, library="ak")

    assert len(concrete) == len(lazy)
    assert lazy.compute().to_list() == concrete.to_list()
Copy link
Contributor

@douglasdavis douglasdavis Aug 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When comparing a dask-awkward array to a concrete awkward array try to use dask-awkward's assert_eq; it does more checks than just for equality. For example see

https://github.com/ContinuumIO/dask-awkward/blob/7b5ca3c7077b1ab3524c581cf28950ade5b6907f/tests/test_io.py#L238-L244



def test_dask_concatenation():
    """Lazy concatenation of several files must match uproot.concatenate."""
    paths = [
        skhep_testdata.data_path("uproot-Zmumu.root") + ":events",
        skhep_testdata.data_path("uproot-Zmumu-uncompressed.root") + ":events",
        skhep_testdata.data_path("uproot-Zmumu-zlib.root") + ":events",
        skhep_testdata.data_path("uproot-Zmumu-lzma.root") + ":events",
    ]

    concrete = uproot.concatenate(paths)
    lazy = uproot.dask(paths, library="ak")

    assert len(concrete) == len(lazy)
    assert lazy.compute().to_list() == concrete.to_list()


def test_multidim_array():
    """Branches with fixed-size inner dimensions survive the dask round trip."""
    path = (
        skhep_testdata.data_path("uproot-sample-6.08.04-uncompressed.root") + ":sample"
    )
    sample = uproot.open(path)

    concrete = sample.arrays()
    lazy = uproot.dask(path, library="ak")

    assert len(concrete) == len(lazy)
    assert lazy.compute().to_list() == concrete.to_list()


def test_chunking_single_num():
    """An integer step_size (entries per chunk) fixes the partition count."""
    path = skhep_testdata.data_path("uproot-Zmumu.root") + ":events"
    collection = uproot.dask(path, step_size=10, library="ak")
    assert collection.npartitions == 231


def test_chunking_single_string():
    """A memory-size step_size string fixes the partition count."""
    path = skhep_testdata.data_path("uproot-Zmumu.root") + ":events"
    collection = uproot.dask(path, step_size="500B", library="ak")
    assert collection.npartitions == 330


def test_chunking_multiple_num():
    """Integer chunking over two files sums the per-file partition counts."""
    path_a = skhep_testdata.data_path("uproot-Zmumu.root") + ":events"
    path_b = skhep_testdata.data_path("uproot-Zmumu-uncompressed.root") + ":events"
    collection = uproot.dask([path_a, path_b], step_size=10, library="ak")
    assert collection.npartitions == 462


def test_chunking_multiple_string():
    """Memory-size chunking over two files sums the per-file partition counts."""
    path_a = skhep_testdata.data_path("uproot-Zmumu.root") + ":events"
    path_b = skhep_testdata.data_path("uproot-Zmumu-uncompressed.root") + ":events"
    collection = uproot.dask([path_a, path_b], step_size="500B", library="ak")
    assert collection.npartitions == 1098


def test_delay_open():
    """open_files=False (one partition per file) must still match concatenate."""
    paths = [
        skhep_testdata.data_path("uproot-Zmumu.root") + ":events",
        skhep_testdata.data_path("uproot-Zmumu-uncompressed.root") + ":events",
        skhep_testdata.data_path("uproot-Zmumu-zlib.root") + ":events",
        skhep_testdata.data_path("uproot-Zmumu-lzma.root") + ":events",
    ]

    concrete = uproot.concatenate(paths)
    lazy = uproot.dask(paths, open_files=False, library="ak")

    assert len(concrete) == len(lazy)
    assert lazy.compute().to_list() == concrete.to_list()