[C++][Python] Pyarrow.Table.join() breaks on large tables v.18.0.0.dev486 #44513

Closed
kolfild26 opened this issue Oct 23, 2024 · 25 comments
Labels: Component: C++ · Component: Python · Critical Fix (Bugfixes for security vulnerabilities, crashes, or invalid data) · Priority: Critical · Type: bug
Milestone: 19.0.1

@kolfild26

Describe the bug, including details regarding any error messages, version, and platform.

Hi,
In my task I need to join two tables, one of 18M rows and another of 487M rows:
t_18m.join(t_487m, keys=['col1', 'col2', 'col3'], join_type="left outer")
I was using the latest pyarrow version, which is 17 at the moment. While performing the join, it breaks with Segmentation fault (core dumped).
I tried to investigate and found that the most recent version without this behaviour is v13. But instead of a segmentation fault it:
either silently produces a wrong result,
or breaks with ! Invalid: There are more than 2^32 bytes of key data. Acero cannot process a join of this magnitude
I then searched the GitHub issues and found many similar user cases, which is why I didn't include too many details in this report. You probably know these issues well.

There was the enhancement request #43495, which as far as I understand has been included in v18. I installed the v.18.0.0.dev486 package in order to test it, but unfortunately it still throws the segmentation fault in my case.

So even though the enhancement is already merged into v18.0.0, it does not fix the problem.

Component(s)

Python

@kolfild26 (Author)

I also tried to reduce the size of the right table, and the working limit actually varies for me; I was not able to find the exact number. I get either the segfault or an incorrect join result.

My system has 4 TB of memory in total, so this is not an out-of-memory issue.
Here are the other system specs:

Oracle Linux Server 7.8

ulimit -a
core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 16511255
max locked memory (kbytes, -l) unlimited
max memory size (kbytes, -m) unlimited
open files (-n) 4096
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) unlimited
cpu time (seconds, -t) unlimited
max user processes (-u) 4096
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited

Python 3.10.15

import pyarrow as pa
pa.__version__
'18.0.0.dev486'

@zanmato1984 (Contributor)

Hi @kolfild26 , thanks for reporting this.

There are lots of issues fixed between v13 and v18 that could cause a silent wrong answer or a segfault in hash join, and possibly more unrevealed ones as well. So it is not too surprising that different versions behave differently.

Could you please provide us the complete schemas and the estimated sizes of both tables? And better yet, could you give a more-or-less working size limit for your case? This is essential information for investigating this issue.

Also, there is a workaround that might be worth a try: change t_18m.join(t_487m, keys=['col1', 'col2', 'col3'], join_type="left outer") to t_487m.join(t_18m, keys=['col1', 'col2', 'col3'], join_type="right outer"). (I assume t_18m is much smaller than t_487m, and this will make our hash join use the small table to build the hash table.)

Thanks.

@kolfild26 (Author)

@zanmato1984
Sorry for such a delay.
Yeah, switching to a RIGHT JOIN fixes the issue. I wish I'd done it from the very beginning.
So, a working case here looks like:
large_table.join(small_table, join_type='right outer'): no segfault and the result is correct.
But small_table.join(large_table, join_type='left outer') still leads to a segfault on v18.1.0.

small.shape
(18201475, 9)
large.shape
(360449051, 4)

small.schema
ID_DEV_STYLECOLOR: int64
ID_DEV_STYLECOLOR_SIZE: int64
ID_COLLECTION: int64
ID_PARTITION_DIV_TMA: int64
ID_END_QUOTING_DAY: int64
ID_DEPARTMENT: int64
ID_BEGIN_QUOTING_DAY: int64
INTAKE_DATE: timestamp[us]
UPA_MIN: int64

large.schema
ID_DEV_STYLECOLOR_SIZE: int64
ID_DEPARTMENT: int64
ID_COLLECTION: int64
PL_VALUE: int64

large.join(small, keys=['ID_DEV_STYLECOLOR_SIZE', 'ID_DEPARTMENT', 'ID_COLLECTION'], join_type='right outer') ✅

small.join(large[0:200000000], keys=['ID_DEV_STYLECOLOR_SIZE', 'ID_DEPARTMENT', 'ID_COLLECTION'], join_type='left outer') ✅ (no segfault at least)

small.join(large, keys=['ID_DEV_STYLECOLOR_SIZE', 'ID_DEPARTMENT', 'ID_COLLECTION'], join_type='left outer') ❌ segfault

The segfault appears once the large table size reaches ~250M rows in the left-join case.
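
For reference, a minimal sketch (not from the original report) of how that threshold could be narrowed down, assuming small and large are the tables above and using Table.slice to take prefixes of the large table; each probe would need to run in a fresh process, since a segfault kills the interpreter:

# Hypothetical bisection of the right-table size at which the left join fails.
# Assumes `small` and `large` are already loaded as pyarrow Tables.
keys = ['ID_DEV_STYLECOLOR_SIZE', 'ID_DEPARTMENT', 'ID_COLLECTION']

for n in (200_000_000, 225_000_000, 250_000_000, 275_000_000):
    probe = small.join(large.slice(0, n), keys=keys, join_type='left outer')
    print(n, probe.num_rows)  # reportedly crashes (or goes wrong) around ~250M rows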

@zanmato1984 (Contributor)

Hi @kolfild26 , thank you for the feedback and further information. I'll try to reproduce the issue. However, it would be helpful if you could also supply the following:

  1. Any stacktrace of the segfault;
  2. The join cardinality, or equivalently, the number of rows of the (left/right) join result.

@zanmato1984 zanmato1984 changed the title Pyarrow.Table.join() breaks on large tables v.18.0.0.dev486 [C++][Python] Pyarrow.Table.join() breaks on large tables v.18.0.0.dev486 Dec 13, 2024
@zanmato1984 (Contributor)

Hi @kolfild26 , thank you for the feedback and further information. I'll try to reproduce the issue. However, it would be helpful if you could also supply the following:

  1. Any stacktrace of the segfault;
  2. The join cardinality, or equivalently, the number of rows of the (left/right) join result.

My first attempt to reproduce the issue, using non-null, arbitrarily distributed random columns with the same schema and scale, failed (that is, my test passed without a segfault). So I also need information about the distribution of each key column: null probability, min/max, and any high-cardinality values. Thank you.

@kolfild26 (Author)

@zanmato1984
Stacktrace:

Dec 16 01:07:44 kernel: python[37938]: segfault at 7f3004626050 ip 00007f3fc25441cd sp 00007f3f10b09018 error 4 in libarrow.so.1801[7f3fc1670000+2269000]
Dec 16 01:07:44 kernel: python[37971]: segfault at 7f3004626050 ip 00007f3fc25441db sp 00007f3f002b0018 error 4
Dec 16 01:07:44 kernel: python[37961]: segfault at 7f3004626050 ip 00007f3fc25441cd sp 00007f3f052d0018 error 4 in libarrow.so.1801[7f3fc1670000+2269000]
Dec 16 01:07:44 kernel: in libarrow.so.1801[7f3fc1670000+2269000]
Dec 16 01:07:44 kernel: python[37957]: segfault at 7f3004626050 ip 00007f3fc25441db sp 00007f3f072d8018 error 4
Dec 16 01:07:44 kernel: python[37940]: segfault at 7f3004626050 ip 00007f3fc25441cd sp 00007f3f0fb07018 error 4
Dec 16 01:07:44 kernel: in libarrow.so.1801[7f3fc1670000+2269000]
Dec 16 01:07:44 kernel: in libarrow.so.1801[7f3fc1670000+2269000]
Dec 16 01:07:44 kernel: python[37974]: segfault at 7f3004626050 ip 00007f3fc25441cd sp 00007f3d18f6d018 error 4 in libarrow.so.1801[7f3fc1670000+2269000]
Dec 16 01:07:44 kernel: python[37966]: segfault at 7f3004626050 ip 00007f3fc25441db sp 00007f3f02abf018 error 4
Dec 16 01:07:44 kernel: python[37951]: segfault at 7f3004626050 ip 00007f3fc25441db sp 00007f3f0a2ec018 error 4
Dec 16 01:07:44 kernel: python[37973]: segfault at 7f3004626050 ip 00007f3fc25441cd sp 00007f3efb7fe018 error 4
Dec 16 01:07:44 kernel: in libarrow.so.1801[7f3fc1670000+2269000]
Dec 16 01:07:44 kernel: in libarrow.so.1801[7f3fc1670000+2269000]
Dec 16 01:07:44 kernel: python[37953]: segfault at 7f3004626050 ip 00007f3fc25441db sp 00007f3f092e6018 error 4
Dec 16 01:07:44 kernel: in libarrow.so.1801[7f3fc1670000+2269000]
Dec 16 01:07:44 kernel: in libarrow.so.1801[7f3fc1670000+2269000]
Dec 16 01:07:44 abrt-hook-ccpp: Process 35963 (python3.10) of user 1000 killed by SIGSEGV - dumping core

Here are the tables' statistics:

Script to get stats
import pyarrow as pa
import pyarrow.compute as pc
import pandas as pd
import pyarrow.types as patypes

def get_column_distributions(table):
    distributions = {}
    total_rows = table.num_rows

    for column in table.schema.names:
        col_data = table[column]
        null_count = pc.sum(pc.is_null(col_data)).as_py()
        null_percentage = (null_count / total_rows) * 100 if total_rows > 0 else 0
        
        # Compute the cardinality (unique count / total count)
        unique_count = pc.count_distinct(col_data.filter(pc.is_valid(col_data))).as_py()
        cardinality_percentage = round((unique_count / total_rows)*100,3) if total_rows > 0 else 0
        
        if patypes.is_integer(col_data.type) or patypes.is_floating(col_data.type):
            stats = {
                "count": pc.count(col_data).as_py(),
                "nulls": null_count,
                "null_percentage": null_percentage,
                "cardinality_percentage": cardinality_percentage,
                "min": pc.min(col_data).as_py(),
                "max": pc.max(col_data).as_py(),
            }
        elif patypes.is_string(col_data.type) or patypes.is_binary(col_data.type):
            value_counts = pc.value_counts(col_data.filter(pc.is_valid(col_data)))
            stats = {
                "nulls": null_count,
                "null_percentage": null_percentage,
                "cardinality_percentage": cardinality_percentage,
                "value_counts": value_counts.to_pandas().to_dict("records"),
            }
        else:
            stats = {
                "nulls": null_count,
                "null_percentage": null_percentage,
                "cardinality_percentage": cardinality_percentage,
                "message": f"Statistics not supported for type: {col_data.type}"
            }

        distributions[column] = stats

    return distributions
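
A possible way to call the helper above (a sketch; it assumes the small and large tables from the earlier comments are already loaded):

from pprint import pprint

pprint(get_column_distributions(small))
pprint(get_column_distributions(large))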
[Screenshots: per-column statistics for the small and large tables]

Would it be easier if I attached the tables here?

@zanmato1984 (Contributor)

Would it be easier if I attached the tables here?

@kolfild26 yeah please, that's even more useful.

@zanmato1984 (Contributor)

  1. The join cardinality, or equivalently, the number of rows of the (left/right) join result.

Also, do you have this one?

@kolfild26 (Author)

Cardinality can refer to different things.
In a database context, cardinality usually refers to the number of unique values in a column relative to the total number of rows in the table.
So, if we are both talking about the same thing, the cardinality is presented in the report above: cardinality_percentage = (unique_count / total_rows) * 100

As mentioned before, I'd better provide the sources. I didn't find a better way than Google Drive.

https://drive.google.com/file/d/1nRWTnanI3gWuumVfZHl4nvjUXwwq5qDy
https://drive.google.com/file/d/1sbfa-i3OL5_Wr-qmBtbKzWrpfUMGvWpV

Files in the zip archives are pickled, so to restore them:

import pickle

with open('large.pkl', 'rb') as f:
    large_table = pickle.load(f)

@zanmato1984 (Contributor)

Cardinality can refer to different things. In a database context, cardinality usually refers to the number of unique values in a column relative to the total number of rows in the table. So, if we are both talking about the same thing, the cardinality is presented in the report above: cardinality_percentage = (unique_count / total_rows) * 100

But "cardinality" can also represent the size of the join result which is what I originally asked about. Do you have that? (You can just run the right join and count the number of rows).

And thank you for the source files. I'll try to reproduce the issue using these files locally.

@zanmato1984 (Contributor)

Hi @kolfild26 , I've successfully run the case locally (M1 MBP with 32 GB memory, Arrow 18.1.0) but didn't reproduce the issue.

My python script:

import pandas
import pickle
import pyarrow

def main():
    print("pandas: {0}, pyarrow: {1}".format(pandas.__version__, pyarrow.__version__))
    with open('small.pkl', 'rb') as f: small = pickle.load(f)
    with open('large.pkl', 'rb') as f: large = pickle.load(f)
    print("small size: {0}, large size: {1}".format(small.num_rows, large.num_rows))
    join = small.join(large, keys=['ID_DEV_STYLECOLOR_SIZE', 'ID_DEPARTMENT', 'ID_COLLECTION'], join_type='left outer')
    print("join size: {0}".format(join.num_rows))

if __name__ == "__main__":
    main()

Result:

python test.py
pandas: 2.2.3, pyarrow: 18.1.0
small size: 18201475, large size: 360449051
join size: 18201475

Did I miss something?

@kolfild26 (Author)

The resulting join size looks correct.
Could you please check:

  1. Apply the filter ID_DEV_STYLECOLOR_SIZE = 88506230299 and ID_DEPARTMENT = 16556030299. It should return 2 in the PL_VALUE column.
  2. Apply sum(PL_VALUE); it should return 58360744.

That's just to eliminate a 'false positive'. I mentioned that I tested on different versions and it sometimes produced a silently wrong answer even though there was no segfault.

If all of the above is correct, might the segfault be caused by some system/OS settings?

my setup
    Oracle Linux Server 7.8

    ulimit -a
    core file size (blocks, -c) 0
    data seg size (kbytes, -d) unlimited
    scheduling priority (-e) 0
    file size (blocks, -f) unlimited
    pending signals (-i) 16511255
    max locked memory (kbytes, -l) unlimited
    max memory size (kbytes, -m) unlimited
    open files (-n) 4096
    pipe size (512 bytes, -p) 8
    POSIX message queues (bytes, -q) 819200
    real-time priority (-r) 0
    stack size (kbytes, -s) unlimited
    cpu time (seconds, -t) unlimited
    max user processes (-u) 4096
    virtual memory (kbytes, -v) unlimited
    file locks (-x) unlimited

    Python 3.10.15

    import pyarrow as pa
    pa.__version__
    '18.1.0'

@zanmato1984 (Contributor)

  1. Apply the filter ID_DEV_STYLECOLOR_SIZE = 88506230299 and ID_DEPARTMENT = 16556030299. It should return 2 in the PL_VALUE column.

Correct:

>>> cond = pc.and_(pc.equal(large['ID_DEV_STYLECOLOR_SIZE'], 88506230299), pc.equal(large['ID_DEPARTMENT'], 16556030299))
>>> filtered = large.filter(cond)
>>> print(filtered)
pyarrow.Table
ID_DEV_STYLECOLOR_SIZE: int64
ID_DEPARTMENT: int64
ID_COLLECTION: int64
PL_VALUE: int64
----
ID_DEV_STYLECOLOR_SIZE: [[88506230299]]
ID_DEPARTMENT: [[16556030299]]
ID_COLLECTION: [[11240299]]
PL_VALUE: [[2]]
  2. Apply sum(PL_VALUE); it should return 58360744.

No:

>>> sum = pc.sum(large['PL_VALUE'])
>>> print(sum)
461379027

That's just to eliminate a 'false positive'. I mentioned that I tested on different versions and it sometimes produced a silently wrong answer even though there was no segfault.

Hmm, I think we should focus only on v18.1.0. As I mentioned, there have been a lot of fixes along the way, so the behavior of prior versions will certainly vary, and I think most of those issues (if not all) have already been addressed.

If all of the above is correct, might the segfault be caused by some system/OS settings?

I also verified on my Intel MBP (I just realized that we have an x86-specialized SIMD code path for hash join, so I wanted to see if the issue was there), but I was still unable to reproduce it. And your setup doesn't seem to have anything in particular to do with this issue.

To proceed with the debugging:

  1. Did you run my Python script in your environment to see if it runs into the segfault? (And in case it doesn't, would you kindly help adjust it so the segfault happens?) I think this is quite essential, because we need to agree on a minimal reproducible case (at least on one of our environments). Then I can ask some other people to help verify on a broader set of environments.
  2. Would you help confirm the difference in sum(PL_VALUE) between my run (461379027) and yours (58360744)?
  3. What is your CPU model?
  4. In your original segfault run (again, on v18.1.0), is it always reproducible, or only by chance?

Debugging this kind of issue is tricky and takes time and communication. I really appreciate your patience @kolfild26 , thank you!

@kolfild26 (Author)

2️⃣ I meant filter() and sum() to be applied to the resulting table, i.e. join, whereas you applied them to large.
3️⃣ Intel(R) Xeon(R) Gold 6246 CPU @ 3.30GHz, 4 sockets * 12 cores = 48 logical CPUs.
1️⃣ 4️⃣ Yes, the segfault always occurs, given the fixed sizes of the input tables. All the recent tests I refer to are on v18.1.0.

@zanmato1984 (Contributor)

zanmato1984 commented Dec 20, 2024

I can now confirm that the problem does exist.

By applying the filter and sum on the join result, I found that my previous non-segfault runs were false positives:

    join = small.join(large, keys=['ID_DEV_STYLECOLOR_SIZE', 'ID_DEPARTMENT', 'ID_COLLECTION'], join_type='left outer')
    print("join size: {0}".format(join.num_rows))
    cond = pc.and_(pc.equal(join['ID_DEV_STYLECOLOR_SIZE'], 88506230299), pc.equal(join['ID_DEPARTMENT'], 16556030299))
    filtered = join.filter(cond)
    print("filtered")
    print(filtered)
    sum = pc.sum(join['PL_VALUE'])
    print("sum")
    print(sum)

Result:

filtered: PL_VALUE: [[null]]
...
sum: 33609597 # Another run emits 33609997

A side note: if I use a right join (and switch the small/large sides), the filter/sum results are as expected.

And I also happen to have access to an x86 Ubuntu desktop, on which I reproduced the segfault.

I'm now digging into it.

Also, considering the silent wrong answers on some platforms, I'm marking this issue as critical.

Thanks a lot @kolfild26 for helping me reproduce the issue!

zanmato1984 added a commit that referenced this issue Jan 13, 2025 (#45108)

### Rationale for this change

#44513 triggers two distinct overflow issues within the swiss join, both happening when the build-side table contains a large enough number of rows or distinct keys. (Hash join build sides at this scale are rather rare, so we haven't seen them reported until now):

1. The first issue is that our swiss table implementation takes the higher `N` bits of the 32-bit hash value as the index into a buffer storing "block"s (a block contains `8` key ids, in some code also referred to as "group" ids). This `N`-bit number is further multiplied by the size of a block, which is also related to `N`. The `N` in the case of #44513 is `26` and a block takes `40` bytes, so the multiplication can produce a number over `1 << 31` (negative when interpreted as a signed 32-bit integer). In our AVX2 specialization of accessing the block buffer https://github.com/apache/arrow/blob/0a00e25f2f6fb927fb555b69038d0be9b9d9f265/cpp/src/arrow/compute/key_map_internal_avx2.cc#L404 , an issue like the one in #41813 (comment) shows up. This is the issue that directly produced the segfault in #44513.
2. The other issue is that we take the `7` bits of the 32-bit hash value after the `N` bits as a "stamp" (to quickly fail the hash comparison). But when `N` is greater than `25`, arithmetic code like https://github.com/apache/arrow/blob/0a00e25f2f6fb927fb555b69038d0be9b9d9f265/cpp/src/arrow/compute/key_map_internal.cc#L397 (`bits_hash_` is `constexpr 32`, `log_blocks_` is `N`, `bits_stamp_` is `constexpr 7`; this retrieves the stamp from a hash) produces `hash >> -1`, aka `hash >> 0xFFFFFFFF`, aka `hash >> 31` (the leading `1`s are trimmed). The stamp value is then wrong and results in falsely mismatched rows. This is the reason for my false-positive run in #44513 (comment) .

### What changes are included in this PR?

For issue 1, use a 64-bit index gather intrinsic to avoid the offset overflow.

For issue 2, do not right-shift the hash if `N + 7 >= 32`. This effectively allows the bits to overlap between the block id (the `N` bits) and the stamp (the `7` bits). Though this may introduce more false-positive hash comparisons (and thus worsen performance), I think it is still more reasonable than brutally failing for `N > 25`. I introduce two members, `bits_shift_for_block_and_stamp_` and `bits_shift_for_block_`, which are derived from `log_blocks_` - in particular, set to `0` and `32 - N` when `N + 7 >= 32` - to avoid branching like `if (log_blocks_ + bits_stamp_ > bits_hash_)` in tight loops.

### Are these changes tested?

The fix is manually tested with the original case locally. (I do have a concrete C++ UT that verifies the fix, but it requires too many resources and runs for too long, so it is impractical to run in any reasonable CI environment.)

### Are there any user-facing changes?

None.

* GitHub Issue: #44513

Lead-authored-by: Rossi Sun <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Rossi Sun <[email protected]>
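
As an illustration of the two overflows described in the rationale above, the arithmetic can be checked with plain Python (a sketch using the constants quoted in the PR description; the variable names only mirror log_blocks_, the block size, and the stamp width, and are not taken from the actual C++ code):

N = 26            # log_blocks_ in the reported case
BLOCK_BYTES = 40  # size of one block, as quoted in the PR description
BITS_HASH = 32    # bits_hash_
BITS_STAMP = 7    # bits_stamp_

# Issue 1: the byte offset into the block buffer no longer fits in a signed 32-bit int.
max_offset = ((1 << N) - 1) * BLOCK_BYTES
print(max_offset, max_offset > (1 << 31) - 1)   # 2684354520 True

# Issue 2: the shift used to extract the 7-bit stamp becomes negative for N > 25;
# reinterpreted as an unsigned 32-bit shift count it behaves like a shift by 31.
shift = BITS_HASH - N - BITS_STAMP
print(shift)                                    # -1
print((shift & 0xFFFFFFFF) & 31)                # 31, the effective x86 shift amount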
@zanmato1984 zanmato1984 added this to the 20.0.0 milestone Jan 13, 2025
@zanmato1984 (Contributor)

Issue resolved by pull request #45108

@ivakru

ivakru commented Jan 17, 2025

Was facing a similar issue with small.join(big, join_type="left outer"), which was resolved with big.join(small, join_type="right outer"). Why was this the first suggestion to try to address the issue here? I would have assumed that these produce the same results.
Thanks :)

@zanmato1984 (Contributor)

zanmato1984 commented Jan 17, 2025

Was facing a similar issue with small.join(big, join_type="left outer"), which was resolved with big.join(small, join_type="right outer"). Why was this the first suggestion to try to address the issue here? I would have assumed that these produce the same results. Thanks :)

Hi @ivakru , this was merely a workaround to avoid a bug that only appears when the build-side table is too big. It shouldn't be considered as "addressing the issue".

I've come up with a fix for the underlying bug: #45108.

Thank you.

@ivakru

ivakru commented Jan 17, 2025

Thank you so much @zanmato1984!
I was trying to understand the "build side" part that you are referring to, since my limited understanding would have me believe that the two joins are equivalent.
I did not mean to bring back a dead issue. Will read into this separately.
Many thanks! <3

@zanmato1984 (Contributor)

zanmato1984 commented Jan 17, 2025

Ah, no worries, none taken :)

I can elaborate a bit on the implementation details. The join operation is implemented with the "hash join" algorithm: 1) "build" a hash table from the table on one side of the join (the "build side"); 2) look up the hash table for each row of the table on the other side (the "probe side"). We always choose the right side as the build side, that is, big in small.join(big, "left outer"), or small in big.join(small, "right outer"). Using a big table to build the hash table is inefficient, and our implementation is error-prone there, so switching the sides (and the join type) works around the issue. And of course, the result is identical to the original join - otherwise it wouldn't be a valid workaround :)
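
To make the equivalence concrete, here is a tiny sketch with made-up data (not from this issue) showing that swapping the sides together with the join type yields the same rows, only with a possibly different row/column order:

import pyarrow as pa

small_t = pa.table({'k': [1, 2, 3], 'a': ['x', 'y', 'z']})
big_t = pa.table({'k': [2, 3, 3, 4], 'b': [20, 30, 31, 40]})

left = small_t.join(big_t, keys='k', join_type='left outer')
right = big_t.join(small_t, keys='k', join_type='right outer')

# Same multiset of rows; sort and reorder columns before comparing.
order = [('k', 'ascending'), ('b', 'ascending')]
print(left.sort_by(order).to_pydict())
print(right.select(['k', 'a', 'b']).sort_by(order).to_pydict())
# Both print: {'k': [1, 2, 3, 3], 'a': ['x', 'y', 'z', 'z'], 'b': [None, 20, 30, 31]}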

@ivakru

ivakru commented Jan 17, 2025

Thank you so much for the explanation!

This makes sense, and I am already seeing memory and runtime savings when dealing with large tables.

Kind regards.

@kolfild26 (Author)

@zanmato1984
Is this PR included in v19.0.0, so that it could be tested there?

@zanmato1984 (Contributor)

No, it's in 20.0.0, which will come out in the next three months. You can test it on a nightly build, though.
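
For anyone wanting to try this before 20.0.0, a hedged sketch of checking a nightly install (the index URL follows the Arrow install docs at the time of writing; verify it against the current documentation):

# Install a nightly pyarrow wheel first (command per the Arrow install docs;
# double-check the index URL against the current documentation):
#   pip install --extra-index-url https://pypi.fury.io/arrow-nightlies/ \
#       --prefer-binary --pre pyarrow
import pyarrow as pa

# A nightly that contains the fix should report a .dev version string.
print(pa.__version__)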

@zanmato1984 (Contributor)

Hi @amoeba , shall we add this issue to the 19.0.1 release? Thanks.

@amoeba amoeba added the Critical Fix Bugfixes for security vulnerabilities, crashes, or invalid data. label Jan 24, 2025
@amoeba amoeba modified the milestones: 20.0.0, 19.0.1 Jan 24, 2025
@amoeba (Member)

amoeba commented Jan 24, 2025

Just added it, thanks. I also added the Critical Fix label to the issue here to match the PR (we prefer to label issues). Thanks for bringing it up.

amoeba pushed a commit that referenced this issue Jan 31, 2025 (#45108)
amoeba pushed a commit that referenced this issue Jan 31, 2025 (#45108)