Dask shuffle performance help #43155

Open
mrocklin opened this issue Aug 21, 2021 · 20 comments
Labels
Compat pandas objects compatability with Numpy or Python functions Enhancement Performance Memory or execution speed performance

Comments

@mrocklin (Contributor)

Hi Folks,

I'm experimenting with a new shuffle algorithm for Dask dataframe. This is what backs distributed versions of join, set_index, groupby-apply, or anything else that requires moving large numbers of rows around a distributed dataframe.

Things are coming along well, but I'm running into a performance challenge with pandas and I would like to solicit feedback here before diving more deeply. If this isn't the correct venue please let me know and I'll shift this elsewhere.

We've constructed a script (thanks @gjoseph92 for starting this) that creates a random dataframe plus a column on which to split, then rearranges, serializes/deserializes, and concatenates the shards a couple of times. This is representative of the operations we're trying to do, except that in a real shuffle the shards/groups in between steps come from different machines, rather than being the same shards.

import time
import random
import pickle

import numpy as np
import pandas as pd
    

# Parameters
n_groups = 10_000
n_cols = 1000
n_rows = 30_000
    
# Make input data
df = pd.DataFrame(np.random.random((n_rows, n_cols)))
df["partitions"] = (df[0] * n_groups).astype(int)  # random values 0..10000

start = time.time()
_, groups = zip(*df.groupby("partitions"))  # split into many small shards

groups = list(groups)
random.shuffle(groups)  # rearrange those shards

groups = [pickle.dumps(group) for group in groups]  # Simulate sending across the network
groups = [pickle.loads(group) for group in groups]

df = pd.concat(groups)  # reassemble shards
_, groups = zip(*df.groupby("partitions"))  # and resplit


stop = time.time()

import dask
print(dask.utils.format_bytes(df.memory_usage().sum() / (stop - start)), "/s")

With 10,000 groups I get around 40 MB/s bandwidth.

With 1,000 groups I get around 230 MB/s bandwidth

With 200 or fewer groups I get around 500 MB/s bandwidth

Obviously, one answer here is "use fewer groups, Pandas isn't designed to operate efficiently with only a few rows". That's fair and we're trying to design for that, but there is always pressure to shrink things down, so I would like to explore whether there is anything we can do on the pandas side to add tolerance here.

@mrocklin mrocklin added Enhancement Needs Triage Issue that has not been reviewed by a pandas team member labels Aug 21, 2021
@jbrockmendel (Member)

Have you profiled this? I'd speculate that the hotspot is in pd.concat

@mrocklin (Contributor, Author)

Yes; here is the %prun output:

     9486    1.434    0.000    1.502    0.000 {pandas._libs.lib.maybe_convert_objects}
    18953    0.753    0.000    0.753    0.000 {pandas._libs.lib.array_equivalent_object}
     9477    0.747    0.000    3.386    0.000 {built-in method _pickle.loads}
     9516    0.746    0.000    0.841    0.000 {built-in method _pickle.dumps}
     9477    0.298    0.000    0.622    0.000 concat.py:242(_get_mgr_concatenation_plan)
     9477    0.240    0.000    0.287    0.000 generic.py:2070(__setstate__)
        1    0.187    0.187    0.187    0.187 {method 'random' of 'numpy.random.mtrand.RandomState' objects}
  1017650    0.184    0.000    0.285    0.000 {built-in method builtins.isinstance}
    18954    0.132    0.000    0.178    0.000 {method 'get_slice' of 'pandas._libs.internals.BlockManager' objects}
    19963    0.122    0.000    0.151    0.000 generic.py:5390(__finalize__)
        1    0.111    0.111    8.092    8.092 <string>:1(<module>)
9490/9487    0.097    0.000    1.849    0.000 base.py:382(__new__)
     9479    0.095    0.000    0.246    0.000 managers.py:224(_rebuild_blknos_and_blklocs)
        2    0.084    0.042    0.084    0.042 {pandas._libs.algos.take_2d_axis0_float64_float64}
9507/9503    0.079    0.000    0.080    0.000 {built-in method numpy.core._multiarray_umath.implement_array_function}
   344446    0.072    0.000    0.106    0.000 generic.py:43(_check)
375448/270584    0.070    0.000    0.097    0.000 {built-in method builtins.len}
    56876    0.064    0.000    0.099    0.000 base.py:629(_simple_new)
   481328    0.055    0.000    0.060    0.000 {built-in method builtins.getattr}
    18956    0.051    0.000    1.239    0.000 base.py:4896(equals)
       55    0.051    0.001    0.051    0.001 {built-in method builtins.eval}
    18961    0.046    0.000    0.202    0.000 blocks.py:1920(new_block)
    37916    0.044    0.000    0.120    0.000 base.py:44(shape)
    18971    0.040    0.000    0.040    0.000 {method 'reduce' of 'numpy.ufunc' objects}
      462    0.039    0.000    0.039    0.000 {built-in method marshal.loads}
    18954    0.034    0.000    2.020    0.000 base.py:228(_new_Index)
    66415    0.033    0.000    0.066    0.000 common.py:1577(_is_dtype_type)
    19965    0.031    0.000    0.099    0.000 blocks.py:1845(maybe_coerce_values)
    72/43    0.030    0.000    0.054    0.001 {built-in method _imp.exec_dynamic}
    28456    0.030    0.000    0.030    0.000 {built-in method numpy.empty}
   145368    0.029    0.000    0.045    0.000 {built-in method builtins.issubclass}
    18953    0.028    0.000    1.072    0.000 missing.py:381(array_equivalent)
     9479    0.028    0.000    0.109    0.000 managers.py:906(_verify_integrity)
    18954    0.028    0.000    0.050    0.000 missing.py:674(<genexpr>)
    18956    0.027    0.000    0.103    0.000 base.py:999(take)
   104280    0.027    0.000    0.040    0.000 base.py:843(__len__)
     9481    0.026    0.000    1.536    0.000 base.py:6775(_maybe_cast_data_without_dtype)

Looking at a flame graph with snakeviz, I see the following breakdown, with roughly a 50% reduction in time at each stage:

  1. pickle.loads
  2. concat
  3. pickle.dumps
  4. groupby
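
For reference, a rough, self-contained way to reproduce such a profile outside IPython (sizes reduced from the OP; shuffle_roundtrip and shuffle.prof are arbitrary names). The resulting file can be opened with snakeviz or inspected with pstats:

import cProfile
import pickle
import pstats
import random

import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.random((30_000, 100)))
df["partitions"] = (df[0] * 1_000).astype(int)

def shuffle_roundtrip():
    # same shape of work as the OP script: split, shuffle, pickle round-trip,
    # concat, re-split
    _, groups = zip(*df.groupby("partitions"))
    groups = list(groups)
    random.shuffle(groups)
    groups = [pickle.loads(pickle.dumps(g)) for g in groups]
    out = pd.concat(groups)
    _, regrouped = zip(*out.groupby("partitions"))

cProfile.run("shuffle_roundtrip()", "shuffle.prof")
pstats.Stats("shuffle.prof").sort_stats("cumulative").print_stats(15)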

@jbrockmendel (Member)

So the first thing that stands out is that most of the pickle.loads time is in the Index constructor, which should be avoidable by passing the dtype to cls.__new__ within _new_Index (shorter term, on your end this could be avoided by using an integer dummy instead of the string "partitions").

After the frames are unpickled, the concat code spends a bunch of time figuring out that they are already aligned (matching columns and block structure), which in this case we know ex-ante. I'll have to give some thought to how to avoid this.

@mrocklin (Contributor, Author)

(shorter term, on your end this could be avoided by using an integer dummy instead of the string "partitions")

I'm entirely happy to choose some arbitrary random integer if that would help performance. I am a bit surprised that using a string column name has an effect. I'm a bit curious to know more about what's happening here, but I'm also happy to remain ignorant if that knowledge will become obsolete.

the concat code spends a bunch of time figuring out that they are already aligned (matching columns and block structure), which in this case we know ex-ante. I'll have to give some thought to how to avoid this.

Oh cool. I hadn't thought too much about the costs of that check but yes, that makes sense.

@jbrockmendel (Member)

I'm a bit curious to know more about what's happening here

In unpickling we are calling the Index constructor and in this example an object-dtype ndarray is being passed. The Index constructor tries to do dtype inference, which in this example is pretty much the worst case (all integers and then one string at the end).
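
To make the inference cost concrete, here is a rough illustration (the labels below are arbitrary, not the OP's): building an Index from an object-dtype array of mostly integers plus one trailing string forces a full scan, which passing an explicit dtype should skip.

import timeit

import numpy as np
import pandas as pd

# mixed int/str labels end up in an object-dtype array, so pd.Index has to
# scan every element to infer a dtype; a lone string at the end is close to
# the worst case
labels = np.array(list(range(1_000)) + ["partitions"], dtype=object)

print(timeit.timeit(lambda: pd.Index(labels), number=1_000))                # full inference
print(timeit.timeit(lambda: pd.Index(labels, dtype=object), number=1_000))  # dtype given, inference skipped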

@mrocklin (Contributor, Author)

Ah, but if I happened to have string column names in the other columns, then changing partitions to 1387287258 or something wouldn't have a positive effect, correct?

In this particular case, the fact that the column names are integers is coincidental (well, mostly coincidental). I probably shouldn't optimize too heavily around that case. The column names we see are whatever our users give us, the same as for pandas, except that we need to add one more column for the output partition.

@jbrockmendel (Member)

Ah, but if I happened to have string column names in the other columns, then changing partitions to 1387287258 or something wouldn't have a positive effect, correct?

Right. Hopefully it will be a moot point in the next release.

@gjoseph92

After the frames are unpickled, the concat code spends a bunch of time figuring out that they are already aligned (matching columns and block structure), which in this case we know ex-ante. I'll have to give some thought to how to avoid this.

@jbrockmendel any more ideas on this? The concat seems to be the hotspot for us, usually more so than pickle.loads.

You can see these profiles of our workload on a dask cluster, recorded by py-spy:

(use the "left-heavy" view to see the breakdown of overall runtime, and note that the time units are wrong because of benfred/py-spy#430)

I could also make a smaller reproducer for just this concat performance. Would it be helpful to split into a separate issue?

@jbrockmendel (Member)

any more ideas on this?

  1. I've been optimizing this part of the code, so I'm optimistic you'll see some improvement on master.
  2. If .blknos remains costly, we could store it when pickling instead of re-computing it after un-pickling.
  3. If your DataFrames happen to be single-dtype, it'll be a lot faster to use np.concatenate on the values and then wrap the result back in a DataFrame (see the sketch after this list).
  4. Otherwise, implementing this for the general case won't be difficult, but it will require accessing pandas internals in a way that I'd prefer to avoid having downstream packages do (DEPR: restrict downstream usage of core.internals #40226). I'll need to give some thought to where to implement this.
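
A minimal sketch of point 3, assuming every frame shares one dtype and identical columns (concat_single_dtype is a hypothetical helper, not a pandas API):

import numpy as np
import pandas as pd

def concat_single_dtype(frames):
    # identical columns and a single shared dtype are assumed; concatenate the
    # raw ndarrays and rebuild the DataFrame around the appended row index
    values = np.concatenate([df.to_numpy() for df in frames], axis=0)
    index = frames[0].index.append([df.index for df in frames[1:]])
    return pd.DataFrame(values, columns=frames[0].columns, index=index)

frames = [pd.DataFrame(np.random.random((100, 4))) for _ in range(10)]
out = concat_single_dtype(frames)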

@gjoseph92

Sounds great! Looking forward to those improvements. Let us know when you have something we should try out.

@jbrockmendel (Member)

Below I'm pasting an implementation of what I have in mind. It's about 6x faster than pd.concat on the example in the OP. I'm going to ask you to take the baton to a) test it against a wide variety of DataFrames and b) profile it.

from __future__ import annotations  # for the list[pd.DataFrame] annotation on Python < 3.9

import numpy as np
import pandas as pd
import pandas._testing as tm


def concat_known_aligned(frames: list[pd.DataFrame]):
    """
    pd.concat(frames, axis=0) specialized to the case
    where we know that

    a) Columns are identical across frames.
    b) Underlying block layout is identical across frames.

    i.e. these frames are generated by something like

    frames = [df.iloc[i:i+100] for i in range(0, len(df), 100)]

    Notes
    -----
    The caller is responsible for checking these conditions.
    """
    if len(frames) == 0:
        raise ValueError("frames must be non-empty.")

    if frames[0].shape[1] == 0:
        # no columns, can use non-optimized concat cheaply
        return pd.concat(frames, axis=0, ignore_index=True)

    mgrs = [df._mgr for df in frames]
    first = mgrs[0]

    nbs = []
    for i, blk in enumerate(first.blocks):
        arr = blk.values
        arrays = [mgr.blocks[i].values for mgr in mgrs]

        if arr.ndim == 1:
            # i.e. is_1d_only_ea_dtype
            new_arr = arr._concat_same_type(arrays)

        elif not isinstance(arr, np.ndarray):
            new_arr = arr._concat_same_type(arrays, axis=1)

        else:
            new_arr = np.concatenate(arrays, axis=1)

        nb = type(blk)(new_arr, placement=blk.mgr_locs, ndim=2)
        nbs.append(nb)

    index = frames[0].index.append([x.index for x in frames[1:]])
    axes = [frames[0].columns, index]
    new_mgr = type(first)(nbs, axes)
    return pd.DataFrame(new_mgr)


def check_equivalent(frames):
    result = concat_known_aligned(frames)
    expected = pd.concat(frames, axis=0)
    tm.assert_frame_equal(result, expected)


def test():
    df = tm.makeMixedDataFrame()
    frames = [df[i:i+1] for i in range(len(df))]
    check_equivalent(frames)
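
For completeness, a quick smoke test against shards shaped like the OP's groupby output (a sketch reusing check_equivalent above; sizes reduced):

def test_op_style_shards():
    # identical columns and block layout across shards: float columns plus one
    # int "partitions" column, split via groupby as in the OP script
    df = pd.DataFrame(np.random.random((30_000, 10)))
    df["partitions"] = (df[0] * 100).astype(int)
    _, groups = zip(*df.groupby("partitions"))
    check_equivalent(list(groups))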

@gjoseph92

I haven't tried this new method out on our workload yet. But I wanted to update you that I think I've found that the biggest bottleneck for us during concatenation is actually the GIL. So though this method should help, I don't know if it'll solve the problem.

py-spy profile of our concat-heavy phase, recorded with --native to record C callstacks as well (using plain pd.concat):
[screenshot: py-spy --native profile of the concat-heavy phase, 2021-09-08]

You can see that 70% of runtime is spent in pthread_cond_timedwait@@GLIBC_2.3.2, which is indicative of a thread trying to re-acquire the GIL. You might want to look at the time-order view on speedscope; it's interesting to see where this GIL-blocking happens.

Unfortunately plain np.concatenate doesn't parallelize well to begin with. If NumPy is releasing and re-acquiring the GIL multiple times during concatenation, I don't know if we can expect pandas to do any better:

In [1]: import numpy as np
In [2]: %load_ext ptime
In [3]: shards = [np.random.random(500_000) for _ in range(400)]
In [4]: %ptime -n4 np.concatenate(shards)
Total serial time:   3.30 s
Total parallel time: 1.90 s
For a 1.74X speedup across 4 threads
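
(For anyone without the ptime extension, a rough plain-Python equivalent of that measurement:)

import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np

shards = [np.random.random(500_000) for _ in range(400)]

# run the same concatenate 4 times serially, then 4 times across 4 threads
start = time.perf_counter()
for _ in range(4):
    np.concatenate(shards)
serial = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(lambda _: np.concatenate(shards), range(4)))
parallel = time.perf_counter() - start

print(f"serial {serial:.2f}s, parallel {parallel:.2f}s, "
      f"{serial / parallel:.2f}x speedup across 4 threads")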

We should have mentioned we were trying to concatenate in all the worker threads (typically 4, but could be any number).

Testing locally on my mac, with DataFrames of around the same size and number as we were using on this cluster, it looks like your new method is a 1.4x speedup over pd.concat single-threaded, and a 1.8x speedup over pd.concat multi-threaded. This is great—getting nearly 2x faster will help a lot! However, concat_known_aligned is still very GIL-bound:

Shard size: 512.00 kiB, num shards to concatenate: 400
Serial 8x - pd.concat, 1.9s, 3.28 GiB/s
4 threads 8x - pd.concat, 1.4s, 4.31 GiB/s, 1.31x parallel speedup (ideal: 4x)
Serial 8x - concat_known_aligned, 1.3s, 4.64 GiB/s, 1.41x speedup over pd.concat serial
4 threads 8x - concat_known_aligned, 0.77s, 8.02 GiB/s, 1.73x parallel speedup (ideal: 4x), 1.86x speedup over pd.concat parallel

Script: https://gist.github.com/gjoseph92/35372dc59de3f3360f7ca377ba321ec4

@jbrockmendel (Member)

it looks like your new method is a 1.4x speedup over pd.concat

Can you try it with the workload in the OP? I'd like to confirm that you get something closer to the 6x that I got locally.

Unfortunately plain np.concatenate doesn't parallelize well to begin with

Not a whole lot I can do to help with this, but if you find something, I'll be eager to hear about it.

concat_known_aligned is still very GIL-bound

Could potentially parallelize the for loop in concat_known_aligned, but I don't think there's much mileage to be gained there.

It also looks like blknos is pretty significant. I'll be curious if trying it on master helps. If not, there's likely some mileage to be gained by using the fact that it will match across all of the unpickled DataFrames, but again that's getting deep into the internals in a way we'd prefer to avoid.

@gjoseph92

With the OP example, it's printing ~45 MiB/s with concat_known_aligned instead of ~35 MiB/s with pd.concat, so not a 6x improvement overall. However, timing just the concat part, it's 1.5s vs 0.1s, so 15x faster! Not sure which timing you were looking at (I assume the latter).

It doesn't look like the np.concatenate C implementation releases the GIL anywhere. FWIW for fun I tried:

In [3]: @numba.jit(nopython=True, nogil=True)
   ...: def concat(arrs):
   ...:     return np.concatenate(arrs)

but the parallel performance was about the same:

In [14]: %ptime -n4 np.concatenate(shards)
Total serial time:   3.19 s
Total parallel time: 1.26 s
For a 2.53X speedup across 4 threads

In [15]: %ptime -n4 concat(shards)
Total serial time:   2.86 s
Total parallel time: 1.24 s
For a 2.30X speedup across 4 threads

Haven't profiled further though to be sure it's actually the GIL in this case.

Maybe we could talk to someone knowledgeable on the NumPy side at some point? I'm sure releasing the GIL in np.concatenate is not a quick change, but maybe there's some other approach we could use.

I'll be curious if trying it on master helps.

I'll try main with both pd.concat and your new method.

@jbrockmendel (Member)

Maybe we could talk to someone knowledgeable on the NumPy side at some point?

Yah, your best bet is either the numpy tracker or numpy-users.

but maybe there's some other approach we could use.

It wouldn't be that difficult to write a Cython near-equivalent of that C implementation that could be declared as nogil. Worth a shot.

@mzeitlin11 mzeitlin11 added Compat pandas objects compatability with Numpy or Python functions Performance Memory or execution speed performance and removed Needs Triage Issue that has not been reviewed by a pandas team member labels Sep 11, 2021
@jbrockmendel (Member)

@gjoseph92 any word on this from the numpy folks?

@gjoseph92

@jbrockmendel thanks for checking in—I forgot to reach out to them. I'll be out this week, but maybe @fjetter or @jrbourbeau could open an issue if they have time?

@jakirkham (Contributor)

When NumPy allocates memory, it registers those allocations with Python's tracemalloc. In Python's C API for tracemalloc, it acquires and releases the GIL to register allocations. As there is only one allocation occurring in np.concatenate and it is deferred until the final array's shape & type are known, there should only be one acquisition/release of the GIL. Likely this is what is being seen here.

If there are multiple calls to np.concatenate, there could be multiple GIL acquisitions/releases, which would be something to avoid (ideally by using as few calls to np.concatenate as possible).

np.concatenate supports an out argument. So one could try preallocating the memory and passing it to concatenate and then profile for GIL usage of concatenate. Guessing nothing else will show up, but would be interesting to see regardless.
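
A minimal sketch of that preallocation idea (array sizes below are arbitrary):

import numpy as np

# allocate the result once up front, then let np.concatenate write into it
# via the `out` argument instead of allocating internally
shards = [np.random.random((500, 1_000)) for _ in range(400)]
nrows = sum(s.shape[0] for s in shards)
out = np.empty((nrows, shards[0].shape[1]))
np.concatenate(shards, axis=0, out=out)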

As the gist of concatenate is determining the resulting array's metadata, allocating the result array, and copying data into it, I would expect the last step (copying) to be the most time consuming. Features like hugepages would help improve performance (if available). Might be worth checking that it is available and enabled on the machine being used.

A separate benchmark of copying for your use case might provide additional insight here.
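
Along those lines, a rough copy-bandwidth microbenchmark one could start from (sizes are arbitrary):

import time

import numpy as np

# measures a plain array copy, which is the bulk of the work inside a
# concatenate once the output buffer has been allocated
src = np.random.random((30_000, 1_000))
dst = np.empty_like(src)

start = time.perf_counter()
np.copyto(dst, src)
elapsed = time.perf_counter() - start
print(f"{src.nbytes / elapsed / 2**30:.1f} GiB/s copy bandwidth")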

@jbrockmendel (Member)

We've made some performance improvements in the alignment-checking recently. I'd be curious to see updated profiling results here.

@mrocklin (Contributor, Author)

Adding @hendrikmakait for context (he's been active in shuffling recently). However, at one point there was also a move to shift to arrow temporarily when we hit the many-small-dataframes case. That may make us less sensitive to improvements in this particular part of the pandas codebase. Hendrik will know more.

Also as a heads-up, he's coming off of a conference/offsite, and so may be less responsive than typical.
