Dask awkward support for uproot.dask #652

Changed file: TBranch.py

@@ -598,8 +598,8 @@ def dask(
    """
    files = uproot._util.regularize_files(files)
    library = uproot.interpretation.library._regularize_library(library)
-   if library.name != "np":
-       raise NotImplementedError()
+   # if library.name != "np":
+   #     raise NotImplementedError()

    real_options = dict(options)
    if "num_workers" not in real_options:
@@ -609,31 +609,60 @@ def dask(

    filter_branch = uproot._util.regularize_filter(filter_branch)

-   if open_files:
-       return _get_dask_array(
-           files,
-           filter_name,
-           filter_typename,
-           filter_branch,
-           recursive,
-           full_paths,
-           step_size,
-           custom_classes,
-           allow_missing,
-           real_options,
-       )
+   if library.name == "np":

Review comment: We want the code in TBranch.py to avoid special cases that depend on the value of `library`.

+       if open_files:
+           return _get_dask_array(
+               files,
+               filter_name,
+               filter_typename,
+               filter_branch,
+               recursive,
+               full_paths,
+               step_size,
+               custom_classes,
+               allow_missing,
+               real_options,
+           )
+       else:
+           return _get_dask_array_delay_open(
+               files,
+               filter_name,
+               filter_typename,
+               filter_branch,
+               recursive,
+               full_paths,
+               custom_classes,
+               allow_missing,
+               real_options,
+           )
+   elif library.name == "ak":
+       if open_files:
+           return _get_dak_array(
+               files,
+               filter_name,
+               filter_typename,
+               filter_branch,
+               recursive,
+               full_paths,
+               step_size,
+               custom_classes,
+               allow_missing,
+               real_options,
+           )
+       else:
+           return _get_dak_array_delay_open(
+               files,
+               filter_name,
+               filter_typename,
+               filter_branch,
+               recursive,
+               full_paths,
+               custom_classes,
+               allow_missing,
+               real_options,
+           )
-   else:
-       return _get_dask_array_delay_open(
-           files,
-           filter_name,
-           filter_typename,
-           filter_branch,
-           recursive,
-           full_paths,
-           custom_classes,
-           allow_missing,
-           real_options,
-       )
+   raise NotImplementedError()


class Report:
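
The review comment above asks for the code in TBranch.py to avoid special cases that branch on the value of `library`. A minimal sketch of one way to do that (not part of this PR) is a dispatch table keyed on the library name; the names `_DASK_READERS` and `_dispatch_dask_reader` are hypothetical, and the sketch assumes it lives next to the existing `_get_*` helpers.

    _DASK_READERS = {
        # (library.name, open_files) -> reader implementation
        ("np", True): _get_dask_array,
        ("np", False): _get_dask_array_delay_open,
        ("ak", True): _get_dak_array,
        ("ak", False): _get_dak_array_delay_open,
    }


    def _dispatch_dask_reader(library_name, open_files):
        # Look up the reader once; unsupported libraries (e.g. Pandas) get a
        # clear error instead of an if/elif chain falling through.
        try:
            return _DASK_READERS[(library_name, open_files)]
        except KeyError:
            raise NotImplementedError(
                f"library {library_name!r} is not supported by uproot.dask"
            ) from None
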
@@ -3549,6 +3578,187 @@ def delayed_get_array(ttree, key):
    return dask_dict


def _get_dak_array(
    files,
    filter_name=no_filter,
    filter_typename=no_filter,
    filter_branch=no_filter,
    recursive=True,
    full_paths=False,
    step_size="100 MB",
    custom_classes=None,
    allow_missing=False,
    real_options=None,
):
    import dask_awkward

    hasbranches = []
    common_keys = None
    is_self = []

    count = 0
    for file_path, object_path in files:
        obj = uproot._util.regularize_object_path(
            file_path, object_path, custom_classes, allow_missing, real_options
        )

        if obj is not None:
            count += 1

            if isinstance(obj, TBranch) and len(obj.keys(recursive=True)) == 0:
                original = obj
                obj = obj.parent
                is_self.append(True)

                def real_filter_branch(branch):
                    return branch is original and filter_branch(branch)

            else:
                is_self.append(False)
                real_filter_branch = filter_branch

            hasbranches.append(obj)

            new_keys = obj.keys(
                recursive=recursive,
                filter_name=filter_name,
                filter_typename=filter_typename,
                filter_branch=real_filter_branch,
                full_paths=full_paths,
            )

            if common_keys is None:
                common_keys = new_keys
            else:
                new_keys = set(new_keys)
                common_keys = [key for key in common_keys if key in new_keys]

    if count == 0:
        raise ValueError(
            "allow_missing=True and no TTrees found in\n\n {}".format(
                "\n ".join(
                    "{"
                    + "{}: {}".format(
                        repr(f.file_path if isinstance(f, HasBranches) else f),
                        repr(f.object_path if isinstance(f, HasBranches) else o),
                    )
                    + "}"
                    for f, o in files
                )
            )
        )

    if len(common_keys) == 0 or not (all(is_self) or not any(is_self)):
        raise ValueError(
            "TTrees in\n\n {}\n\nhave no TBranches in common".format(
                "\n ".join(
                    "{"
                    + "{}: {}".format(
                        repr(f.file_path if isinstance(f, HasBranches) else f),
                        repr(f.object_path if isinstance(f, HasBranches) else o),
                    )
                    + "}"
                    for f, o in files
                )
            )
        )

    class _UprootRead:
        def __init__(self, hasbranches, branches) -> None:
            self.hasbranches = hasbranches
            self.branches = branches

        def __call__(self, i_start_stop):
            i, start, stop = i_start_stop
            return self.hasbranches[i].arrays(
                self.branches, entry_start=start, entry_stop=stop
            )

Review comment: I think this class can be defined at top-level, outside of this function. That might be necessary to serialize it and send it to a remote machine. (In general, nested classes and functions can depend on local variables, which can prevent them from being pickleable, which Dask needs to send data to remote processors.) Even if that's not necessary for this helper function, having a common practice of making functions and classes top-level by default would make it so that we don't have to ask ourselves this question whenever we see a nested function/class. If you want to introduce a new src/uproot/_dask.py to have a place to put all of these helpers, that would be good. TBranch.py is getting very full (and it's always easier to merge files than split them).

Review comment: Yes! It's definitely best to have dask callables as standalone classes or functions.
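
The picklability point can be shown with a minimal, self-contained example (not part of the PR): an instance of a class defined inside a function cannot be pickled, so Dask cannot ship it to remote workers, while an instance of a module-level class can.

    import pickle


    class _TopLevelCallable:
        # Module-level: pickle stores a reference to the importable class.
        def __call__(self, x):
            return x


    def make_nested_callable():
        class _NestedCallable:
            # Defined inside a function: not importable by qualified name.
            def __call__(self, x):
                return x

        return _NestedCallable()


    pickle.dumps(_TopLevelCallable())  # works

    try:
        pickle.dumps(make_nested_callable())  # fails: the class is a local object
    except (pickle.PicklingError, AttributeError) as err:
        print("cannot pickle an instance of a nested class:", err)

This is why moving _UprootRead (and _UprootOpenAndRead below) to module level, for example into the suggested src/uproot/_dask.py, matters for distributed schedulers.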

    partition_args = []
    for i, ttree in enumerate(hasbranches):
        entry_start, entry_stop = _regularize_entries_start_stop(
            ttree.num_entries, None, None
        )
        entry_step = 0
        if uproot._util.isint(step_size):
            entry_step = step_size
        else:
            entry_step = ttree.num_entries_for(step_size, expressions=common_keys)

        def foreach(start):
            stop = min(start + entry_step, entry_stop)
            partition_args.append((i, start, stop))

        for start in range(entry_start, entry_stop, entry_step):
            foreach(start)

    first5 = hasbranches[0].arrays(common_keys, entry_start=0, entry_stop=5)
    meta = dask_awkward.core.typetracer_array(first5)

    return dask_awkward.from_map(
        _UprootRead(hasbranches, common_keys),
        partition_args,
        label="from-uproot",
        meta=meta,
    )

Review comment: Relevant dask part, cc @douglasdavis.

Review comment: Notes: first I tried using the same code to create the typetracer array with … Going through the code for dask-awkward I found this function.

Review comment: What motivated the number 5? Is it possible for these 5 entries to not have something that you're looking for, and in that case, does it fail? Since reading happens at a per-TBasket level, getting the whole first TBasket might make more sense. TBranch.basket_entry_start_stop would give you the entry start and stop of that first TBasket.

Review comment: The number 5 was from my first PoC; I think it would make sense to have this as something that is configurable, with a sensible default.
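
A rough sketch of the basket-based suggestion above (not part of the PR): build the metadata sample from the first TBasket of the first selected branch instead of a hard-coded 5 entries. The helper name _first_basket_meta is hypothetical; it assumes `hasbranches` and `common_keys` as in _get_dak_array, and that the first branch's basket boundaries are a reasonable proxy for the others.

    import dask_awkward


    def _first_basket_meta(hasbranches, common_keys):
        # Entry range covered by the first TBasket of the first selected branch.
        first_branch = hasbranches[0][common_keys[0]]
        _, first_basket_stop = first_branch.basket_entry_start_stop(0)

        # Read only that slice and turn it into a typetracer (type-only) array
        # that dask-awkward can use as `meta`.
        sample = hasbranches[0].arrays(
            common_keys, entry_start=0, entry_stop=first_basket_stop
        )
        return dask_awkward.core.typetracer_array(sample)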


def _get_dak_array_delay_open(
    files,
    filter_name=no_filter,
    filter_typename=no_filter,
    filter_branch=no_filter,
    recursive=True,
    full_paths=False,
    custom_classes=None,
    allow_missing=False,
    real_options=None,  # NOTE: a comma after **options breaks Python 2
):
    import dask_awkward

    ffile_path, fobject_path = files[0]
    obj = uproot._util.regularize_object_path(
        ffile_path, fobject_path, custom_classes, allow_missing, real_options
    )
    common_keys = obj.keys(
        recursive=recursive,
        filter_name=filter_name,
        filter_typename=filter_typename,
        filter_branch=filter_branch,
        full_paths=full_paths,
    )

    class _UprootOpenAndRead:

Review comment: Another nested class that might not need to be nested.

        def __init__(
            self, custom_classes, allow_missing, real_options, common_keys
        ) -> None:
            self.custom_classes = custom_classes
            self.allow_missing = allow_missing
            self.real_options = real_options
            self.common_keys = common_keys

        def __call__(self, file_path_object_path):
            file_path, object_path = file_path_object_path
            ttree = uproot._util.regularize_object_path(
                file_path,
                object_path,
                self.custom_classes,
                self.allow_missing,
                self.real_options,
            )
            return ttree.arrays(self.common_keys)

    first5 = obj.arrays(common_keys, entry_start=0, entry_stop=5)
    meta = dask_awkward.core.typetracer_array(first5)

    return dask_awkward.from_map(
        _UprootOpenAndRead(custom_classes, allow_missing, real_options, common_keys),
        files,
        label="from-uproot",
        meta=meta,
    )

Review comment: Relevant dask part, cc @douglasdavis.


class _WrapDict(MutableMapping):
    def __init__(self, dict):
        self.dict = dict

New test file

@@ -0,0 +1,96 @@
# BSD 3-Clause License; see https://github.com/scikit-hep/uproot4/blob/main/LICENSE

import numpy
import pytest
import skhep_testdata

import uproot

dask = pytest.importorskip("dask")
dask_awkward = pytest.importorskip("dask_awkward")


def test_single_dask_awkward_array():
    test_path = skhep_testdata.data_path("uproot-Zmumu.root") + ":events"
    ttree = uproot.open(test_path)

    ak_array = ttree.arrays()
    dak_array = uproot.dask(test_path, library="ak")

    assert len(ak_array) == len(dak_array)

    assert dak_array.compute().to_list() == ak_array.to_list()

Review comment: When comparing a dask-awkward array to a concrete awkward array, try to use dask-awkward's …
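
The comment above is cut off in this capture, but it most likely points at dask-awkward's testing helper. A hedged sketch of such a comparison (not part of the PR), assuming the helper is dask_awkward.lib.testing.assert_eq:

    import pytest
    import skhep_testdata

    import uproot

    dask_awkward = pytest.importorskip("dask_awkward")

    # Assumed import path; adjust if your dask-awkward version exposes it elsewhere.
    from dask_awkward.lib.testing import assert_eq


    def test_single_dask_awkward_array_assert_eq():
        test_path = skhep_testdata.data_path("uproot-Zmumu.root") + ":events"
        ttree = uproot.open(test_path)

        ak_array = ttree.arrays()
        dak_array = uproot.dask(test_path, library="ak")

        # assert_eq computes the lazy collection and compares it (contents and,
        # depending on version, metadata) against the concrete array.
        assert_eq(dak_array, ak_array)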

def test_dask_concatenation():
    test_path1 = skhep_testdata.data_path("uproot-Zmumu.root") + ":events"
    test_path2 = skhep_testdata.data_path("uproot-Zmumu-uncompressed.root") + ":events"
    test_path3 = skhep_testdata.data_path("uproot-Zmumu-zlib.root") + ":events"
    test_path4 = skhep_testdata.data_path("uproot-Zmumu-lzma.root") + ":events"

    ak_array = uproot.concatenate([test_path1, test_path2, test_path3, test_path4])
    dak_array = uproot.dask(
        [test_path1, test_path2, test_path3, test_path4], library="ak"
    )

    assert len(ak_array) == len(dak_array)

    assert dak_array.compute().to_list() == ak_array.to_list()


def test_multidim_array():
    test_path = (
        skhep_testdata.data_path("uproot-sample-6.08.04-uncompressed.root") + ":sample"
    )
    ttree = uproot.open(test_path)

    ak_array = ttree.arrays()
    dak_array = uproot.dask(test_path, library="ak")

    assert len(ak_array) == len(dak_array)

    assert dak_array.compute().to_list() == ak_array.to_list()


def test_chunking_single_num():
    test_path = skhep_testdata.data_path("uproot-Zmumu.root") + ":events"
    assert uproot.dask(test_path, step_size=10, library="ak").npartitions == 231


def test_chunking_single_string():
    test_path = skhep_testdata.data_path("uproot-Zmumu.root") + ":events"
    assert uproot.dask(test_path, step_size="500B", library="ak").npartitions == 330


def test_chunking_multiple_num():
    filename1 = skhep_testdata.data_path("uproot-Zmumu.root") + ":events"
    filename2 = skhep_testdata.data_path("uproot-Zmumu-uncompressed.root") + ":events"
    assert (
        uproot.dask([filename1, filename2], step_size=10, library="ak").npartitions
        == 462
    )


def test_chunking_multiple_string():
    filename1 = skhep_testdata.data_path("uproot-Zmumu.root") + ":events"
    filename2 = skhep_testdata.data_path("uproot-Zmumu-uncompressed.root") + ":events"
    assert (
        uproot.dask([filename1, filename2], step_size="500B", library="ak").npartitions
        == 1098
    )


def test_delay_open():
    test_path1 = skhep_testdata.data_path("uproot-Zmumu.root") + ":events"
    test_path2 = skhep_testdata.data_path("uproot-Zmumu-uncompressed.root") + ":events"
    test_path3 = skhep_testdata.data_path("uproot-Zmumu-zlib.root") + ":events"
    test_path4 = skhep_testdata.data_path("uproot-Zmumu-lzma.root") + ":events"

    ak_array = uproot.concatenate([test_path1, test_path2, test_path3, test_path4])
    dak_array = uproot.dask(
        [test_path1, test_path2, test_path3, test_path4], open_files=False, library="ak"
    )

    assert len(ak_array) == len(dak_array)

    assert dak_array.compute().to_list() == ak_array.to_list()

Review comment: In a committed PR, old code should be removed, not commented out. But I think you still want to complain about Pandas.
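
One way to act on that comment (not part of the PR; the helper name _regularize_dask_library is hypothetical): delete the commented-out guard at the top of uproot.dask and replace it with an explicit check that still rejects unsupported libraries such as Pandas with a clear message.

    import uproot


    def _regularize_dask_library(library):
        # Resolve the library, then reject anything uproot.dask cannot produce
        # (e.g. Pandas) instead of leaving the old guard commented out.
        library = uproot.interpretation.library._regularize_library(library)
        if library.name not in ("np", "ak"):
            raise NotImplementedError(
                f"library {library.name!r} is not supported by uproot.dask; "
                'use library="np" or library="ak"'
            )
        return library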