-
Notifications
You must be signed in to change notification settings - Fork 76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dask awkward support for uproot.dask #652
Merged
Merged
Changes from 8 commits
Commits
Show all changes
26 commits
Select commit
Hold shift + click to select a range
b34350a
Changed everything we _think_ we need to change.
jpivarski 5957e0e
All tests pass.
jpivarski 506e2fe
Change test 0034
kkothari2001 83a3953
Alter instances of from_datashape
kkothari2001 2547a04
Removed 'v1/v2 insensitivity' checks. We're all-in for v2 now.
jpivarski d9890b2
Added basic dask-awkward support
kkothari2001 4a8f59e
Delay file open feature
kkothari2001 a476d47
Merge remote-tracking branch 'origin/main' into dask-awkward-support
kkothari2001 d46481f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] c5ed2c0
Complete dask-awkward using from_map
kkothari2001 42afafa
Added tests for dask_awkward
kkothari2001 c5c5c15
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 2df67c5
Added dask-awkward to testing dependancies
kkothari2001 18c6647
Add dask-awk to dependancies in github workflow
kkothari2001 2b904f8
Skip dask_awkward tests for all except python version == 3.8
kkothari2001 d38ac6a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 86501e2
Make suggested changes
kkothari2001 d012d70
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] d2fe1a6
Add back the noqa's which are being removed for some reason
kkothari2001 50b16f2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 4ab1564
Refactored uproot.dask as suggested
kkothari2001 da3c949
Complete tests for dask_awkward
kkothari2001 1e54bb1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] c040121
Make awkward the default library for uproot.dask
kkothari2001 5c0e8ef
Merge branch 'dask-awkward-support' of github.com:scikit-hep/uproot5 …
kkothari2001 ac585a1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -598,8 +598,8 @@ def dask( | |
""" | ||
files = uproot._util.regularize_files(files) | ||
library = uproot.interpretation.library._regularize_library(library) | ||
if library.name != "np": | ||
raise NotImplementedError() | ||
# if library.name != "np": | ||
# raise NotImplementedError() | ||
|
||
real_options = dict(options) | ||
if "num_workers" not in real_options: | ||
|
@@ -609,33 +609,60 @@ def dask( | |
|
||
filter_branch = uproot._util.regularize_filter(filter_branch) | ||
|
||
if open_files: | ||
return _get_dask_array( | ||
files, | ||
filter_name, | ||
filter_typename, | ||
filter_branch, | ||
recursive, | ||
full_paths, | ||
step_size, | ||
custom_classes, | ||
allow_missing, | ||
real_options, | ||
) | ||
if library.name == 'np': | ||
if open_files: | ||
return _get_dask_array( | ||
files, | ||
filter_name, | ||
filter_typename, | ||
filter_branch, | ||
recursive, | ||
full_paths, | ||
step_size, | ||
custom_classes, | ||
allow_missing, | ||
real_options, | ||
) | ||
else: | ||
return _get_dask_array_delay_open( | ||
files, | ||
filter_name, | ||
filter_typename, | ||
filter_branch, | ||
recursive, | ||
full_paths, | ||
custom_classes, | ||
allow_missing, | ||
real_options, | ||
) | ||
elif library.name == 'ak': | ||
if open_files: | ||
return _get_dak_array( | ||
files, | ||
filter_name, | ||
filter_typename, | ||
filter_branch, | ||
recursive, | ||
full_paths, | ||
step_size, | ||
custom_classes, | ||
allow_missing, | ||
real_options, | ||
) | ||
else: | ||
return _get_dak_array_delay_open( | ||
files, | ||
filter_name, | ||
filter_typename, | ||
filter_branch, | ||
recursive, | ||
full_paths, | ||
custom_classes, | ||
allow_missing, | ||
real_options, | ||
) | ||
else: | ||
return _get_dask_array_delay_open( | ||
files, | ||
filter_name, | ||
filter_typename, | ||
filter_branch, | ||
recursive, | ||
full_paths, | ||
custom_classes, | ||
allow_missing, | ||
real_options, | ||
) | ||
|
||
|
||
raise NotImplementedError() | ||
class Report: | ||
""" | ||
Args: | ||
|
@@ -3548,6 +3575,151 @@ def delayed_get_array(ttree, key): | |
dask_dict[key] = da.concatenate(dask_arrays, allow_unknown_chunksizes=True) | ||
return dask_dict | ||
|
||
def _get_dak_array( | ||
files, | ||
filter_name=no_filter, | ||
filter_typename=no_filter, | ||
filter_branch=no_filter, | ||
recursive=True, | ||
full_paths=False, | ||
step_size="100 MB", | ||
custom_classes=None, | ||
allow_missing=False, | ||
real_options=None, # NOTE: a comma after **options breaks Python 2 | ||
): | ||
dask, _ = uproot.extras.dask() | ||
import dask_awkward | ||
hasbranches = [] | ||
common_keys = None | ||
is_self = [] | ||
|
||
count = 0 | ||
for file_path, object_path in files: | ||
obj = uproot._util.regularize_object_path( | ||
file_path, object_path, custom_classes, allow_missing, real_options | ||
) | ||
|
||
if obj is not None: | ||
count += 1 | ||
|
||
if isinstance(obj, TBranch) and len(obj.keys(recursive=True)) == 0: | ||
original = obj | ||
obj = obj.parent | ||
is_self.append(True) | ||
|
||
def real_filter_branch(branch): | ||
return branch is original and filter_branch(branch) | ||
|
||
else: | ||
is_self.append(False) | ||
real_filter_branch = filter_branch | ||
|
||
hasbranches.append(obj) | ||
|
||
new_keys = obj.keys( | ||
recursive=recursive, | ||
filter_name=filter_name, | ||
filter_typename=filter_typename, | ||
filter_branch=real_filter_branch, | ||
full_paths=full_paths, | ||
) | ||
|
||
if common_keys is None: | ||
common_keys = new_keys | ||
else: | ||
new_keys = set(new_keys) | ||
common_keys = [key for key in common_keys if key in new_keys] | ||
|
||
if count == 0: | ||
raise ValueError( | ||
"allow_missing=True and no TTrees found in\n\n {}".format( | ||
"\n ".join( | ||
"{" | ||
+ "{}: {}".format( | ||
repr(f.file_path if isinstance(f, HasBranches) else f), | ||
repr(f.object_path if isinstance(f, HasBranches) else o), | ||
) | ||
+ "}" | ||
for f, o in files | ||
) | ||
) | ||
) | ||
|
||
if len(common_keys) == 0 or not (all(is_self) or not any(is_self)): | ||
raise ValueError( | ||
"TTrees in\n\n {}\n\nhave no TBranches in common".format( | ||
"\n ".join( | ||
"{" | ||
+ "{}: {}".format( | ||
repr(f.file_path if isinstance(f, HasBranches) else f), | ||
repr(f.object_path if isinstance(f, HasBranches) else o), | ||
) | ||
+ "}" | ||
for f, o in files | ||
) | ||
) | ||
) | ||
|
||
|
||
@dask.delayed | ||
def del_fn(ttree,start,stop): | ||
return ttree.arrays(common_keys,library='ak',entry_start=start,entry_stop=stop) | ||
|
||
dask_arrays = [] | ||
for ttree in hasbranches: | ||
entry_start, entry_stop = _regularize_entries_start_stop( | ||
ttree.tree.num_entries, None, None | ||
) | ||
entry_step = 0 | ||
if uproot._util.isint(step_size): | ||
entry_step = step_size | ||
else: | ||
entry_step = ttree.num_entries_for(step_size) | ||
def foreach(start): | ||
stop = min(start + entry_step, entry_stop) | ||
length = stop - start | ||
|
||
delayed_array = del_fn(ttree, start, stop) | ||
dask_arrays.append(delayed_array) | ||
|
||
for start in range(entry_start, entry_stop, entry_step): | ||
foreach(start) | ||
|
||
return dask_awkward.from_delayed(dask_arrays) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Relevant dask section |
||
|
||
|
||
def _get_dak_array_delay_open( | ||
files, | ||
filter_name=no_filter, | ||
filter_typename=no_filter, | ||
filter_branch=no_filter, | ||
recursive=True, | ||
full_paths=False, | ||
custom_classes=None, | ||
allow_missing=False, | ||
real_options=None, # NOTE: a comma after **options breaks Python 2 | ||
): | ||
dask, _ = uproot.extras.dask() | ||
import dask_awkward | ||
|
||
delayed_open_fn = dask.delayed(uproot._util.regularize_object_path) | ||
|
||
dask_arrays = [] | ||
@dask.delayed | ||
def delayed_get_array(ttree): | ||
return ttree.arrays(library="ak") | ||
|
||
for file_path, object_path in files: | ||
delayed_tree = delayed_open_fn( | ||
file_path, object_path, custom_classes, allow_missing, real_options | ||
) | ||
delayed_array = delayed_get_array(delayed_tree) | ||
|
||
dask_arrays.append( | ||
delayed_array | ||
) | ||
return dask_awkward.from_delayed(dask_arrays) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Relevant dask section |
||
|
||
|
||
class _WrapDict(MutableMapping): | ||
def __init__(self, dict): | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a committed PR, old code should be removed, not commented out.
But I think you still want to complain about Pandas.