
Dask awkward support for uproot.dask #652

Merged: 26 commits merged into main on Aug 11, 2022

Conversation

kkothari2001 (Collaborator):

@jpivarski @douglasdavis please have a look

Usage:

>>> import uproot, skhep_testdata
>>> 
>>> filename1 = skhep_testdata.data_path("uproot-Zmumu.root") + ":events"
>>> filename2 = skhep_testdata.data_path("uproot-Zmumu-uncompressed.root") + ":events"
>>> 
>>> uproot.dask(filename1, library='ak')
dask.awkward<from-delayed, npartitions=1>
>>> 
>>> uproot.dask(filename1, library='ak').compute()
<Array [{Type: 'GT', Run: 148031, ...}, ...] type='2304 * {Type: string, Ru...'>
>>> 
>>> uproot.dask(filename1, step_size='100 B', library='ak')
dask.awkward<from-delayed, npartitions=2304>
>>> uproot.dask(filename1, step_size='100 B', library='ak').compute()
<Array [{Type: 'GT', Run: 148031, ...}, ...] type='2304 * {Type: string, Ru...'>
>>>
>>> uproot.dask(filename1, step_size=76, library='ak')
dask.awkward<from-delayed, npartitions=31>
>>> uproot.dask(filename1, step_size=76, library='ak').compute()
<Array [{Type: 'GT', Run: 148031, ...}, ...] type='2304 * {Type: string, Ru...'>
>>>
>>> uproot.dask([filename1, filename2], step_size=76, library='ak')
dask.awkward<from-delayed, npartitions=62>
>>> uproot.dask([filename1, filename2], step_size=76, library='ak').compute()
<Array [{Type: 'GT', Run: 148031, ...}, ...] type='4608 * {Type: string, Ru...'>
>>>
>>> uproot.dask([filename1, filename2], library='ak', open_files=True)
dask.awkward<from-delayed, npartitions=2>
>>> uproot.dask([filename1, filename2], library='ak', open_files=False)
dask.awkward<from-delayed, npartitions=2>
>>> uproot.dask([filename1, filename2], library='ak', open_files=False).compute()
<Array [{Type: 'GT', Run: 148031, ...}, ...] type='4608 * {Type: string, Ru...'>

WIP:
- Tests remain.
- A few things to clean up (move dask_awkward to extras, make awkward the default library to use).

Comment on lines 3664 to 3688
@dask.delayed
def del_fn(ttree, start, stop):
    return ttree.arrays(common_keys, library='ak', entry_start=start, entry_stop=stop)

dask_arrays = []
for ttree in hasbranches:
    entry_start, entry_stop = _regularize_entries_start_stop(
        ttree.tree.num_entries, None, None
    )
    entry_step = 0
    if uproot._util.isint(step_size):
        entry_step = step_size
    else:
        entry_step = ttree.num_entries_for(step_size)

    def foreach(start):
        stop = min(start + entry_step, entry_stop)
        length = stop - start

        delayed_array = del_fn(ttree, start, stop)
        dask_arrays.append(delayed_array)

    for start in range(entry_start, entry_stop, entry_step):
        foreach(start)

return dask_awkward.from_delayed(dask_arrays)
kkothari2001 (Collaborator, Author):

Relevant dask section

Comment on lines 3702 to 3721
dask, _ = uproot.extras.dask()
import dask_awkward

delayed_open_fn = dask.delayed(uproot._util.regularize_object_path)

dask_arrays = []

@dask.delayed
def delayed_get_array(ttree):
    return ttree.arrays(library="ak")

for file_path, object_path in files:
    delayed_tree = delayed_open_fn(
        file_path, object_path, custom_classes, allow_missing, real_options
    )
    delayed_array = delayed_get_array(delayed_tree)

    dask_arrays.append(delayed_array)

return dask_awkward.from_delayed(dask_arrays)
kkothari2001 (Collaborator, Author):

Relevant dask section

douglasdavis (Contributor):

Hey @kkothari2001, thanks for the ping! I'll take a look at the PR in detail when I get a chance.

But in the meantime I wanted to point you to a proof-of-concept for reading ROOT into dask-awkward here: https://github.com/ContinuumIO/dask-awkward/pull/44 - the method I used in that PoC does not use delayed; it directly constructs so-called "high level" task graphs (dask-awkward has a generic API, dask_awkward.from_map, for creating dask-awkward arrays from any mapping). The main benefit of the from_map API over using delayed is that the task graph created by from_map can be optimized in ways that task graphs of delayed objects cannot.
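A minimal sketch of the from_map pattern described above, with an illustrative callable and toy data rather than uproot's actual reader; typetracer_array is referenced the way this PR uses it, and its module path may differ in later dask-awkward releases:

import awkward as ak
import dask_awkward

class _MakeChunk:
    # One call produces one partition of the final dask-awkward Array.
    def __call__(self, start_stop):
        start, stop = start_stop
        return ak.Array({"x": list(range(start, stop))})

partition_args = [(0, 5), (5, 10), (10, 15)]  # one tuple per partition

# "meta" is a typetracer (typed, length-less) view of one partition,
# built the same way this PR builds it.
meta = dask_awkward.core.typetracer_array(_MakeChunk()((0, 5)))

# from_map builds a high-level graph that dask-awkward can optimize
# (e.g. column projection), unlike a graph of opaque delayed calls.
lazy = dask_awkward.from_map(_MakeChunk(), partition_args, label="example", meta=meta)
assert lazy.npartitions == 3
print(lazy.compute().to_list())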

kkothari2001 (Collaborator, Author):

Thanks! I'll have a look at it.

kkothari2001 marked this pull request as ready for review on August 1, 2022, 15:08
kkothari2001 (Collaborator, Author):

cc @jpivarski @douglasdavis

I worked on the optimization suggested by @douglasdavis and used from_map.

The issue we discussed in the meeting regarding having many iterables of different lengths has been somewhat solved, but @douglasdavis will have to confirm whether it is optimal. Apart from that, dask-awkward has been added to the testing dependencies for Python 3.8, since dask_awkward requires Python >= 3.8.

Comment on lines 3666 to 3703
class _UprootRead:
    def __init__(self, hasbranches, branches) -> None:
        self.hasbranches = hasbranches
        self.branches = branches

    def __call__(self, i_start_stop):
        i, start, stop = i_start_stop
        return self.hasbranches[i].arrays(
            self.branches, entry_start=start, entry_stop=stop
        )

partition_args = []
for i, ttree in enumerate(hasbranches):
    entry_start, entry_stop = _regularize_entries_start_stop(
        ttree.num_entries, None, None
    )
    entry_step = 0
    if uproot._util.isint(step_size):
        entry_step = step_size
    else:
        entry_step = ttree.num_entries_for(step_size, expressions=common_keys)

    def foreach(start):
        stop = min(start + entry_step, entry_stop)
        partition_args.append((i, start, stop))

    for start in range(entry_start, entry_stop, entry_step):
        foreach(start)

first5 = hasbranches[0].arrays(common_keys, entry_start=0, entry_stop=5)
meta = dask_awkward.core.typetracer_array(first5)

return dask_awkward.from_map(
    _UprootRead(hasbranches, common_keys),
    partition_args,
    label="from-uproot",
    meta=meta,
)
kkothari2001 (Collaborator, Author):

Relevant dask part cc @douglasdavis

kkothari2001 (Collaborator, Author):

Notes: First I tried using the same code to create the typetracer array, ak.Array(first5.layout.typetracer.forget_length()), but it was throwing an error that made me realise it probably wasn't the correct way to get the typetracer array.

Going through the code for dask-awkward, I found the function dask_awkward.core.typetracer_array; it works, but please confirm whether I'm using it right.

jpivarski (Member):

What motivated the number 5? Is it possible for these 5 entries to not have something that you're looking for, and in that case, does it fail?

Since reading happens at a per-TBasket level, getting the whole first TBasket might make more sense. TBranch.basket_entry_start_stop would give you the entry_start and entry_stop numbers for a given TBasket (such as number 0).

douglasdavis (Contributor):

The number 5 was from my first PoC; I think it would make sense to have this as something that is configurable, with a sensible default.
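A hedged sketch of that suggestion, using the test file from the usage example above: the meta is built from the first TBasket's entry range instead of a hard-coded 5 entries (dask_awkward.core.typetracer_array is referenced as this PR does; its module path may differ in later dask-awkward releases).

import skhep_testdata
import uproot
import dask_awkward

tree = uproot.open(skhep_testdata.data_path("uproot-Zmumu.root"))["events"]
common_keys = tree.keys()

# entry range covered by TBasket number 0 of the first branch
start, stop = tree[common_keys[0]].basket_entry_start_stop(0)

first_chunk = tree.arrays(common_keys, entry_start=start, entry_stop=stop)
meta = dask_awkward.core.typetracer_array(first_chunk)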

Comment on lines 3706 to 3759
def _get_dak_array_delay_open(
    files,
    filter_name=no_filter,
    filter_typename=no_filter,
    filter_branch=no_filter,
    recursive=True,
    full_paths=False,
    custom_classes=None,
    allow_missing=False,
    real_options=None,  # NOTE: a comma after **options breaks Python 2
):
    import dask_awkward

    ffile_path, fobject_path = files[0]
    obj = uproot._util.regularize_object_path(
        ffile_path, fobject_path, custom_classes, allow_missing, real_options
    )
    common_keys = obj.keys(
        recursive=recursive,
        filter_name=filter_name,
        filter_typename=filter_typename,
        filter_branch=filter_branch,
        full_paths=full_paths,
    )

    class _UprootOpenAndRead:
        def __init__(
            self, custom_classes, allow_missing, real_options, common_keys
        ) -> None:
            self.custom_classes = custom_classes
            self.allow_missing = allow_missing
            self.real_options = real_options
            self.common_keys = common_keys

        def __call__(self, file_path_object_path):
            file_path, object_path = file_path_object_path
            ttree = uproot._util.regularize_object_path(
                file_path,
                object_path,
                self.custom_classes,
                self.allow_missing,
                self.real_options,
            )
            return ttree.arrays(self.common_keys)

    first5 = obj.arrays(common_keys, entry_start=0, entry_stop=5)
    meta = dask_awkward.core.typetracer_array(first5)

    return dask_awkward.from_map(
        _UprootOpenAndRead(custom_classes, allow_missing, real_options, common_keys),
        files,
        label="from-uproot",
        meta=meta,
    )
kkothari2001 (Collaborator, Author):

Relevant dask part cc @douglasdavis

jpivarski (Member) left a comment:

This is looking good; sorry that I was slow to respond. Most of what I recommend below is refactoring; you can introduce src/uproot/_dask.py if you need a place to put all the new helper functions (and that would help keep most of this separate from the rest of TBranch.py, which is mostly not involved in anything Dask-related).

The tests look good—I don't think we can test dask-distributed in our continuous integration, but can you test it manually? Like, if you use these tests and switch to a multiprocessing Dask scheduler, that would verify that everything that needs to be serializable is serializable.
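For that manual check, a minimal sketch (the test file is the one from the usage example above; assumes dask and dask-awkward are installed): forcing a process-based scheduler makes Dask pickle every task, so anything that is not serializable fails loudly.

import skhep_testdata
import uproot

lazy = uproot.dask(
    skhep_testdata.data_path("uproot-Zmumu.root") + ":events", library="ak"
)

# The "processes" scheduler pickles each task and ships it to a worker process,
# so this only succeeds if the reading callables and their arguments are picklable.
result = lazy.compute(scheduler="processes")
print(len(result))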

Comment on lines 601 to 602
# if library.name != "np":
# raise NotImplementedError()
jpivarski (Member):

In a committed PR, old code should be removed, not commented out.

But I think you still want to complain about Pandas.

Suggested change:
-# if library.name != "np":
-#     raise NotImplementedError()
+if library.name == "pd":
+    raise NotImplementedError()

allow_missing,
real_options,
)
if library.name == "np":
jpivarski (Member):

We want the code in TBranch.py to avoid special cases that depend on the value of library; that's what library.* methods are for. Can this be refactored to call library.something here, and then have the different library implementations do the different things?
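A hedged sketch of that dispatch idea; the class and method names below are illustrative, not uproot's actual Library interface (uproot's real library classes live in uproot.interpretation.library and do not necessarily have this method).

class Library:
    def dask_collection(self, partition_args, meta):
        raise NotImplementedError

class NumPy(Library):
    def dask_collection(self, partition_args, meta):
        # would build a dict of dask.array objects, one per branch
        ...

class Awkward(Library):
    def dask_collection(self, partition_args, meta):
        # would build a dask_awkward.Array via from_map
        ...

def dask_entry_point(library, partition_args, meta):
    # no `if library.name == ...` branching at the call site; the library decides
    return library.dask_collection(partition_args, meta)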

Comment on lines 3666 to 3675
class _UprootRead:
    def __init__(self, hasbranches, branches) -> None:
        self.hasbranches = hasbranches
        self.branches = branches

    def __call__(self, i_start_stop):
        i, start, stop = i_start_stop
        return self.hasbranches[i].arrays(
            self.branches, entry_start=start, entry_stop=stop
        )
jpivarski (Member):

I think this class can be defined at top-level, outside of this function. That might be necessary to serialize it and send it to a remote machine. (In general, nested classes and functions can depend on local variables, which can prevent them from being pickleable, which Dask needs to send data to remote processors.)

Even if that's not necessary for this helper function, having a common practice of making functions and classes top-level by default would make it so that we don't have to ask ourselves this question whenever we see a nested function/class.

If you want to introduce a new src/uproot/_dask.py to have a place to put all of these helpers, that would be good. TBranch.py is getting very full (and it's always easier to merge files than split them).

douglasdavis (Contributor):

> I think this class can be defined at top-level, outside of this function.

Yes! It's definitely best to have dask callables as standalone classes or functions.
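A small, standalone illustration of the pickling concern (nothing uproot-specific): instances of classes defined inside a function cannot be pickled by reference, which is what Dask needs when it ships tasks to other processes.

import pickle

class _TopLevelReader:
    # module-level: pickle can find it by module and qualified name
    def __call__(self, x):
        return x

def make_nested_reader():
    class _NestedReader:
        # local to the enclosing function: pickle cannot locate it
        def __call__(self, x):
            return x
    return _NestedReader()

pickle.dumps(_TopLevelReader())          # fine
try:
    pickle.dumps(make_nested_reader())   # fails: can't pickle a local class
except (pickle.PicklingError, AttributeError) as err:
    print("nested callable is not picklable:", err)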


        full_paths=full_paths,
    )

    class _UprootOpenAndRead:
jpivarski (Member):

Another nested class that might not need to be nested.


assert len(ak_array) == len(dak_array)

assert dak_array.compute().to_list() == ak_array.to_list()
douglasdavis (Contributor) commented on Aug 4, 2022:

When comparing a dask-awkward array to a concrete awkward array, try to use dask-awkward's assert_eq; it does more checks than plain equality. For example, see

https://github.com/ContinuumIO/dask-awkward/blob/7b5ca3c7077b1ab3524c581cf28950ade5b6907f/tests/test_io.py#L238-L244
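A hedged sketch of that comparison, reusing dak_array and ak_array from the test snippet above; the import path below is an assumption, since assert_eq has moved between dask-awkward versions (e.g. dask_awkward.lib.testutils in newer releases).

from dask_awkward.lib.testutils import assert_eq  # location may vary by version

# does more checks than a plain to_list() equality comparison
# (see the dask-awkward tests linked above)
assert_eq(dak_array, ak_array)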

jpivarski (Member) left a comment:

We discussed some final refactorings on Zoom, but when you're done with them, feel free to squash-and-merge!

kkothari2001 merged commit 49f83dd into main on Aug 11, 2022
kkothari2001 deleted the dask-awkward-support branch on August 11, 2022, 05:17
Moelf pushed a commit to Moelf/uproot5 that referenced this pull request Aug 18, 2022
* Changed everything we _think_ we need to change.

* All tests pass.

* Change test 0034

* Alter instances of from_datashape

* Removed 'v1/v2 insensitivity' checks. We're all-in for v2 now.

* Added basic dask-awkward support

* Delay file open feature

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Complete dask-awkward using from_map

* Added tests for dask_awkward

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Added dask-awkward to testing dependencies

* Add dask-awk to dependencies in github workflow

* Skip dask_awkward tests for all except python version == 3.8

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Make suggested changes

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Add back the noqa's which are being removed for some reason

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Refactored uproot.dask as suggested

* Complete tests for dask_awkward

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Make awkward the default library for uproot.dask

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

Co-authored-by: Jim Pivarski <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
jpivarski added a commit that referenced this pull request Aug 24, 2022
This also addresses #652 (comment), but is more up-to-date.

It _replaces_ #680, which should be closed.
jpivarski added a commit that referenced this pull request Aug 24, 2022
This also addresses #652 (comment), but is more up-to-date.

It _replaces_ #680, which should be closed.