diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 8ef7b5d61af..d08295d0e3e 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -16595,9 +16595,9 @@ are limited. S NS NS -PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
+PS
Arrays containing floats/doubles will not be supported.;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
NS -PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
+PS
Structs containing float/double will not be supported.;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
NS @@ -16638,9 +16638,9 @@ are limited. S NS NS -PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
+PS
Arrays containing floats/doubles will not be supported.;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
NS -PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
+PS
Structs containing float/double will not be supported.;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
NS @@ -16681,9 +16681,9 @@ are limited. S NS NS -PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
+PS
Arrays containing floats/doubles will not be supported.;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
NS -PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
+PS
Structs containing float/double will not be supported.;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, MAP, UDT
NS diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index fd4297cb952..c03c83ffcc0 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -1013,6 +1013,9 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False): all_basic_gens_no_nan = [byte_gen, short_gen, int_gen, long_gen, FloatGen(no_nans=True), DoubleGen(no_nans=True), string_gen, boolean_gen, date_gen, timestamp_gen, null_gen] +all_basic_gens_no_floats = [byte_gen, short_gen, int_gen, long_gen, + string_gen, boolean_gen, date_gen, timestamp_gen, null_gen] + # Many Spark versions have issues sorting large decimals, # see https://issues.apache.org/jira/browse/SPARK-40089. orderable_decimal_gen_128bit = decimal_gen_128bit @@ -1045,6 +1048,8 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False): single_level_array_gens_no_nan = [ArrayGen(sub_gen) for sub_gen in all_basic_gens_no_nan + decimal_gens] +single_level_array_gens_no_floats = [ArrayGen(sub_gen) for sub_gen in all_basic_gens_no_floats + decimal_gens] + single_level_array_gens_no_decimal = [ArrayGen(sub_gen) for sub_gen in all_basic_gens] map_string_string_gen = [MapGen(StringGen(pattern='key_[0-9]', nullable=False), StringGen())] @@ -1063,7 +1068,13 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False): all_basic_struct_gen_no_nan = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(all_basic_gens_no_nan)]) -struct_array_gen_no_nans = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(single_level_array_gens_no_nan)]) +all_basic_struct_gen_no_floats = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(all_basic_gens_no_floats)]) + +struct_array_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(single_level_array_gens)]) + +struct_array_gen_no_nans = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(single_level_array_gens_no_nan)]) + +struct_array_gen_no_floats = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(single_level_array_gens_no_floats)]) # Some struct gens, but not all because of nesting nonempty_struct_gens_sample = [all_basic_struct_gen, diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index ac96f02d01d..71dc1c319a0 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -603,27 +603,52 @@ def test_hash_pivot_groupby_duplicates_fallback(data_gen): ('b', value_gen)] for value_gen in _repeat_agg_column_for_collect_list_op] _repeat_agg_column_for_collect_set_op = [ - RepeatSeqGen(all_basic_struct_gen, length=15), + RepeatSeqGen(all_basic_struct_gen_no_floats, length=15), RepeatSeqGen(StructGen([ - ['c0', all_basic_struct_gen], ['c1', int_gen]]), length=15)] + ['c0', all_basic_struct_gen_no_floats], ['c1', int_gen]]), length=15)] + +struct_array_gen_no_nans = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(single_level_array_gens_no_nan)]) + +struct_array_gen_no_floats = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(single_level_array_gens_no_floats)]) # data generating for collect_set based-nested Struct[Array] types _repeat_agg_column_for_collect_set_op_nested = [ - RepeatSeqGen(struct_array_gen_no_nans, length=15), + RepeatSeqGen(struct_array_gen_no_floats, length=15), RepeatSeqGen(StructGen([ - ['c0', struct_array_gen_no_nans], ['c1', int_gen]]), length=15), - RepeatSeqGen(ArrayGen(all_basic_struct_gen_no_nan), length=15)] + ['c0', struct_array_gen_no_floats], ['c1', int_gen]]), length=15), + RepeatSeqGen(ArrayGen(all_basic_struct_gen_no_floats), length=15)] + +struct_array_gen_floats = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen + in enumerate([ArrayGen(double_gen), ArrayGen(float_gen)])]) -_array_of_array_gen = [RepeatSeqGen(ArrayGen(sub_gen), length=15) for sub_gen in single_level_array_gens_no_nan] +struct_gen_floats = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate([float_gen, double_gen])]) + +_repeat_agg_column_for_collect_set_op_nested_floats = [ + RepeatSeqGen(struct_array_gen_floats, length=15), + RepeatSeqGen(StructGen([ + ['c0', struct_array_gen_floats], ['c1', int_gen]]), length=15), + RepeatSeqGen(ArrayGen(struct_gen_floats), length=15), + RepeatSeqGen(ArrayGen(ArrayGen(float_gen)), length=15), + RepeatSeqGen(ArrayGen(ArrayGen(double_gen)), length=15)] + +_array_of_array_gen = [RepeatSeqGen(ArrayGen(sub_gen), length=15) for sub_gen in single_level_array_gens_no_floats] _gen_data_for_collect_set_op = [[ ('a', RepeatSeqGen(LongGen(), length=20)), ('b', value_gen)] for value_gen in _repeat_agg_column_for_collect_set_op] +_gen_data_for_collect_set_op_floats = [[ + ('a', RepeatSeqGen(LongGen(), length=20)), + ('b', RepeatSeqGen(struct_array_gen, length=15))]] + _gen_data_for_collect_set_op_nested = [[ ('a', RepeatSeqGen(LongGen(), length=20)), ('b', value_gen)] for value_gen in _repeat_agg_column_for_collect_set_op_nested + _array_of_array_gen] +_gen_data_for_collect_set_op_nested_floats = [[ + ('a', RepeatSeqGen(LongGen(), length=20)), + ('b', value_gen)] for value_gen in _repeat_agg_column_for_collect_set_op_nested_floats] + _all_basic_gens_with_all_nans_cases = all_basic_gens + [SetValuesGen(t, [math.nan, None]) for t in [FloatType(), DoubleType()]] # very simple test for just a count on decimals 128 values until we can support more with them @@ -688,13 +713,26 @@ def test_hash_groupby_collect_set(data_gen): @ignore_order(local=True) @pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op, ids=idfn) -@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8716') def test_hash_groupby_collect_set_on_nested_type(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark: gen_df(spark, data_gen, length=100) .groupby('a') .agg(f.sort_array(f.collect_set('b')))) +# Fall back to CPU if type is nested and contains floats or doubles in the type tree. +# Because NaNs in nested types are not supported yet. +# See https://github.com/NVIDIA/spark-rapids/issues/8808 +@ignore_order(local=True) +@allow_non_gpu('ObjectHashAggregateExec', 'ShuffleExchangeExec', 'CollectSet') +@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_floats + + _gen_data_for_collect_set_op_nested_floats, ids=idfn) +def test_hash_groupby_collect_set_fallback_on_nested_floats(data_gen): + assert_gpu_fallback_collect( + lambda spark: gen_df(spark, data_gen, length=100) + .groupby('a') + .agg(f.sort_array(f.collect_set('b'))), + 'CollectSet') + # Note, using sort_array() on the CPU, because sort_array() does not yet # support sorting certain nested/arbitrary types on the GPU @@ -731,12 +769,22 @@ def test_hash_reduction_collect_set(data_gen): @ignore_order(local=True) @pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op, ids=idfn) -@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8716') def test_hash_reduction_collect_set_on_nested_type(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark: gen_df(spark, data_gen, length=100) .agg(f.sort_array(f.collect_set('b')))) +# Fall back to CPU if type is nested and contains floats or doubles in the type tree. +# Because NaNs in nested types are not supported yet. +# See https://github.com/NVIDIA/spark-rapids/issues/8808 +@ignore_order(local=True) +@allow_non_gpu('ObjectHashAggregateExec', 'ShuffleExchangeExec', 'CollectSet') +@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_floats + _gen_data_for_collect_set_op_nested_floats, ids=idfn) +def test_hash_reduction_collect_set_fallback_on_nested_floats(data_gen): + assert_gpu_fallback_collect( + lambda spark: gen_df(spark, data_gen, length=100) + .agg(f.sort_array(f.collect_set('b'))), + 'CollectSet') # Note, using sort_array() on the CPU, because sort_array() does not yet # support sorting certain nested/arbitrary types on the GPU @@ -1177,8 +1225,8 @@ def test_collect_list_reductions(data_gen): FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), DoubleGen(special_cases=[]), string_gen, boolean_gen, date_gen, timestamp_gen] -_struct_only_nested_gens = [all_basic_struct_gen, - StructGen([['child0', byte_gen], ['child1', all_basic_struct_gen]]), +_struct_only_nested_gens = [all_basic_struct_gen_no_floats, + StructGen([['child0', byte_gen], ['child1', all_basic_struct_gen_no_floats]]), StructGen([])] @pytest.mark.parametrize('data_gen', _no_neg_zero_all_basic_gens + decimal_gens + _struct_only_nested_gens, diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index f83fb0e725f..9313cada24d 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -1120,10 +1120,10 @@ def do_it(spark): ('a', RepeatSeqGen(LongGen(), length=20)), ('b', UniqueLongGen()), ('c_int', RepeatSeqGen(IntegerGen(), length=15)), - ('c_struct_array_1', RepeatSeqGen(struct_array_gen_no_nans, length=15)), + ('c_struct_array_1', RepeatSeqGen(struct_array_gen_no_floats, length=15)), ('c_struct_array_2', RepeatSeqGen(StructGen([ - ['c0', struct_array_gen_no_nans], ['c1', int_gen]]), length=14)), - ('c_array_struct', RepeatSeqGen(ArrayGen(all_basic_struct_gen_no_nan), length=15)), + ['c0', struct_array_gen_no_floats], ['c1', int_gen]]), length=14)), + ('c_array_struct', RepeatSeqGen(ArrayGen(all_basic_struct_gen_no_floats), length=15)), ('c_array_array_bool', RepeatSeqGen(ArrayGen(ArrayGen(BooleanGen())), length=15)), ('c_array_array_int', RepeatSeqGen(ArrayGen(ArrayGen(IntegerGen())), length=15)), ('c_array_array_long', RepeatSeqGen(ArrayGen(ArrayGen(LongGen())), length=15)), @@ -1132,13 +1132,19 @@ def do_it(spark): ('c_array_array_timestamp', RepeatSeqGen(ArrayGen(ArrayGen(TimestampGen())), length=15)), ('c_array_array_byte', RepeatSeqGen(ArrayGen(ArrayGen(ByteGen())), length=15)), ('c_array_array_string', RepeatSeqGen(ArrayGen(ArrayGen(StringGen())), length=15)), - ('c_array_array_float', RepeatSeqGen(ArrayGen(ArrayGen(FloatGen(no_nans=True))), length=15)), - ('c_array_array_double', RepeatSeqGen(ArrayGen(ArrayGen(DoubleGen(no_nans=True))), length=15)), ('c_array_array_decimal_32', RepeatSeqGen(ArrayGen(ArrayGen(DecimalGen(precision=8, scale=3))), length=15)), ('c_array_array_decimal_64', RepeatSeqGen(ArrayGen(ArrayGen(decimal_gen_64bit)), length=15)), ('c_array_array_decimal_128', RepeatSeqGen(ArrayGen(ArrayGen(decimal_gen_128bit)), length=15)), ] +_gen_data_for_collect_set_nest_floats_fallback = [ + ('a', RepeatSeqGen(LongGen(), length=20)), + ('b', UniqueLongGen()), + ('c_int', RepeatSeqGen(IntegerGen(), length=15)), + ('c_array_array_float', RepeatSeqGen(ArrayGen(ArrayGen(FloatGen(no_nans=True))), length=15)), + ('c_array_array_double', RepeatSeqGen(ArrayGen(ArrayGen(DoubleGen(no_nans=True))), length=15)), +] + # SortExec does not support array type, so sort the result locally. @ignore_order(local=True) def test_window_aggs_for_rows_collect_set(): @@ -1235,10 +1241,6 @@ def do_it(spark): (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_byte, collect_set(c_array_array_string) over (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_str, - collect_set(c_array_array_float) over - (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_float, - collect_set(c_array_array_double) over - (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_double, collect_set(c_array_array_decimal_32) over (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_decimal_32, collect_set(c_array_array_decimal_64) over @@ -1264,8 +1266,6 @@ def do_it(spark): sort_array(cc_array_array_ts), sort_array(cc_array_array_byte), sort_array(cc_array_array_str), - sort_array(cc_array_array_float), - sort_array(cc_array_array_double), sort_array(cc_array_array_decimal_32), sort_array(cc_array_array_decimal_64), sort_array(cc_array_array_decimal_128) @@ -1273,6 +1273,30 @@ def do_it(spark): """) assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf) +# Fall back to CPU if type is nested and contains floats or doubles in the type tree. +# Because NaNs in nested types are not supported yet. +# See https://github.com/NVIDIA/spark-rapids/issues/8808 +@ignore_order(local=True) +@allow_non_gpu("ProjectExec", "CollectSet", "WindowExec") +def test_window_aggs_for_rows_collect_set_nested_array(): + conf = copy_and_update(_float_conf, { + "spark.rapids.sql.castFloatToString.enabled": "true", + "spark.rapids.sql.expression.SortArray": "false" + }) + + def do_it(spark): + df = gen_df(spark, _gen_data_for_collect_set_nest_floats_fallback, length=512) + df.createOrReplaceTempView("window_collect_table") + return spark.sql( + """select a, b, + collect_set(c_array_array_float) over + (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_float, + collect_set(c_array_array_double) over + (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_double + from window_collect_table + """) + + assert_gpu_fallback_collect(do_it, 'CollectSet', conf=conf) # In a distributed setup the order of the partitions returned might be different, so we must ignore the order # but small batch sizes can make sort very slow, so do the final order by locally diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index eaf0d9f2b3f..db0b363452e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3259,23 +3259,20 @@ object GpuOverrides extends Logging { Seq(ParamCheck("input", (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 + TypeSig.NULL + - TypeSig.STRUCT + - TypeSig.ARRAY).nested(), + TypeSig.STRUCT.withPsNote(TypeEnum.STRUCT, "Structs containing " + + s"float/double will not be supported.") + + TypeSig.ARRAY.withPsNote(TypeEnum.ARRAY, "Arrays containing " + + s"floats/doubles will not be supported.")).nested(), TypeSig.all))), (c, conf, p, r) => new TypedImperativeAggExprMeta[CollectSet](c, conf, p, r) { - private def isNestedArrayType(dt: DataType): Boolean = { - dt match { - case StructType(fields) => - fields.exists { field => - field.dataType match { - case sdt: StructType => isNestedArrayType(sdt) - case _: ArrayType => true - case _ => false - } - } - case ArrayType(et, _) => et.isInstanceOf[ArrayType] || et.isInstanceOf[StructType] - case _ => false + override def tagAggForGpu(): Unit = { + // Fall back to CPU if type is nested and contains floats or doubles in the type tree. + // Because NaNs in nested types are not supported yet. + // See https://github.com/NVIDIA/spark-rapids/issues/8808 + if (c.child.dataType != FloatType && c.child.dataType != DoubleType && + isOrContainsFloatingPoint(c.child.dataType)) { + willNotWorkOnGpu("Float/Double in nested type is not supported") } }