Dask shuffle performance help #43155
Have you profiled this? I'd speculate that the hotspot is in pd.concat.
Yes, so here is the prun output
Looking at a flame graph with snakeviz I see the following breakdown, with about a 50% reduction at each stage
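For anyone reproducing this, a profile with a comparable breakdown can be produced along these lines (illustrative only; the shard sizes below are placeholders, not the ones from the actual script):

```python
import cProfile
import pstats

import numpy as np
import pandas as pd

# Placeholder workload standing in for the shuffle script: many small frames
# with identical columns, concatenated back together.
shards = [pd.DataFrame(np.random.random((500, 10))) for _ in range(2_000)]

profiler = cProfile.Profile()
profiler.enable()
pd.concat(shards)
profiler.disable()
profiler.dump_stats("concat.prof")

pstats.Stats("concat.prof").sort_stats("cumulative").print_stats(20)
# For the flame-graph view: `pip install snakeviz`, then `snakeviz concat.prof`
```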
So the first thing that stands out is that most of the pickle.loads time is in the Index constructor, which should be avoidable by passing the dtype to cls.__new__ within _new_Index (shorter term, this could be avoided on your end by using an integer dummy instead of the string "partitions"). After the frames are unpickled, the concat code spends a bunch of time figuring out that they are already aligned (matching columns and block structure), which in this case we know ex ante. I'll have to give some thought to how to avoid this.
I'm entirely happy to choose some arbitrary random integer if that would help performance. I am a bit surprised that using a string column name has an effect. I'm a bit curious to know more about what's happening here, but I'm also happy to remain ignorant if that knowledge will become obsolete.
Oh cool. I hadn't thought too much about the costs of that check, but yes, that makes sense.
In unpickling, we are calling the Index constructor and in this example an object-dtype ndarray is being passed. The Index constructor tries to do dtype inference, which in this example is pretty much the worst case (all integers and then one string at the end).
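A quick way to see that inference cost in isolation (a rough sketch; array sizes and timings are illustrative):

```python
import numpy as np
import pandas as pd
from timeit import timeit

# Column labels as they arrive after unpickling: an object-dtype array of
# integers with the string "partitions" tacked on the end.
mixed = np.array(list(range(100_000)) + ["partitions"], dtype=object)
ints = np.arange(100_001)

# Worst case: inference has to scan every element before concluding the
# index must stay object dtype.
print(timeit(lambda: pd.Index(mixed), number=20))

# An int64 array needs no object-dtype inference at all. Per the comment
# above, passing the dtype through during unpickling, or using an integer
# dummy instead of the string "partitions", avoids the slow path.
print(timeit(lambda: pd.Index(ints), number=20))
```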
Ah, but if I happened to have string column names in the other columns, then changing partitions to 1387287258 or something wouldn't have a positive effect, correct? In this particular case, the fact that the column names are integers is coincidental (well, mostly coincidental). I probably shouldn't optimize too heavily around that case. The distribution of column names for us is the same as for pandas in this case, except that we need to add one more for the output partition.
Right. Hopefully it will be a moot point in the next release.
@jbrockmendel any more ideas on this? The concat seems to be the hotspot for us, usually more so than the unpickling. You can see these profiles, recorded by py-spy, of our workload on a dask cluster (use the "left-heavy" view to see the breakdown of overall runtime, and note that the time units are wrong because of benfred/py-spy#430). I could also make a smaller reproducer for just this concat performance. Would it be helpful to split that into a separate issue?
Sounds great! Looking forward to those improvements. Let us know when you have something we should try out. |
Below I'm pasting an implementation of what I have in mind. It's about 6x faster than pd.concat on the example in the OP. I'm going to ask you to take the baton to a) test it against a wide variety of DataFrames and b) profile it.
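Roughly, the idea is a fast path that trusts the frames to already be aligned and concatenates the underlying arrays directly. A much-simplified sketch along those lines (not the actual implementation from this comment, and column-wise rather than block-wise; the name matches `concat_known_aligned` as referenced later in the thread) could look like:

```python
import numpy as np
import pandas as pd


def concat_known_aligned(frames):
    """Concatenate DataFrames known in advance to share identical columns,
    dtypes, and column order, skipping the alignment checks in pd.concat.

    Simplified sketch: assumes numpy-backed columns and accepts a fresh
    RangeIndex on the result.
    """
    first = frames[0]
    data = {
        col: np.concatenate([frame[col].to_numpy() for frame in frames])
        for col in first.columns
    }
    return pd.DataFrame(data, columns=first.columns)


# Usage: result = concat_known_aligned(shards)
```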
I haven't tried this new method out on our workload yet, but I wanted to update you that I think I've found that the biggest bottleneck for us during concatenation is actually the GIL. So though this method should help, I don't know if it'll solve the problem.

In a py-spy profile of our concat-heavy phase, you can see that 70% of runtime is spent in the concat calls. Unfortunately, plain np.concatenate doesn't seem to parallelize well across threads either:

```
In [1]: import numpy as np

In [2]: %load_ext ptime

In [3]: shards = [np.random.random(500_000) for _ in range(400)]

In [4]: %ptime -n4 np.concatenate(shards)
Total serial time:   3.30 s
Total parallel time: 1.90 s
For a 1.74X speedup across 4 threads
```

We should have mentioned we were trying to concatenate in all the worker threads (typically 4, but could be any number). Testing locally on my mac, with DataFrames of around the same size and number as we were using on this cluster, it looks like your new method is a 1.4x speedup over pd.concat.

Script: https://gist.github.com/gjoseph92/35372dc59de3f3360f7ca377ba321ec4
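For reference, roughly the same measurement can be made without the `%ptime` magic, using a thread pool directly (a sketch; the shard sizes and thread count mirror the snippet above):

```python
import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np

shards = [np.random.random(500_000) for _ in range(400)]
n_threads = 4

start = time.perf_counter()
for _ in range(n_threads):
    np.concatenate(shards)
serial = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(n_threads) as pool:
    # If np.concatenate released the GIL for the bulk of the copy, this
    # would approach an n_threads-fold speedup over the serial loop.
    list(pool.map(lambda _: np.concatenate(shards), range(n_threads)))
parallel = time.perf_counter() - start

print(f"serial {serial:.2f}s, parallel {parallel:.2f}s, "
      f"{serial / parallel:.2f}x speedup across {n_threads} threads")
```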
Can you try it with the workload in the OP? I'd like to confirm that you get something closer to the 6x that I got locally.
Not a whole lot I can do to help with this, but if you find something, I'll be eager to hear about it.
Could potentially parallelize the for loop in concat_known_aligned, but I don't think there's much mileage to be gained there.
With the OP example, it's printing ~45 MiB/s with the new method. It doesn't look like the GIL is being released there; I also tried wrapping np.concatenate in numba with nogil=True:

```
In [3]: @numba.jit(nopython=True, nogil=True)
   ...: def concat(arrs):
   ...:     return np.concatenate(arrs)
```

but the parallel performance was about the same.
Haven't profiled further though to be sure it's actually the GIL in this case. Maybe we could talk to someone knowledgeable on the NumPy side at some point? I'm sure releasing the GIL in np.concatenate would help more than just us.
I'll try it on main with both approaches.
Yah, your best bet is either the numpy tracker or numpy-users.
It wouldn't be that difficult to write a cython near-equivalent of that C implementation that could be declared as nogil.
@gjoseph92 any word on this from the numpy folks?
@jbrockmendel thanks for checking in; I forgot to reach out to them. I'll be out this week, but maybe @fjetter or @jrbourbeau could open an issue if they have time?
When NumPy allocates memory, it registers those allocations with Python's memory-tracing machinery (tracemalloc), and that bookkeeping involves the GIL. If there are multiple calls to the allocator from different threads, they can end up contending on it. A separate benchmark for your use case focused on copying (as opposed to allocating) might provide additional insight here.
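A minimal copy-focused benchmark along those lines might separate the allocation from the copy by concatenating into a preallocated buffer (a sketch; assumes a NumPy version that supports the `out=` argument of `np.concatenate`):

```python
import time

import numpy as np

shards = [np.random.random(500_000) for _ in range(400)]
out = np.empty(sum(len(s) for s in shards), dtype=shards[0].dtype)

# Pure copy: the destination already exists, so no allocation (and no
# allocation tracking) happens inside the timed region.
start = time.perf_counter()
np.concatenate(shards, out=out)
copy_time = time.perf_counter() - start

# Copy plus allocation of the result, as in the normal code path.
start = time.perf_counter()
np.concatenate(shards)
alloc_time = time.perf_counter() - start

gib = out.nbytes / 2**30
print(f"preallocated: {gib / copy_time:.2f} GiB/s, "
      f"with allocation: {gib / alloc_time:.2f} GiB/s")
```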
We've made some performance improvements in the alignment-checking recently. I'd be curious to see updated profiling results here.
Adding @hendrikmakait for context (he's been active in shuffling recently). However, at one point there was also a move to shift to arrow temporarily when we hit the many-small-dataframes case. It may be that this makes us less sensitive to improvements in this particular part of the pandas codebase. Hendrik will know more. Also, as a heads-up, he's coming off of a conference/offsite and so may be less responsive than usual.
Hi Folks,
I'm experimenting with a new shuffle algorithm for Dask dataframe. This is what backs distributed versions of join, set_index, groupby-apply, or anything that requires moving a large number of rows around a distributed dataframe.
Things are coming along well, but I'm running into a performance challenge with pandas and I would like to solicit feedback here before diving more deeply. If this isn't the correct venue please let me know and I'll shift this elsewhere.
We've constructed a script (thanks @gjoseph92 for starting this) that creates a random dataframe and a column on which to split it, then rearranges, serializes/deserializes, and concats the pieces a couple of times. This is representative of the operations we're trying to do, except that in between a couple of the steps the shards/groups end up coming from different machines, rather than being the same shards.
With 10,000 groups I get around 40 MB/s bandwidth.
With 1,000 groups I get around 230 MB/s bandwidth.
With 200 or fewer groups I get around 500 MB/s bandwidth.
Obviously, one answer here is "use fewer groups; Pandas isn't designed to operate efficiently with only a few rows". That's fair, and we're trying to design for that, but there is always pressure to shrink things down, so I would like to explore whether there is anything we can do on the pandas side to add tolerance here.
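A minimal sketch of the kind of measurement described above (the actual script is more involved; the sizes, group counts, and the pickle round-trip standing in for the network hop are all illustrative):

```python
import pickle
import time

import numpy as np
import pandas as pd

n_rows, n_groups = 1_000_000, 1_000
df = pd.DataFrame(np.random.random((n_rows, 10)))
df["partitions"] = np.random.randint(0, n_groups, size=n_rows)

start = time.perf_counter()
# Split into one shard per output partition, round-trip each shard through
# pickle (standing in for movement between workers), then reassemble.
shards = [shard for _, shard in df.groupby("partitions")]
shards = [pickle.loads(pickle.dumps(shard)) for shard in shards]
result = pd.concat(shards)
elapsed = time.perf_counter() - start

nbytes = df.memory_usage(deep=True).sum()
print(f"{n_groups} groups: {nbytes / elapsed / 2**20:.0f} MiB/s")
```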