Expose column optimization more explicitly? #559

Open
pfackeldey opened this issue Dec 10, 2024 · 8 comments · May be fixed by #565

@pfackeldey
Collaborator

Hi,

it would be nice if there were a way to use the column optimization without having to "buy into" building (large) compute graphs. One thing that could be useful is to expose an API specifically for the column optimization, through a new namespace in dask-awkward called manual (because one is now doing the column optimization manually instead of automatically as part of the usual dask graph optimization).

Consider the following example:

```python
import awkward as ak
import dask_awkward as dak
from coffea.nanoevents import NanoEventsFactory

# create a lazy dask-awkward array
events = NanoEventsFactory.from_root(...).events()

# define my analysis with eager awkward-array
def my_analysis(events: ak.Array) -> ak.Array:
    ...

# explicitly typetrace `my_analysis` to infer the required columns;
# this does not require `my_analysis` to be part of a dask graph!
tracer = dak.manual.to_tracer(events)
out_tracer, touched_columns = dak.manual.typetrace(my_analysis, tracer)

# use `touched_columns` to update the IO layer to only load these columns;
# this function looks for the `AwkwardInputLayer` and projects its `io_func`
events = dak.manual.project_columns(events, touched_columns)
```
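
The proposed dak.manual namespace does not exist yet, so the following is only a toy model, in plain Python without awkward or dask-awkward, of what typetrace would hand back: the function's output plus the set of columns it needed. It assumes a "tracer" can be modeled as a data-free object that records every field a function reads; RecordingEvents and typetrace are hypothetical names. Unlike awkward's real typetracer, it counts every field access as a touch.

```python
from typing import Any, Callable


class RecordingEvents:
    """Toy stand-in for a typetracer: it holds no data, only field names,
    and remembers which fields were accessed."""

    def __init__(self, fields: set[str]) -> None:
        self._fields = frozenset(fields)
        self.touched: set[str] = set()

    def __getattr__(self, name: str) -> str:
        # only called for attributes not found normally, i.e. event fields
        if name.startswith("_") or name not in self._fields:
            raise AttributeError(name)
        self.touched.add(name)
        # return a placeholder instead of real data
        return f"<placeholder {name}>"


def typetrace(fn: Callable[[Any], Any], fields: set[str]) -> tuple[Any, frozenset[str]]:
    """Hypothetical analogue of the proposed `dak.manual.typetrace`:
    run `fn` on a data-free tracer and report the fields it touched."""
    tracer = RecordingEvents(fields)
    out = fn(tracer)
    return out, frozenset(tracer.touched)


def my_analysis(events):
    # toy analysis that reads two of the three available fields
    return (events.Jet_pt, events.nJet)


out, touched_columns = typetrace(my_analysis, {"nJet", "Jet_pt", "Muon_pt"})
print(sorted(touched_columns))  # ['Jet_pt', 'nJet']
```

Because tracing is a plain function call here, rerunning only the tracing step is trivial, which is the debugging benefit the proposal mentions.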

Why is this useful? One can now continue with a non-dask compute model but still benefit from dask-awkward's partitioning and column optimization, e.g.:

```python
# run the analysis per partition with only the needed columns;
# this loads only `touched_columns` for `chunk` because
# of the previous explicit `project_columns` call
for i in range(events.npartitions):
    chunk = events.partitions[i].compute()
    analysed_events = my_analysis(chunk)
```

This pattern allows running this loop locally (e.g. with multiprocessing) or on batch systems as classic batch jobs, or falling back to working with dask-awkward on events (the normal use case) with e.g. map_partitions/mapfilter/dak operations.
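
A self-contained toy of that loop, assuming partitions can be modeled as in-memory dicts of columns: project_columns here is a hypothetical helper (not dask-awkward's API) that returns a per-partition loader reading only the projected columns, and my_analysis is an eager function run chunk by chunk.

```python
from typing import Callable

# Hypothetical on-disk dataset: each partition maps column names to values.
PARTITIONS = [
    {"nJet": [2, 0], "Jet_pt": [[40.0, 25.0], []], "Muon_pt": [[10.0], []]},
    {"nJet": [1], "Jet_pt": [[33.0]], "Muon_pt": [[]]},
]


def project_columns(columns: frozenset[str]) -> Callable[[int], dict]:
    """Hypothetical analogue of the proposed `dak.manual.project_columns`:
    return a per-partition loader that reads only `columns`."""
    def load(i: int) -> dict:
        return {name: PARTITIONS[i][name] for name in sorted(columns)}
    return load


def my_analysis(chunk: dict) -> int:
    # toy eager analysis: count jets with pt > 30
    return sum(pt > 30 for jets in chunk["Jet_pt"] for pt in jets)


load = project_columns(frozenset({"Jet_pt"}))
results = []
for i in range(len(PARTITIONS)):
    chunk = load(i)  # only "Jet_pt" is read
    results.append(my_analysis(chunk))

print(results)  # [1, 1]
```

Each iteration depends only on its partition index, so the loop body could equally be handed to a process pool or submitted as independent batch jobs.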

Some additional benefits would be that tracing failures might be easier to debug, because there's a simple way to rerun only the tracing step: dak.manual.typetrace(my_analysis, tracer). In addition, one would be able to reuse touched_columns of one tracing step for other input datasets (where appropriate of course), inspect the touched_columns (I suppose there's already dak.report_necessary_columns for this), or even manipulate touched_columns as needed.
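
Reusing and manipulating touched_columns could look like the sketch below. It assumes the report has the shape that dak.report_necessary_columns returns (a dict mapping each input-layer name to a frozenset of column names); reuse_columns and the layer names are hypothetical placeholders.

```python
# Report for dataset A; the layer name is a made-up placeholder.
report_a = {"from-uproot-aaaa": frozenset({"nJet", "Jet_pt"})}


def reuse_columns(report: dict[str, frozenset[str]], new_layer: str,
                  add: frozenset[str] = frozenset(),
                  drop: frozenset[str] = frozenset()) -> dict[str, frozenset[str]]:
    """Hypothetical helper: take the columns traced for one dataset, adjust
    them by hand, and key them by another dataset's input-layer name."""
    columns = frozenset().union(*report.values())
    return {new_layer: (columns | add) - drop}


# reuse A's columns for dataset B, plus one column only B needs
report_b = reuse_columns(report_a, "from-uproot-bbbb", add=frozenset({"GenJet_pt"}))
print(sorted(report_b["from-uproot-bbbb"]))  # ['GenJet_pt', 'Jet_pt', 'nJet']
```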

While there's still the standard way to do things, i.e., using dask-awkward operations and its optimization procedures, the manual namespace would allow people to benefit from dask-awkward's IO features (column optimization + partitioning) without having to build dask graphs (except for the one AwkwardInputLayer) for the computation of my_analysis.

What do you think about this?

@martindurant
Collaborator

So, this isn't something like:

```python
darr = make_dak_array_somehow()
darr2 = darr.map_partitions(eager_function)  # runs typetracer through function
cols = dak.necessary_columns(darr2)
final = dak.project_columns(darr, cols)
```

where the final step doesn't exist yet as a user function, but is done internally in column optimization.

@pfackeldey
Collaborator Author

In principle yes, but

  • dak.map_partitions kind of restricts the mapped function by not allowing anything other than an ak.Array as return type.
  • Explicitly naming the namespace manual and the method typetrace makes a user aware that they are now consciously typetracing, which is not clear when calling dak.map_partitions (so that's more of an understanding/documentation thing).
  • In principle there's also not really a reason to make the mapped function part of the graph in this use-case here, I'd argue?

@martindurant
Copy link
Collaborator

> In principle there's also not really a reason to make the mapped function part of the graph in this use-case here, I'd argue?

The typetracing and necessary_columns steps are the only expensive ones; making the graph is essentially free. The user probably does final.map(eager_function) (map_partitions or mapfilter) as the next step.

@pfackeldey
Collaborator Author

pfackeldey commented Dec 12, 2024

We need a way, though, to trace a function that could return anything; I'd say it's a limitation for physics analysis to only be able to return a single awkward array. It's not uncommon to track metadata in my_analysis as well (where it's not possible to attach it to the dak.Array).
I'm not sure how to do this with map_partitions.

Apart from that: I think I can write dak.project_columns(darr, cols), but the column projection would need more information (e.g. the FormStructures) from cols = dak.necessary_columns(darr2). Just the column names are not sufficient as far as I can see (unless the projection API were unified: scikit-hep/uproot5#1349).
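
A toy sketch of why the projection step works on structure, not just names: to prune a nested record, the projector has to walk the form that the names live in. It assumes a form can be modeled as nested dicts with type strings at the leaves and column paths as dot-separated strings; neither is awkward's Form API nor uproot's projection API, and project_form is hypothetical.

```python
from typing import Any


def project_form(form: dict[str, Any], paths: set[str]) -> dict[str, Any]:
    """Toy projection: keep only the columns named by dotted `paths`,
    preserving the nested record structure they live in."""
    whole: set[str] = set()              # fields kept in full
    children: dict[str, set[str]] = {}   # record fields -> requested sub-paths
    for path in paths:
        head, _, rest = path.partition(".")
        if head not in form:
            raise KeyError(f"unknown column {path!r}")
        if rest:
            if not isinstance(form[head], dict):
                raise KeyError(f"{head!r} is not a record in {path!r}")
            children.setdefault(head, set()).add(rest)
        else:
            whole.add(head)
    projected: dict[str, Any] = {}
    for head in sorted(whole.union(children)):
        if head in whole:
            projected[head] = form[head]  # keep the whole subtree
        else:
            # recurse into the sub-record with only the requested sub-paths
            projected[head] = project_form(form[head], children[head])
    return projected


form = {
    "nJet": "uint32",
    "Jet": {"pt": "float32", "eta": "float32", "mass": "float32"},
    "Muon": {"pt": "float32"},
}
print(project_form(form, {"nJet", "Jet.pt", "Jet.eta"}))
# {'Jet': {'eta': 'float32', 'pt': 'float32'}, 'nJet': 'uint32'}
```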

@pfackeldey
Collaborator Author

pfackeldey commented Dec 13, 2024

I suppose I could use dask.delayed instead of map_partitions, though, with a single partition:

```python
import dask
import dask_awkward as dak
import uproot

io = uproot.dask({"https://raw.githubusercontent.com/CoffeaTeam/coffea/master/tests/samples/nano_dy.root": "Events"})
dak.report_necessary_columns(dask.delayed(lambda io: io.nJet)(io.partitions[0]))
# {'from-uproot-138b384738005b2a7a7eefbb600ca6c2': frozenset({'nJet'})}
```

That's great!

However, this doesn't work (returning additional metadata, here None):

```python
dak.report_necessary_columns(dask.delayed(lambda io: (io.nJet, None))(io.partitions[0]))
# {'from-uproot-138b384738005b2a7a7eefbb600ca6c2': frozenset()}

# but it does compute correctly:
dask.delayed(lambda io: (io.nJet, None))(io.partitions[0]).compute()
# (<Array [5, 8, 5, 3, 5, 8, 4, 4, ..., 4, 9, 3, 2, 3, 1, 6, 2] type='40 * uint32'>, None)
```

Is the column optimization loading everything in this case? Or does it only load nJet, but dak.report_necessary_columns doesn't report it properly?

If that worked, I'd like this for tracing 'manually'. You've got a good point that "making the graph is essentially free", and there's already dak.report_necessary_columns for tracing.


Update:
One has to do something with nJet in order to "force-touch" it with the typetracer:

```python
dak.report_necessary_columns(dask.delayed(lambda io: (io.nJet + 1, None))(io.partitions[0]))
# {'from-uproot-587894178ac0719cd1a04ae5f523da00': frozenset({'nJet'})}
```

In the single-return case, the touching was forced by the explicit single return; something to be aware of.
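
A toy model that reproduces the three reports above, in plain Python. It assumes, as described in this comment, that merely reading a field does not touch its data, that computing with it (here +) does, and that a single returned array is touched explicitly while arrays inside a tuple are not. Field, Events and report_touched are hypothetical names; this is not dask-awkward's implementation.

```python
class Field:
    """Toy lazy column: knows its name, holds no data."""

    def __init__(self, name: str, report: set[str]) -> None:
        self.name = name
        self._report = report

    def touch(self) -> None:
        # record that the data of this column is actually needed
        self._report.add(self.name)

    def __add__(self, other):
        # arithmetic needs the values, so it touches the data
        self.touch()
        return self  # toy: pretend the result is still a lazy column


class Events:
    def __init__(self, report: set[str]) -> None:
        self._report = report

    def __getattr__(self, name: str) -> Field:
        # attribute access alone does NOT touch data
        if name.startswith("_"):
            raise AttributeError(name)
        return Field(name, self._report)


def report_touched(fn) -> frozenset[str]:
    report: set[str] = set()
    out = fn(Events(report))
    # toy rule: a single returned array is touched explicitly,
    # but arrays hidden inside other containers are not
    if isinstance(out, Field):
        out.touch()
    return frozenset(report)


print(report_touched(lambda ev: ev.nJet))              # frozenset({'nJet'})
print(report_touched(lambda ev: (ev.nJet, None)))      # frozenset()
print(report_touched(lambda ev: (ev.nJet + 1, None)))  # frozenset({'nJet'})
```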

@NJManganelli

Tagging this issue to remind myself to think about this when I start cleaning up/reorganizing the column-joining code.

@pfackeldey
Copy link
Collaborator Author

pfackeldey commented Dec 18, 2024

This would also allow improving the optimization process by:

  1. building a graph for dataset A
  2. using dask.graph_manipulation.clone to re-key and copy the 'graph of dataset A'
  3. replacing the IO layer for dataset A with one for dataset B in the copy (in most cases it may even be sufficient to just replace the file path)
  4. manually inferring the needed columns of the 'graph of dataset A' once and storing them
  5. using the inferred needed columns to manually do the column optimization for both the 'graph of dataset A' and the 'graph of dataset B', without running the typetracer again through the 'graph of dataset B'
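
The five steps can be sketched with a toy graph modeled as a plain dict of layers; this does not use dask's HighLevelGraph or dask.graph_manipulation.clone, and the layer names, file names and helpers (clone_for, apply_columns) are hypothetical. The hard-coded needed set stands in for the result of tracing graph A once.

```python
import copy
from typing import Any

Graph = dict[str, dict[str, Any]]

# Step 1: toy graph for dataset A, an IO layer plus one analysis layer.
graph_a: Graph = {
    "io-A": {"kind": "io", "path": "dataset_A.root", "columns": None},
    "analysis-A": {"kind": "compute", "deps": ["io-A"]},
}


def clone_for(graph: Graph, old: str, new: str, path: str) -> Graph:
    """Steps 2-3: deep-copy and re-key the graph, then point the IO layer
    at another dataset's file."""
    cloned: Graph = {}
    for name, layer in graph.items():
        layer = copy.deepcopy(layer)
        if "deps" in layer:
            layer["deps"] = [d.replace(old, new) for d in layer["deps"]]
        if layer["kind"] == "io":
            layer["path"] = path
        cloned[name.replace(old, new)] = layer
    return cloned


def apply_columns(graph: Graph, columns: frozenset[str]) -> None:
    """Step 5: project every IO layer onto precomputed columns, in place."""
    for layer in graph.values():
        if layer["kind"] == "io":
            layer["columns"] = columns


graph_b = clone_for(graph_a, "-A", "-B", "dataset_B.root")
needed = frozenset({"nJet", "Jet_pt"})  # step 4: traced once, on graph A only
for g in (graph_a, graph_b):
    apply_columns(g, needed)

print(sorted(graph_b))  # ['analysis-B', 'io-B']
```

Because the clone is a deep copy, projecting or retargeting graph B never alters graph A, which is the property the comment below relies on clone providing.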

This improvement would eliminate the need to build repetitive graphs for similar datasets by just cloning them; and it would eliminate repetitive typetracer runs.

In many HEP analyses, I could even see only very small differences between analyzing dataset A and dataset B. Here, we could additionally add or manipulate by hand the needed columns that differ between the two datasets, which would further reduce the time spent in column optimization.

This currently assumes that dask.graph_manipulation.clone (possible since #554) does a deepcopy + deep re-key; if it does not, any other cloning mechanism that we'd invent would still need this kind of manual column projection. If I understand @martindurant correctly, it's typically cheap to build the graph but expensive to run the column optimization.

@martindurant
Collaborator

Totally agree, @pfackeldey . We probably need a flag on the iolayer to say "this has already picked explicit columns, don't optimize", so that you can do this run-once, apply-many-times model with clone. Probably we would do the initial column choosing before clone, but it should be fast and not matter.
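
A minimal sketch of that run-once, apply-many-times model, assuming an IO layer can carry a flag that tells the optimizer to skip it. IOLayer, optimize and the columns_fixed flag are hypothetical names; dask-awkward's input layer has no such flag as written here.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class IOLayer:
    """Toy input layer. `columns_fixed` is a hypothetical flag meaning
    "columns were already chosen explicitly, don't optimize"."""
    name: str
    columns: Optional[frozenset[str]] = None  # None = read everything
    columns_fixed: bool = False


def optimize(layers: list[IOLayer],
             trace: Callable[[IOLayer], frozenset[str]]) -> int:
    """Toy column optimization: run the (expensive) tracer only for layers
    whose columns were not fixed beforehand; return how many traces ran."""
    traced = 0
    for layer in layers:
        if layer.columns_fixed:
            continue  # respect the explicit choice, skip tracing
        layer.columns = trace(layer)
        traced += 1
    return traced


# run once: columns chosen for dataset A ...
fixed = frozenset({"nJet", "Jet_pt"})
# ... apply many times: each clone carries the fixed columns and the flag
layers = [IOLayer(f"io-{d}", fixed, columns_fixed=True) for d in "ABC"]
layers.append(IOLayer("io-D"))  # a layer that still needs tracing

n = optimize(layers, trace=lambda layer: frozenset({"nJet"}))
print(n)  # 1
```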
