Fix collect_set_on_nested_type tests failed #8783
Conversation
Signed-off-by: Haoyang Li <[email protected]>
build
What will happen if we feed in struct[Array(Double)] or struct[Array(Float)] types? Will it fall back to the CPU, or something else?
This doesn't look like the proper fix for the test failure. The test is failing because we don't match the CPU for some corner cases. The fix isn't to ignore those corner cases in the test, but rather to correct the code so those corner cases pass the test. If we cannot fix the corner cases in a timely manner, then we need to fall back to the CPU for those corner cases. In this case, that means falling back to the CPU if the nested types contain floats or doubles anywhere in the type tree.
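For reference, here is a minimal sketch of such a type-tree check, written in Python against pyspark.sql.types purely for illustration (the actual plugin check lives in the Scala code, and `contains_float_or_double` is a hypothetical name):

```python
# Illustrative only: recursively walk a Spark type tree looking for
# FloatType/DoubleType; the real spark-rapids check is implemented in Scala.
from pyspark.sql.types import (ArrayType, DataType, DoubleType, FloatType,
                               MapType, StructType)

def contains_float_or_double(dt: DataType) -> bool:
    """Return True if a float or double appears anywhere in the type tree."""
    if isinstance(dt, (FloatType, DoubleType)):
        return True
    if isinstance(dt, ArrayType):
        return contains_float_or_double(dt.elementType)
    if isinstance(dt, MapType):
        return (contains_float_or_double(dt.keyType)
                or contains_float_or_double(dt.valueType))
    if isinstance(dt, StructType):
        return any(contains_float_or_double(f.dataType) for f in dt.fields)
    return False
```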
Ok, thanks. I didn't notice that the hasNans config from the original PR has been removed, so it really is a bug now. As this comment said, for the non-nested versions of floats and doubles, NaN values are considered unequal, but when collecting sets of the nested array versions, NaNs are considered equal on the CPU. This incompatibility was previously controlled by the hasNans config.
I haven't found out why Spark behaves like this yet; the Spark code of …
Some test results:

```
>>> from pyspark.sql.functions import collect_set
>>> df1 = spark.createDataFrame([(1.0,), (float("nan"),), (float("nan"),)], ["value"])
>>> df1.agg(collect_set("value")).show(truncate=False)
23/07/25 18:07:49 WARN GpuOverrides:
*Exec <ProjectExec> will run on GPU
*Expression <Alias> cast(collect_set(value)#4 as string) AS collect_set(value)#7 will run on GPU
*Expression <Cast> cast(collect_set(value)#4 as string) will run on GPU
*Exec <ObjectHashAggregateExec> will run on GPU
*Expression <AggregateExpression> collect_set(value#0, 0, 0) will run on GPU
*Expression <CollectSet> collect_set(value#0, 0, 0) will run on GPU
*Expression <Alias> collect_set(value#0, 0, 0)#2 AS collect_set(value)#4 will run on GPU
*Exec <ShuffleExchangeExec> will run on GPU
*Partitioning <SinglePartition$> will run on GPU
*Exec <ObjectHashAggregateExec> will run on GPU. The data type of following expressions will be converted in GPU runtime: buf#10: Converted BinaryType to ArrayType(DoubleType,false)
*Expression <AggregateExpression> partial_collect_set(value#0, 0, 0) will run on GPU
*Expression <CollectSet> collect_set(value#0, 0, 0) will run on GPU
! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec
@Expression <AttributeReference> value#0 could run on GPU
+------------------+
|collect_set(value)|
+------------------+
|[NaN, NaN, 1.0] |
+------------------+
>>> df2 = spark.createDataFrame([([1.0, float("nan")],), ([1.0, float("nan")],), ([1.0, 2.0],)], ["value"])
>>> df2.agg(collect_set("value")).show(truncate=False)
23/07/25 18:07:57 WARN GpuOverrides:
*Exec <ProjectExec> will run on GPU
*Expression <Alias> cast(collect_set(value)#28 as string) AS collect_set(value)#31 will run on GPU
*Expression <Cast> cast(collect_set(value)#28 as string) will run on GPU
*Exec <ObjectHashAggregateExec> will run on GPU
*Expression <AggregateExpression> collect_set(value#24, 0, 0) will run on GPU
*Expression <CollectSet> collect_set(value#24, 0, 0) will run on GPU
*Expression <Alias> collect_set(value#24, 0, 0)#26 AS collect_set(value)#28 will run on GPU
*Exec <ShuffleExchangeExec> will run on GPU
*Partitioning <SinglePartition$> will run on GPU
*Exec <ObjectHashAggregateExec> will run on GPU. The data type of following expressions will be converted in GPU runtime: buf#34: Converted BinaryType to ArrayType(ArrayType(DoubleType,true),false)
*Expression <AggregateExpression> partial_collect_set(value#24, 0, 0) will run on GPU
*Expression <CollectSet> collect_set(value#24, 0, 0) will run on GPU
! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec
@Expression <AttributeReference> value#24 could run on GPU
+------------------------------------+
|collect_set(value) |
+------------------------------------+
|[[1.0, NaN], [1.0, 2.0], [1.0, NaN]]|
+------------------------------------+
>>> spark.conf.set("spark.rapids.sql.enabled", "false")
>>> df1.agg(collect_set("value")).show(truncate=False)
+------------------+
|collect_set(value)|
+------------------+
|[1.0, NaN, NaN] |
+------------------+
>>> df2.agg(collect_set("value")).show(truncate=False)
+------------------------+
|collect_set(value) |
+------------------------+
|[[1.0, 2.0], [1.0, NaN]]|
+------------------------+
```
Updated the code to fall back to the CPU if there is a double/float in a nested type, and filed issue #8808.
```python
@allow_non_gpu('ObjectHashAggregateExec', 'ShuffleExchangeExec', 'CollectSet')
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_floats, ids=idfn)
def test_hash_groupby_collect_set_fallback_on_nested_floats(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
```
Please use assert_gpu_fallback_collect instead. It will help to verify that we actually fell back where we expect.
Ok, done.
```python
@allow_non_gpu('ObjectHashAggregateExec', 'ShuffleExchangeExec', 'CollectSet')
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_floats, ids=idfn)
def test_hash_reduction_collect_set_fallback_on_nested_floats(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
```
Same comment here. If the test is for a fallback, we should use the fallback verification API.
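For illustration, a sketch of what the suggested change could look like, assuming the spark-rapids test helpers assert_gpu_fallback_collect, gen_df, allow_non_gpu, and idfn, with pyspark.sql.functions imported as f as the suite does; the exact aggregation may differ from the final PR:

```python
# Hedged sketch: same test shape as above, but using the fallback
# verification API so we also assert that CollectSet ran on the CPU.
@allow_non_gpu('ObjectHashAggregateExec', 'ShuffleExchangeExec', 'CollectSet')
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_floats, ids=idfn)
def test_hash_reduction_collect_set_fallback_on_nested_floats(data_gen):
    assert_gpu_fallback_collect(
        lambda spark: gen_df(spark, data_gen, length=100)
            .agg(f.collect_set('b').alias('collect_set')),
        'CollectSet')
```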
```python
('c_struct_array_2', RepeatSeqGen(StructGen([
    ['c0', struct_array_gen_no_nans], ['c1', int_gen]]), length=14)),
('c_array_struct', RepeatSeqGen(ArrayGen(all_basic_struct_gen_no_nan), length=15)),
    ['c0', struct_array_gen_no_floats], ['c1', int_gen]]), length=14)),
```
nit: Can we verify that we fall back for window operations too?
Added.
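A hedged sketch of what such a window fallback test might look like, under the same helper assumptions as above; the allowed execs and the window query are illustrative, not necessarily what the PR added:

```python
# Illustrative sketch: collect_set as a window function should also fall
# back to the CPU when nested floats/doubles are involved.
@allow_non_gpu('WindowExec', 'CollectSet', 'ShuffleExchangeExec')
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_floats, ids=idfn)
def test_window_collect_set_fallback_on_nested_floats(data_gen):
    assert_gpu_fallback_collect(
        lambda spark: gen_df(spark, data_gen, length=100)
            .selectExpr('a', 'collect_set(b) over (partition by a) as collected'),
        'CollectSet')
```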
build
build
* Match Spark's NaN handling in collect_set
* Revert "Fix collect_set_on_nested_type tests failed (#8783)"
* clean up and add comments
* remove xfail
* update cudfmergesets too
* edit comment
* remove related nan datagens
* clean up
* clean up

Signed-off-by: Haoyang Li <[email protected]>
Fixes #8716
The collect_set function currently does not support NaNs in struct[Array(Double)] or struct[Array(Float)] types: it treats NaNs as unequal to each other (see #6079). But the current integration test will generate such data if we have bad luck, leading to test failures.
This PR prevents datagen from generating NaNs when testing `collect_set` on nested types.
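As a sketch, the NaN-free generators could look roughly like this, assuming the spark-rapids data_gen helpers, where FloatGen/DoubleGen take a no_nans flag; the exact generator definitions in the PR may differ:

```python
# Illustrative: build nested generators whose float/double children never
# produce NaN, so CPU/GPU set semantics cannot diverge on NaN equality.
float_gen_no_nans = FloatGen(no_nans=True)
double_gen_no_nans = DoubleGen(no_nans=True)
struct_array_gen_no_nans = StructGen([
    ['child0', ArrayGen(float_gen_no_nans)],
    ['child1', ArrayGen(double_gen_no_nans)]])
```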