
Fix collect_set_on_nested_type tests failed #8783

Merged — 6 commits merged into NVIDIA:branch-23.08 on Jul 28, 2023

Conversation

thirtiseven
Collaborator

Fixes #8716

The collect_set function currently does not support NaNs in struct[Array(Double)] or struct[Array(Float)] types: it treats NaNs as not equal to each other, see #6079.

However, the current integration tests can generate such data by chance, leading to test failures.

This PR prevents datagen from generating NaNs when testing `collect_set` on nested types.
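The idea behind the fix can be sketched roughly as follows. Note that `gen_float_no_nans` is a hypothetical stand-in for the integration tests' no-NaN float data generator, not the actual spark-rapids datagen code:

```python
import random

# Illustrative sketch: draw floats from a sampler that never emits NaN,
# so nested collect_set inputs stay NaN-free during testing.
# This is a stand-in for the real datagen, not the plugin's test code.
def gen_float_no_nans(rng):
    while True:
        v = rng.uniform(-1e9, 1e9)
        if v == v:  # NaN is the only float for which v != v
            return v

rng = random.Random(42)
values = [gen_float_no_nans(rng) for _ in range(1000)]
assert all(v == v for v in values)  # no NaNs generated
```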

@thirtiseven thirtiseven requested a review from ttnghia July 24, 2023 09:03
@thirtiseven thirtiseven self-assigned this Jul 24, 2023
@thirtiseven
Collaborator Author

build

@ttnghia
Collaborator

ttnghia commented Jul 24, 2023

What will happen if we feed in struct[Array(Double)] or struct[Array(Float)] types? Will it fall back to the CPU, or something else?

Contributor

@jlowe jlowe left a comment


This doesn't look like the proper fix to resolve the test failure. The test is failing because we don't match the CPU for some corner cases. The fix isn't to ignore those corner cases in the test but rather to correct the code so those corner cases can pass the test. If we cannot fix the corner cases in a timely manner, then we need to fall back to the CPU for those corner cases. In this case, that means falling back to the CPU if the nested types contain floats or doubles anywhere in the type tree.
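The fallback condition described here (a float or double anywhere in the type tree) can be sketched as a recursive check. `TypeNode` below is a hypothetical stand-in for Spark's `DataType` hierarchy, not the actual plugin code:

```python
from dataclasses import dataclass, field

# Minimal model of a nested SQL type tree; a stand-in for Spark's DataType.
@dataclass
class TypeNode:
    name: str                      # e.g. "struct", "array", "double", "int"
    children: list = field(default_factory=list)

def contains_floats(t: TypeNode) -> bool:
    """Return True if a float/double appears anywhere in the type tree."""
    if t.name in ("float", "double"):
        return True
    return any(contains_floats(c) for c in t.children)

# struct<array<double>> should trigger a CPU fallback
nested = TypeNode("struct", [TypeNode("array", [TypeNode("double")])])
assert contains_floats(nested)
assert not contains_floats(TypeNode("array", [TypeNode("int")]))
```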

@thirtiseven
Collaborator Author

This doesn't look like the proper fix to resolve the test failure. The test is failing because we don't match the CPU for some corner cases.

OK, thanks. I didn't notice that the hasNans config from the original PR had been removed, so it really is a bug now.

As this comment said, for non-nested floats and doubles, NaN values are considered unequal, but when collecting sets of nested arrays, the CPU treats NaNs as equal.

This incompatibility was controlled by spark.rapids.sql.hasNans, but we removed this config later.

What will happen if we feed in struct[Array(Double)] or struct[Array(Float)] types? Will it fall back to the CPU, or something else?

So right now collect_set produces different results from the CPU, without any fallback.

I haven't found out why Spark behaves like this yet; the Spark code for collect_set looks like it simply adds data into a HashSet and then converts it to an Array. I guess it is due to some special handling of NaNs in Spark. If it is complicated to follow or needs cuDF support, I will fall back to the CPU in this case and file another issue.
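The two comparison behaviors in play (NaN unequal to itself under IEEE 754 comparison, versus set semantics that treat identical NaNs as equal) can be illustrated in plain Python. This is a sketch of the semantics only, not Spark's actual code path:

```python
import struct

a, b = float("nan"), float("nan")

# IEEE 754 comparison: NaN is never equal to anything, not even itself.
assert a != b and a != a

# A bitwise comparison treats NaNs with identical payloads as equal, which
# is how set-style deduplication can collapse NaNs in nested data.
def bits(x):
    return struct.pack(">d", x)

assert bits(a) == bits(b)
```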

@thirtiseven
Collaborator Author

Some test results:

```
>>> from pyspark.sql.functions import collect_set
>>> df1 = spark.createDataFrame([(1.0,), (float("nan"),), (float("nan"),)], ["value"])
>>> df1.agg(collect_set("value")).show(truncate=False)
23/07/25 18:07:49 WARN GpuOverrides:
*Exec <ProjectExec> will run on GPU
  *Expression <Alias> cast(collect_set(value)#4 as string) AS collect_set(value)#7 will run on GPU
    *Expression <Cast> cast(collect_set(value)#4 as string) will run on GPU
  *Exec <ObjectHashAggregateExec> will run on GPU
    *Expression <AggregateExpression> collect_set(value#0, 0, 0) will run on GPU
      *Expression <CollectSet> collect_set(value#0, 0, 0) will run on GPU
    *Expression <Alias> collect_set(value#0, 0, 0)#2 AS collect_set(value)#4 will run on GPU
    *Exec <ShuffleExchangeExec> will run on GPU
      *Partitioning <SinglePartition$> will run on GPU
      *Exec <ObjectHashAggregateExec> will run on GPU. The data type of following expressions will be converted in GPU runtime: buf#10: Converted BinaryType to ArrayType(DoubleType,false)
        *Expression <AggregateExpression> partial_collect_set(value#0, 0, 0) will run on GPU
          *Expression <CollectSet> collect_set(value#0, 0, 0) will run on GPU
        ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec
          @Expression <AttributeReference> value#0 could run on GPU

+------------------+
|collect_set(value)|
+------------------+
|[NaN, NaN, 1.0]   |
+------------------+

>>> df2 = spark.createDataFrame([([1.0, float("nan")],), ([1.0, float("nan")],), ([1.0, 2.0],)], ["value"])
>>> df2.agg(collect_set("value")).show(truncate=False)
23/07/25 18:07:57 WARN GpuOverrides:
*Exec <ProjectExec> will run on GPU
  *Expression <Alias> cast(collect_set(value)#28 as string) AS collect_set(value)#31 will run on GPU
    *Expression <Cast> cast(collect_set(value)#28 as string) will run on GPU
  *Exec <ObjectHashAggregateExec> will run on GPU
    *Expression <AggregateExpression> collect_set(value#24, 0, 0) will run on GPU
      *Expression <CollectSet> collect_set(value#24, 0, 0) will run on GPU
    *Expression <Alias> collect_set(value#24, 0, 0)#26 AS collect_set(value)#28 will run on GPU
    *Exec <ShuffleExchangeExec> will run on GPU
      *Partitioning <SinglePartition$> will run on GPU
      *Exec <ObjectHashAggregateExec> will run on GPU. The data type of following expressions will be converted in GPU runtime: buf#34: Converted BinaryType to ArrayType(ArrayType(DoubleType,true),false)
        *Expression <AggregateExpression> partial_collect_set(value#24, 0, 0) will run on GPU
          *Expression <CollectSet> collect_set(value#24, 0, 0) will run on GPU
        ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec
          @Expression <AttributeReference> value#24 could run on GPU

+------------------------------------+
|collect_set(value)                  |
+------------------------------------+
|[[1.0, NaN], [1.0, 2.0], [1.0, NaN]]|
+------------------------------------+

>>> spark.conf.set("spark.rapids.sql.enabled", "false")
>>> df1.agg(collect_set("value")).show(truncate=False)
+------------------+
|collect_set(value)|
+------------------+
|[1.0, NaN, NaN]   |
+------------------+

>>> df2.agg(collect_set("value")).show(truncate=False)
+------------------------+
|collect_set(value)      |
+------------------------+
|[[1.0, 2.0], [1.0, NaN]]|
+------------------------+
```
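The CPU results above can be imitated in plain Python: deduplicating nested arrays while treating NaN as equal to NaN yields two rows instead of three. This is an illustrative model of the observed semantics, not Spark's implementation:

```python
import math

def dedup_arrays(rows):
    # Build hashable keys where every NaN maps to one canonical token,
    # imitating NaN-equal set semantics over nested arrays.
    seen, out = set(), []
    for arr in rows:
        key = tuple("NaN" if math.isnan(v) else v for v in arr)
        if key not in seen:
            seen.add(key)
            out.append(arr)
    return out

rows = [[1.0, float("nan")], [1.0, float("nan")], [1.0, 2.0]]
assert len(dedup_arrays(rows)) == 2  # [1.0, NaN] is kept only once
```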

@sameerz sameerz added the bug Something isn't working label Jul 25, 2023
@thirtiseven
Collaborator Author

Updated the code to fall back to the CPU if there is a double/float anywhere in the nested type, and filed issue #8808.

```python
@allow_non_gpu('ObjectHashAggregateExec', 'ShuffleExchangeExec', 'CollectSet')
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_floats, ids=idfn)
def test_hash_groupby_collect_set_fallback_on_nested_floats(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
```
Collaborator


Please use assert_gpu_fallback_collect instead. It will help verify that we actually fell back on what we expect.

Collaborator Author


Ok, done.

```python
@allow_non_gpu('ObjectHashAggregateExec', 'ShuffleExchangeExec', 'CollectSet')
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_floats, ids=idfn)
def test_hash_reduction_collect_set_fallback_on_nested_floats(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
```
Collaborator


Same comment here. If the test is for a fallback, we should use the fallback verification API.

```python
('c_struct_array_2', RepeatSeqGen(StructGen([
    ['c0', struct_array_gen_no_nans], ['c1', int_gen]]), length=14)),
('c_array_struct', RepeatSeqGen(ArrayGen(all_basic_struct_gen_no_nan), length=15)),
    ['c0', struct_array_gen_no_floats], ['c1', int_gen]]), length=14)),
```
Collaborator


nit: Can we verify that we fall back for window operations too?

Collaborator Author


Added.

@thirtiseven
Collaborator Author

build

Signed-off-by: Haoyang Li <[email protected]>
@thirtiseven
Collaborator Author

build

@jlowe jlowe merged commit dc3d67e into NVIDIA:branch-23.08 Jul 28, 2023
thirtiseven added a commit to thirtiseven/spark-rapids that referenced this pull request Aug 2, 2023
thirtiseven added a commit to thirtiseven/spark-rapids that referenced this pull request Aug 2, 2023
jlowe pushed a commit that referenced this pull request Aug 9, 2023
* Match Spark's NaN handling in collect_set

Signed-off-by: Haoyang Li <[email protected]>

* Revert "Fix collect_set_on_nested_type tests failed (#8783)"

* clean up and add comments

* remove xfail

* update cudfmergesets too

* edit comment

* remove related nan datagens

* clean up

* clean up


Signed-off-by: Haoyang Li <[email protected]>
@thirtiseven thirtiseven deleted the collect_set_fix branch August 18, 2023 02:43
Labels
bug Something isn't working

Successfully merging this pull request may close these issues.

[BUG] test_hash_groupby_collect_set_on_nested_type and test_hash_reduction_collect_set_on_nested_type failed
5 participants