Add support for nested types to collect_set(...) on the GPU [databricks] #6079

Merged on Aug 5, 2022 · 17 commits · changes shown from 14 commits

Commits
74883ee  WIP: enable nested types with collect_set (NVnavkumar, Jul 14, 2022)
a0fac74  tests for collect_set() with nested array types (NVnavkumar, Jul 15, 2022)
2aa4d1f  preparing to switch to local sorting (NVnavkumar, Jul 22, 2022)
86ca609  Run sort_array on the CPU in order to test collect_set() with Struct[… (NVnavkumar, Jul 23, 2022)
4ced710  Test cleanup and right now just ensure that Struct[Array] works with … (NVnavkumar, Jul 25, 2022)
0abc00a  Updated docs (NVnavkumar, Jul 25, 2022)
f764d30  For integration tests, add array of struct case and array of array case (NVnavkumar, Jul 25, 2022)
54d08a8  Fix window aggregation configuration, and add integration tests for c… (NVnavkumar, Jul 28, 2022)
f984ce4  Add reduction integration tests (NVnavkumar, Jul 28, 2022)
2dfe8ba  Add incompat documentation, and update tests to remove approximate fl… (NVnavkumar, Aug 2, 2022)
753bfe8  Revert "Add incompat documentation, and update tests to remove approx… (NVnavkumar, Aug 2, 2022)
bd227fa  Cleanup test code, rename data_gen var, and remove unnecessary decora… (NVnavkumar, Aug 2, 2022)
4cca48a  Update CollectSet with limited hasNans requirement and update tests (NVnavkumar, Aug 2, 2022)
0ef4784  Update partial support docs for CollectSet (NVnavkumar, Aug 2, 2022)
14b6ced  Make this check recursive to handle structs that contain structs of a… (NVnavkumar, Aug 3, 2022)
7f928c6  Update integration test for collect_set on nested array types to hand… (NVnavkumar, Aug 5, 2022)
6f1e0da  Update window_function_test to handle optimization performed by Datab… (NVnavkumar, Aug 5, 2022)
18 changes: 9 additions & 9 deletions docs/supported_ops.md
@@ -15459,9 +15459,9 @@ are limited.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>Support for arrays of arrays of floats/doubles requires spark.rapids.sql.hasNans to be set to false;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT</em></td>
<td><em>PS<br/>Support for structs containing float/double array columns requires spark.rapids.sql.hasNans to be set to false;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -15480,7 +15480,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td> </td>
<td> </td>
<td> </td>
@@ -15502,9 +15502,9 @@ are limited.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>Support for arrays of arrays of floats/doubles requires spark.rapids.sql.hasNans to be set to false;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT</em></td>
<td><em>PS<br/>Support for structs containing float/double array columns requires spark.rapids.sql.hasNans to be set to false;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -15523,7 +15523,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td> </td>
<td> </td>
<td> </td>
@@ -15545,9 +15545,9 @@ are limited.
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>Support for arrays of arrays of floats/doubles requires spark.rapids.sql.hasNans to be set to false;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT</em></td>
<td><em>PS<br/>Support for structs containing float/double array columns requires spark.rapids.sql.hasNans to be set to false;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -15566,7 +15566,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, ARRAY, MAP, UDT</em></td>
<td><em>PS<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td> </td>
<td> </td>
<td> </td>
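Read together, the PS notes above all hinge on one setting: once NaNs are declared absent via spark.rapids.sql.hasNans, collect_set over these nested float/double shapes can stay on the GPU. A minimal sketch of how a user would exercise that from a Spark session; the table name and schema are made up, and only the config key and the collect_set/sort_array pattern come from this PR:

```scala
// Hypothetical usage: assumes a table `tbl` with columns
// a BIGINT and b ARRAY<ARRAY<DOUBLE>> registered in the session.
// Asserting the data is NaN-free satisfies the PS note above, so the
// plugin can keep the collect_set aggregation on the GPU.
spark.conf.set("spark.rapids.sql.hasNans", "false")

spark.sql(
  """SELECT a, sort_array(collect_set(b)) AS distinct_arrays
    |FROM tbl
    |GROUP BY a""".stripMargin).show(truncate = false)
```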
4 changes: 4 additions & 0 deletions integration_tests/src/main/python/data_gen.py
@@ -949,6 +949,10 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
# all of the basic types in a single struct
all_basic_struct_gen = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(all_basic_gens)])

all_basic_struct_gen_no_nan = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(all_basic_gens_no_nan)])

struct_array_gen_no_nans = StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(single_level_array_gens_no_nan)])

# Some struct gens, but not all because of nesting
nonempty_struct_gens_sample = [all_basic_struct_gen,
    StructGen([['child0', byte_gen], ['child1', all_basic_struct_gen]]),
79 changes: 70 additions & 9 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -574,10 +574,23 @@ def test_hash_pivot_groupby_duplicates_fallback(data_gen):
    RepeatSeqGen(StructGen([
        ['c0', all_basic_struct_gen], ['c1', int_gen]]), length=15)]

# data generation for collect_set on nested Struct[Array] types
_repeat_agg_column_for_collect_set_op_nested = [
    RepeatSeqGen(struct_array_gen_no_nans, length=15),
    RepeatSeqGen(StructGen([
        ['c0', struct_array_gen_no_nans], ['c1', int_gen]]), length=15),
    RepeatSeqGen(ArrayGen(all_basic_struct_gen_no_nan), length=15)]

_array_of_array_gen = [RepeatSeqGen(ArrayGen(sub_gen), length=15) for sub_gen in single_level_array_gens_no_nan]

_gen_data_for_collect_set_op = [[
    ('a', RepeatSeqGen(LongGen(), length=20)),
    ('b', value_gen)] for value_gen in _repeat_agg_column_for_collect_set_op]

_gen_data_for_collect_set_op_nested = [[
    ('a', RepeatSeqGen(LongGen(), length=20)),
    ('b', value_gen)] for value_gen in _repeat_agg_column_for_collect_set_op_nested + _array_of_array_gen]

# very simple test for just a count on decimals 128 values until we can support more with them
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', [decimal_gen_128bit], ids=idfn)
@@ -622,29 +635,80 @@ def test_hash_groupby_collect_list(data_gen, use_obj_hash_agg):
        conf={'spark.sql.execution.useObjectHashAggregateExec': str(use_obj_hash_agg).lower(),
              'spark.sql.shuffle.partitions': '1'})

@approximate_float
@ignore_order(local=True)
@incompat
@pytest.mark.parametrize('data_gen', _full_gen_data_for_collect_op, ids=idfn)
def test_hash_groupby_collect_set(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, data_gen, length=100)
            .groupby('a')
            .agg(f.sort_array(f.collect_set('b')), f.count('b')))

@approximate_float
@ignore_order(local=True)
@incompat
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op, ids=idfn)
def test_hash_groupby_collect_set_on_nested_type(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, data_gen, length=100)
            .groupby('a')
            .agg(f.sort_array(f.collect_set('b'))))

@approximate_float

# Note: using sort_array() on the CPU, because sort_array() does not yet
# support sorting certain nested/arbitrary types on the GPU
# See https://github.com/NVIDIA/spark-rapids/issues/3715
# and https://github.com/rapidsai/cudf/issues/11222
@ignore_order(local=True)
@allow_non_gpu("ProjectExec", "SortArray")
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_nested, ids=idfn)
def test_hash_groupby_collect_set_on_nested_array_type(data_gen):
    conf = copy_and_update(_no_nans_float_conf, {
        "spark.rapids.sql.castFloatToString.enabled": "true",
        "spark.rapids.sql.castDecimalToString.enabled": "true",
        "spark.rapids.sql.expression.SortArray": "false"
    })

    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, data_gen, length=100)
            .groupby('a')
            .agg(f.collect_set('b').alias("collect_set"))
            .selectExpr("sort_array(collect_set)")
        , conf=conf)

@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', _full_gen_data_for_collect_op, ids=idfn)
def test_hash_reduction_collect_set(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, data_gen, length=100)
            .agg(f.sort_array(f.collect_set('b')), f.count('b')))

@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op, ids=idfn)
def test_hash_reduction_collect_set_on_nested_type(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, data_gen, length=100)
            .agg(f.sort_array(f.collect_set('b'))))


# Note: using sort_array() on the CPU, because sort_array() does not yet
# support sorting certain nested/arbitrary types on the GPU
# See https://github.com/NVIDIA/spark-rapids/issues/3715
# and https://github.com/rapidsai/cudf/issues/11222
@ignore_order(local=True)
@allow_non_gpu("ProjectExec", "SortArray")
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_set_op_nested, ids=idfn)
def test_hash_reduction_collect_set_on_nested_array_type(data_gen):
    conf = copy_and_update(_no_nans_float_conf, {
        "spark.rapids.sql.castFloatToString.enabled": "true",
        "spark.rapids.sql.castDecimalToString.enabled": "true",
        "spark.rapids.sql.expression.SortArray": "false"
    })

    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: gen_df(spark, data_gen, length=100)
            .agg(f.collect_set('b').alias("collect_set"))
            .selectExpr("sort_array(collect_set)")
        , conf=conf)

@ignore_order(local=True)
@incompat
@pytest.mark.parametrize('data_gen', _full_gen_data_for_collect_op, ids=idfn)
def test_hash_groupby_collect_with_single_distinct(data_gen):
    # test collect_ops with other distinct aggregations
@@ -656,9 +720,7 @@ def test_hash_groupby_collect_with_single_distinct(data_gen):
            f.countDistinct('c'),
            f.count('c')))

@approximate_float
@ignore_order(local=True)
@incompat
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
def test_hash_groupby_single_distinct_collect(data_gen):
    # test distinct collect
@@ -681,7 +743,6 @@ def test_hash_groupby_single_distinct_collect(data_gen):
        df_fun=lambda spark: gen_df(spark, data_gen, length=100),
        table_name="tbl", sql=sql)

@approximate_float
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
def test_hash_groupby_collect_with_multi_distinct(data_gen):
101 changes: 101 additions & 0 deletions integration_tests/src/main/python/window_function_test.py
@@ -139,6 +139,11 @@
    FloatGen(no_nans=True, special_cases=[]), DoubleGen(no_nans=True, special_cases=[]),
    string_gen, boolean_gen, date_gen, timestamp_gen, null_gen]

_no_nans_float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true',
                       'spark.rapids.sql.hasNans': 'false',
                       'spark.rapids.sql.castStringToFloat.enabled': 'true'
                       }

@ignore_order
@pytest.mark.parametrize('data_gen', [decimal_gen_128bit], ids=idfn)
def test_decimal128_count_window(data_gen):
@@ -920,6 +925,28 @@ def test_running_window_function_exec_for_all_aggs():
    ('c_fp_nan', RepeatSeqGen(FloatGen().with_special_case(math.nan, 200.0), length=5)),
]

_gen_data_for_collect_set_nested = [
    ('a', RepeatSeqGen(LongGen(), length=20)),
    ('b', LongRangeGen()),
    ('c_int', RepeatSeqGen(IntegerGen(), length=15)),
    ('c_struct_array_1', RepeatSeqGen(struct_array_gen_no_nans, length=15)),
    ('c_struct_array_2', RepeatSeqGen(StructGen([
        ['c0', struct_array_gen_no_nans], ['c1', int_gen]]), length=14)),
    ('c_array_struct', RepeatSeqGen(ArrayGen(all_basic_struct_gen_no_nan), length=15)),
    ('c_array_array_bool', RepeatSeqGen(ArrayGen(ArrayGen(BooleanGen())), length=15)),
    ('c_array_array_int', RepeatSeqGen(ArrayGen(ArrayGen(IntegerGen())), length=15)),
    ('c_array_array_long', RepeatSeqGen(ArrayGen(ArrayGen(LongGen())), length=15)),
    ('c_array_array_short', RepeatSeqGen(ArrayGen(ArrayGen(ShortGen())), length=15)),
    ('c_array_array_date', RepeatSeqGen(ArrayGen(ArrayGen(DateGen())), length=15)),
    ('c_array_array_timestamp', RepeatSeqGen(ArrayGen(ArrayGen(TimestampGen())), length=15)),
    ('c_array_array_byte', RepeatSeqGen(ArrayGen(ArrayGen(ByteGen())), length=15)),
    ('c_array_array_string', RepeatSeqGen(ArrayGen(ArrayGen(StringGen())), length=15)),
    ('c_array_array_float', RepeatSeqGen(ArrayGen(ArrayGen(FloatGen(no_nans=True))), length=15)),
    ('c_array_array_double', RepeatSeqGen(ArrayGen(ArrayGen(DoubleGen(no_nans=True))), length=15)),
    ('c_array_array_decimal_32', RepeatSeqGen(ArrayGen(ArrayGen(DecimalGen(precision=8, scale=3))), length=15)),
    ('c_array_array_decimal_64', RepeatSeqGen(ArrayGen(ArrayGen(decimal_gen_64bit)), length=15)),
    ('c_array_array_decimal_128', RepeatSeqGen(ArrayGen(ArrayGen(decimal_gen_128bit)), length=15)),
]

# SortExec does not support array type, so sort the result locally.
@ignore_order(local=True)
@@ -977,6 +1004,80 @@ def test_window_aggs_for_rows_collect_set():
        ) t
        ''')


# Note: using sort_array() on the CPU, because sort_array() does not yet
# support sorting certain nested/arbitrary types on the GPU
# See https://github.com/NVIDIA/spark-rapids/issues/3715
# and https://github.com/rapidsai/cudf/issues/11222
@ignore_order(local=True)
@allow_non_gpu("ProjectExec", "SortArray")
def test_window_aggs_for_rows_collect_set_nested_array():
    conf = copy_and_update(_no_nans_float_conf, {
        "spark.rapids.sql.castFloatToString.enabled": "true",
        "spark.rapids.sql.castDecimalToString.enabled": "true",
        "spark.rapids.sql.expression.SortArray": "false"
    })

    assert_gpu_and_cpu_are_equal_sql(
        lambda spark: gen_df(spark, _gen_data_for_collect_set_nested, length=1024),
        "window_collect_table",
        '''
        select a, b,
            sort_array(cc_struct_array_1),
            sort_array(cc_struct_array_2),
            sort_array(cc_array_struct),
            sort_array(cc_array_array_bool),
            sort_array(cc_array_array_int),
            sort_array(cc_array_array_long),
            sort_array(cc_array_array_short),
            sort_array(cc_array_array_date),
            sort_array(cc_array_array_ts),
            sort_array(cc_array_array_byte),
            sort_array(cc_array_array_str),
            sort_array(cc_array_array_float),
            sort_array(cc_array_array_double),
            sort_array(cc_array_array_decimal_32),
            sort_array(cc_array_array_decimal_64),
            sort_array(cc_array_array_decimal_128)
        from (
            select a, b,
                collect_set(c_struct_array_1) over
                    (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_struct_array_1,
                collect_set(c_struct_array_2) over
                    (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_struct_array_2,
                collect_set(c_array_struct) over
                    (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_struct,
                collect_set(c_array_array_bool) over
                    (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_bool,
                collect_set(c_array_array_int) over
                    (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_int,
                collect_set(c_array_array_long) over
                    (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_long,
                collect_set(c_array_array_short) over
                    (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_short,
                collect_set(c_array_array_date) over
                    (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_date,
                collect_set(c_array_array_timestamp) over
                    (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_ts,
                collect_set(c_array_array_byte) over
                    (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_byte,
                collect_set(c_array_array_string) over
                    (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_str,
                collect_set(c_array_array_float) over
                    (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_float,
                collect_set(c_array_array_double) over
                    (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_double,
                collect_set(c_array_array_decimal_32) over
                    (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_decimal_32,
                collect_set(c_array_array_decimal_64) over
                    (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_decimal_64,
                collect_set(c_array_array_decimal_128) over
                    (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as cc_array_array_decimal_128
            from window_collect_table
        ) t
        ''', conf=conf)


# In a distributed setup the order of the partitions returned might be different, so we must ignore the order
# but small batch sizes can make sort very slow, so do the final order by locally
@ignore_order(local=True)
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3332,13 +3332,31 @@ object GpuOverrides extends Logging {
"Collect a set of unique elements, not supported in reduction",
ExprChecks.fullAgg(
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 +
TypeSig.NULL + TypeSig.STRUCT),
TypeSig.NULL + TypeSig.STRUCT + TypeSig.ARRAY),
TypeSig.ARRAY.nested(TypeSig.all),
Seq(ParamCheck("input",
(TypeSig.commonCudfTypes + TypeSig.DECIMAL_128 +
TypeSig.NULL + TypeSig.STRUCT).nested(),
TypeSig.NULL +
TypeSig.STRUCT.withPsNote(TypeEnum.STRUCT, "Support for structs containing " +
s"float/double array columns requires ${RapidsConf.HAS_NANS} to be set to false") +
TypeSig.ARRAY.withPsNote(TypeEnum.ARRAY, "Support for arrays of arrays of " +
s"floats/doubles requires ${RapidsConf.HAS_NANS} to be set to false")).nested(),
TypeSig.all))),
(c, conf, p, r) => new TypedImperativeAggExprMeta[CollectSet](c, conf, p, r) {

private def isNestedArrayType(dt: DataType): Boolean = {
dt match {
case StructType(fields) => fields.exists(_.dataType.isInstanceOf[ArrayType])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if I have a struct of a struct with an array in it? I think this needs to be a recursive call.

case ArrayType(et, _) => et.isInstanceOf[ArrayType] || et.isInstanceOf[StructType]
case _ => false
}
}

      override def tagAggForGpu(): Unit = {
        if (isNestedArrayType(c.child.dataType)) {
          checkAndTagFloatNanAgg("CollectSet", c.child.dataType, conf, this)
        }
      }

      override def convertToGpu(childExprs: Seq[Expression]): GpuExpression =
        GpuCollectSet(childExprs.head, c.mutableAggBufferOffset, c.inputAggBufferOffset)

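As the review comment flags, the isNestedArrayType shown above only inspects a struct's immediate children, so a Struct[Struct[Array]] would slip through the hasNans check. Commit 14b6ced makes the check recursive; a rough sketch of that shape, not necessarily the exact code that landed, would be:

```scala
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

// Sketch only: descend through struct fields so that e.g.
// Struct[Struct[Array[Double]]] is also treated as a nested array type.
def isNestedArrayType(dt: DataType): Boolean = dt match {
  case StructType(fields) =>
    // Recurse into each field rather than testing only its immediate type.
    fields.exists(f => f.dataType.isInstanceOf[ArrayType] || isNestedArrayType(f.dataType))
  case ArrayType(elementType, _) =>
    elementType.isInstanceOf[ArrayType] || elementType.isInstanceOf[StructType]
  case _ => false
}
```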
@@ -1691,7 +1691,8 @@ case class GpuCollectSet(

  override def windowAggregation(
      inputs: Seq[(ColumnVector, Int)]): RollingAggregationOnColumn =
    RollingAggregation.collectSet().onColumn(inputs.head._2)
    RollingAggregation.collectSet(NullPolicy.EXCLUDE, NullEquality.EQUAL,
      NaNEquality.UNEQUAL).onColumn(inputs.head._2)
}

trait CpuToGpuAggregateBufferConverter {
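One note on the windowAggregation change above: the three enum arguments make the set semantics explicit instead of relying on the bare collectSet() defaults. The sketch below restates that call with each argument annotated; the ai.rapids.cudf import path is assumed, everything else is taken from the diff:

```scala
import ai.rapids.cudf.{NaNEquality, NullEquality, NullPolicy, RollingAggregation}

// The window collect_set aggregation with its semantics spelled out.
val agg = RollingAggregation.collectSet(
  NullPolicy.EXCLUDE,   // exclude nulls from the collected set
  NullEquality.EQUAL,   // nulls compare as equal during deduplication
  NaNEquality.UNEQUAL)  // NaNs compare as unequal, so they are not deduplicated
```

This also helps explain the hasNans gating in the docs above: on the JVM, Double.equals treats NaN as equal to itself, so Spark's CPU collect_set keeps a single NaN, while cudf with NaNEquality.UNEQUAL would keep every NaN; asserting spark.rapids.sql.hasNans=false sidesteps that divergence for float/double inputs.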