Dask awkward support for uproot.dask #652
src/uproot/behaviors/TBranch.py
Outdated
@dask.delayed
def del_fn(ttree,start,stop):
    return ttree.arrays(common_keys,library='ak',entry_start=start,entry_stop=stop)

dask_arrays = []
for ttree in hasbranches:
    entry_start, entry_stop = _regularize_entries_start_stop(
        ttree.tree.num_entries, None, None
    )
    entry_step = 0
    if uproot._util.isint(step_size):
        entry_step = step_size
    else:
        entry_step = ttree.num_entries_for(step_size)
    def foreach(start):
        stop = min(start + entry_step, entry_stop)
        length = stop - start

        delayed_array = del_fn(ttree, start, stop)
        dask_arrays.append(delayed_array)

    for start in range(entry_start, entry_stop, entry_step):
        foreach(start)

return dask_awkward.from_delayed(dask_arrays)
Relevant dask section
src/uproot/behaviors/TBranch.py
Outdated
dask, _ = uproot.extras.dask()
import dask_awkward

delayed_open_fn = dask.delayed(uproot._util.regularize_object_path)

dask_arrays = []
@dask.delayed
def delayed_get_array(ttree):
    return ttree.arrays(library="ak")

for file_path, object_path in files:
    delayed_tree = delayed_open_fn(
        file_path, object_path, custom_classes, allow_missing, real_options
    )
    delayed_array = delayed_get_array(delayed_tree)

    dask_arrays.append(
        delayed_array
    )
return dask_awkward.from_delayed(dask_arrays)
Relevant dask section
Hey @kkothari2001, thanks for the ping! I'll take a look at the PR in detail when I get a chance. But in the meantime I wanted to point you to a proof-of-concept for reading ROOT into dask-awkward here: https://github.com/ContinuumIO/dask-awkward/pull/44 - the method I used in that PoC does not use delayed, directly constructing so-called "high level" task graphs (dask-awkward has a generic API for creating dask-awkward arrays from any mapping with
Thanks! I'll have a look at it.
I worked on the optimization suggested by @douglasdavis and used from_map. The issue we discussed in the meeting about having many iterables of different lengths has been somewhat solved, but @douglasdavis will have to confirm whether it is optimal. Apart from that, dask-awkward has been added to the testing dependencies for Python 3.8, since dask_awkward requires Python >=3.8.
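For readers following along, the flattening of several trees with different entry counts into one iterable can be sketched without uproot or dask-awkward. This is only an illustration: `make_partition_args` and the plain list of entry counts are hypothetical stand-ins for the loop over `hasbranches`, and the guard against a non-positive step is an addition that the PR code does not have. Only the `(i, start, stop)` tuple layout comes from the PR.

```python
def make_partition_args(num_entries_per_tree, entry_step):
    """Return one (tree_index, start, stop) tuple per partition.

    num_entries_per_tree: hypothetical list of entry counts, one per tree.
    entry_step: maximum number of entries per partition.
    """
    if entry_step <= 0:
        # Assumption: reject a zero step early instead of letting range() fail.
        raise ValueError("entry_step must be a positive integer")

    partition_args = []
    for i, num_entries in enumerate(num_entries_per_tree):
        for start in range(0, num_entries, entry_step):
            # The last partition of each tree may be shorter than entry_step.
            stop = min(start + entry_step, num_entries)
            partition_args.append((i, start, stop))
    return partition_args


args = make_partition_args([5, 3], entry_step=2)
print(args)
```

Because every tuple carries its own tree index, a single callable can be mapped over the flat list even when the trees have different lengths.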
src/uproot/behaviors/TBranch.py
Outdated
class _UprootRead:
    def __init__(self, hasbranches, branches) -> None:
        self.hasbranches = hasbranches
        self.branches = branches

    def __call__(self, i_start_stop):
        i, start, stop = i_start_stop
        return self.hasbranches[i].arrays(
            self.branches, entry_start=start, entry_stop=stop
        )

partition_args = []
for i, ttree in enumerate(hasbranches):
    entry_start, entry_stop = _regularize_entries_start_stop(
        ttree.num_entries, None, None
    )
    entry_step = 0
    if uproot._util.isint(step_size):
        entry_step = step_size
    else:
        entry_step = ttree.num_entries_for(step_size, expressions=common_keys)

    def foreach(start):
        stop = min(start + entry_step, entry_stop)
        partition_args.append((i, start, stop))

    for start in range(entry_start, entry_stop, entry_step):
        foreach(start)

first5 = hasbranches[0].arrays(common_keys, entry_start=0, entry_stop=5)
meta = dask_awkward.core.typetracer_array(first5)

return dask_awkward.from_map(
    _UprootRead(hasbranches, common_keys),
    partition_args,
    label="from-uproot",
    meta=meta,
)
Relevant dask part cc @douglasdavis
Notes: First I tried using the same code to create the typetracer array with ak.Array(first5.layout.typetracer.forget_length()), but it threw an error that made me realise it probably wasn't the correct way to get the typetracer array. Going through the dask-awkward code, I found the function dask_awkward.core.typetracer_array; it works, but please confirm that I'm using it right.
What motivated the number 5? Is it possible for these 5 entries to not have something that you're looking for, and in that case, does it fail?
Since reading happens at a per-TBasket level, getting the whole first TBasket might make more sense. TBranch.basket_entry_start_stop would give you the entry_start and entry_stop numbers for a given TBasket (such as number 0).
The number 5 was from my first PoC; I think it would make sense to have this as something that is configurable with a sensible default
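One way the two suggestions above could fit together is a small helper that picks the entry range of one TBasket, with the basket number as a configurable default. This is a sketch under stated assumptions, not code from the PR: `meta_entry_range` and `FakeBranch` are hypothetical names, and the helper assumes the branch object exposes `num_baskets` alongside the `basket_entry_start_stop` method mentioned in the review.

```python
def meta_entry_range(branch, basket_num=0):
    """Return the (entry_start, entry_stop) range to read for the metadata sample.

    branch: any object with `num_baskets` and `basket_entry_start_stop(basket_num)`
    (assumed here to match the TBranch interface).
    basket_num: which TBasket to sample; defaults to the first one.
    """
    if branch.num_baskets == 0:
        # Nothing to sample from: return an empty range.
        return 0, 0
    return branch.basket_entry_start_stop(basket_num)


class FakeBranch:
    """Hypothetical stand-in for a TBranch, used only to exercise the helper."""

    def __init__(self, basket_edges):
        # basket_edges lists entry offsets, e.g. [0, 100, 250] for two baskets.
        self._edges = list(basket_edges)

    @property
    def num_baskets(self):
        return max(len(self._edges) - 1, 0)

    def basket_entry_start_stop(self, basket_num):
        return self._edges[basket_num], self._edges[basket_num + 1]


branch = FakeBranch([0, 100, 250])
print(meta_entry_range(branch))
print(meta_entry_range(branch, basket_num=1))
```

Reading a whole basket costs no extra I/O compared with reading its first 5 entries, since the basket is the unit of reading.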
src/uproot/behaviors/TBranch.py
Outdated
def _get_dak_array_delay_open(
    files,
    filter_name=no_filter,
    filter_typename=no_filter,
    filter_branch=no_filter,
    recursive=True,
    full_paths=False,
    custom_classes=None,
    allow_missing=False,
    real_options=None,  # NOTE: a comma after **options breaks Python 2
):
    import dask_awkward

    ffile_path, fobject_path = files[0]
    obj = uproot._util.regularize_object_path(
        ffile_path, fobject_path, custom_classes, allow_missing, real_options
    )
    common_keys = obj.keys(
        recursive=recursive,
        filter_name=filter_name,
        filter_typename=filter_typename,
        filter_branch=filter_branch,
        full_paths=full_paths,
    )

    class _UprootOpenAndRead:
        def __init__(
            self, custom_classes, allow_missing, real_options, common_keys
        ) -> None:
            self.custom_classes = custom_classes
            self.allow_missing = allow_missing
            self.real_options = real_options
            self.common_keys = common_keys

        def __call__(self, file_path_object_path):
            file_path, object_path = file_path_object_path
            ttree = uproot._util.regularize_object_path(
                file_path,
                object_path,
                self.custom_classes,
                self.allow_missing,
                self.real_options,
            )
            return ttree.arrays(self.common_keys)

    first5 = obj.arrays(common_keys, entry_start=0, entry_stop=5)
    meta = dask_awkward.core.typetracer_array(first5)

    return dask_awkward.from_map(
        _UprootOpenAndRead(custom_classes, allow_missing, real_options, common_keys),
        files,
        label="from-uproot",
        meta=meta,
    )
Relevant dask part cc @douglasdavis
This is looking good; sorry that I was slow to respond. Most of what I recommend below is refactoring; you can introduce src/uproot/_dask.py if you need a place to put all the new helper functions (and that would help keep most of this separate from the rest of TBranch.py, which is mostly not involved in anything Dask-related).
The tests look good—I don't think we can test dask-distributed in our continuous integration, but can you test it manually? Like, if you use these tests and switch to a multiprocessing Dask scheduler, that would verify that everything that needs to be serializable is serializable.
src/uproot/behaviors/TBranch.py
Outdated
# if library.name != "np":
#     raise NotImplementedError()
In a committed PR, old code should be removed, not commented out.
But I think you still want to complain about Pandas.
- # if library.name != "np":
- #     raise NotImplementedError()
+ if library.name == "pd":
+     raise NotImplementedError()
src/uproot/behaviors/TBranch.py
Outdated
    allow_missing,
    real_options,
)
if library.name == "np":
We want the code in TBranch.py to avoid special cases that depend on the value of library; that's what library.* methods are for. Can this be refactored to call library.something here, and then have the different library implementations do the different things?
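The refactoring asked for here is ordinary method dispatch. A minimal sketch of the pattern, using made-up class and method names (`Library`, `NumPy`, `Awkward`, `Pandas`, and `dask_collection` are all hypothetical and are not uproot's actual interface):

```python
class Library:
    """Base class: each backend overrides the hooks it supports."""

    name = None

    def dask_collection(self, partitions):
        # Default for backends without Dask support.
        raise NotImplementedError(f"no Dask support for library {self.name!r}")


class NumPy(Library):
    name = "np"

    def dask_collection(self, partitions):
        return f"dask.array with {len(partitions)} chunks"


class Awkward(Library):
    name = "ak"

    def dask_collection(self, partitions):
        return f"dask_awkward array with {len(partitions)} partitions"


class Pandas(Library):
    name = "pd"  # inherits the NotImplementedError default


def build(library, partitions):
    # The call site never inspects library.name; the backend decides.
    return library.dask_collection(partitions)


partitions = [(0, 0, 100), (0, 100, 200)]
print(build(NumPy(), partitions))
print(build(Awkward(), partitions))
```

With this shape, supporting another backend means adding a method to its class rather than another `if library.name == ...` branch at the call site.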
src/uproot/behaviors/TBranch.py
Outdated
class _UprootRead:
    def __init__(self, hasbranches, branches) -> None:
        self.hasbranches = hasbranches
        self.branches = branches

    def __call__(self, i_start_stop):
        i, start, stop = i_start_stop
        return self.hasbranches[i].arrays(
            self.branches, entry_start=start, entry_stop=stop
        )
I think this class can be defined at top-level, outside of this function. That might be necessary to serialize it and send it to a remote machine. (In general, nested classes and functions can depend on local variables, which can prevent them from being pickleable, which Dask needs to send data to remote processors.)
Even if that's not necessary for this helper function, having a common practice of making functions and classes top-level by default would make it so that we don't have to ask ourselves this question whenever we see a nested function/class.
If you want to introduce a new src/uproot/_dask.py to have a place to put all of these helpers, that would be good. TBranch.py is getting very full (and it's always easier to merge files than split them).
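The pickling concern can be reproduced with the standard library alone. The sketch below uses hypothetical names (`TopLevelRead`, `make_nested_read`, `can_pickle`) and plain `pickle`; Dask may fall back to other serializers such as cloudpickle that cope with more cases, so this shows the conservative case rather than Dask's exact behaviour.

```python
import pickle


class TopLevelRead:
    """Callable defined at module level; pickle stores a reference to its name."""

    def __init__(self, branches):
        self.branches = branches

    def __call__(self, i_start_stop):
        i, start, stop = i_start_stop
        return (i, self.branches, stop - start)


def make_nested_read(branches):
    # Local class: its qualified name contains "<locals>", so pickle cannot
    # look it up by name, and it also closes over the local `branches`.
    class NestedRead:
        def __call__(self, i_start_stop):
            i, start, stop = i_start_stop
            return (i, branches, stop - start)

    return NestedRead()


def can_pickle(obj):
    """Return True if pickle.dumps succeeds for obj, False otherwise."""
    try:
        pickle.dumps(obj)
    except Exception:
        return False
    return True


top = TopLevelRead(["px", "py"])
nested = make_nested_read(["px", "py"])

print(type(top).__qualname__, can_pickle(top))
print(type(nested).__qualname__, can_pickle(nested))
```

Both callables behave identically when called in-process; the difference only shows up once they have to be serialized.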
> I think this class can be defined at top-level, outside of this function.

Yes! It's definitely best to have dask callables as standalone classes or functions.
src/uproot/behaviors/TBranch.py
Outdated
    full_paths=full_paths,
)

class _UprootOpenAndRead:
Another nested class that might not need to be nested.
tests/test_0652_dask-for-awkward.py
Outdated
assert len(ak_array) == len(dak_array)

assert dak_array.compute().to_list() == ak_array.to_list()
When comparing a dask-awkward array to a concrete awkward array, try to use dask-awkward's assert_eq; it does more checks than just for equality. For example see
We discussed some final refactorings on Zoom, but when you're done with them, feel free to squash-and-merge!
…into dask-awkward-support
* Changed everything we _think_ we need to change.
* All tests pass.
* Change test 0034
* Alter instances of from_datashape
* Removed 'v1/v2 insensitivity' checks. We're all-in for v2 now.
* Added basic dask-awkward support
* Delay file open feature
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* Complete dask-awkward using from_map
* Added tests for dask_awkward
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* Added dask-awkward to testing dependancies
* Add dask-awk to dependancies in github workflow
* Skip dask_awkward tests for all except python version == 3.8
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* Make suggested changes
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* Add back the noqa's which are being removed for some reason
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* Refactored uproot.dask as suggested
* Complete tests for dask_awkward
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci
* Make awkward the default library for uproot.dask
* [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci

Co-authored-by: Jim Pivarski <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This also addresses #652 (comment), but is more up-to-date. It _replaces_ #680, which should be closed.
@jpivarski @douglasdavis please have a look
Usage:
WIP:
Tests remain
A few things to cleanup (move dask_awkward to extras, make awkward the default library to use)