Add Decimal support for In, InSet, AtLeastNNonNulls, GetArrayItem, GetStructField, and GenerateExec (#1410)

Signed-off-by: Robert (Bobby) Evans <[email protected]>
revans2 authored Dec 16, 2020
1 parent d703f80 commit ce36d4c
Showing 8 changed files with 89 additions and 56 deletions.
40 changes: 20 additions & 20 deletions docs/supported_ops.md
@@ -233,11 +233,11 @@ Accelerator supports are described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td>S*</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (Only literal arrays and the output of the array function are supported; missing nested DECIMAL, NULL, BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><em>PS* (Only literal arrays and the output of the array function are supported; missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -1601,13 +1601,13 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -4925,7 +4925,7 @@ Accelerator support is described below.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, MAP, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td> </td>
<td> </td>
<td> </td>
@@ -4963,13 +4963,13 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, MAP, UDT)</em></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, BINARY, CALENDAR, MAP, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -5191,7 +5191,7 @@ Accelerator support is described below.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td> </td>
</tr>
<tr>
@@ -5206,13 +5206,13 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
<td><b>NS</b></td>
</tr>
<tr>
@@ -5893,7 +5893,7 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -5914,7 +5914,7 @@ Accelerator support is described below.
<td><em>PS (Literal value only)</em></td>
<td><em>PS* (Literal value only)</em></td>
<td><em>PS (Literal value only)</em></td>
<td><b>NS</b></td>
<td><em>PS* (Literal value only)</em></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
@@ -6025,7 +6025,7 @@ Accelerator support is described below.
<td>S</td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td>S*</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
3 changes: 2 additions & 1 deletion integration_tests/src/main/python/array_test.py
@@ -33,7 +33,8 @@ def test_array_index(data_gen):
'a[null]',
'a[3]',
'a[50]',
'a[-1]'))
'a[-1]'),
conf=allow_negative_scale_of_decimal_conf)

# Once we support arrays as literals then we can support a[null] for
# all array gens. See test_array_index for more info
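The only functional change above is threading allow_negative_scale_of_decimal_conf into the harness so both the CPU and GPU sessions run with negative decimal scales enabled. A minimal sketch of what that shared conf plausibly contains; the real definition lives in the integration-test helpers, so treat the exact key as an assumption:

    # Sketch of the conf dict the tests pass through to the harness. Spark 3.0
    # rejects negative-scale decimal types unless this legacy flag is set, so it
    # has to be on for both the CPU run and the GPU run being compared.
    allow_negative_scale_of_decimal_conf = {
        'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'
    }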
43 changes: 27 additions & 16 deletions integration_tests/src/main/python/cmp_test.py
@@ -105,39 +105,44 @@ def test_gte(data_gen):
f.col('b') >= f.lit(None).cast(data_type),
f.col('a') >= f.col('b')), conf=allow_negative_scale_of_decimal_conf)

@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
def test_isnull(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(
f.isnull(f.col('a'))))
f.isnull(f.col('a'))),
conf=allow_negative_scale_of_decimal_conf)

@pytest.mark.parametrize('data_gen', [FloatGen(), DoubleGen()], ids=idfn)
def test_isnan(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(
f.isnan(f.col('a'))))

@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
def test_dropna_any(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).dropna())
lambda spark : binary_op_df(spark, data_gen).dropna(),
conf=allow_negative_scale_of_decimal_conf)

@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
def test_dropna_all(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).dropna(how='all'))
lambda spark : binary_op_df(spark, data_gen).dropna(how='all'),
conf=allow_negative_scale_of_decimal_conf)

#dropna is really a filter along with a test for null, but let's do an explicit filter test too
@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
def test_filter(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : three_col_df(spark, BooleanGen(), data_gen, data_gen).filter(f.col('a')))
lambda spark : three_col_df(spark, BooleanGen(), data_gen, data_gen).filter(f.col('a')),
conf=allow_negative_scale_of_decimal_conf)

# coalesce batch happens after a filter, but only if something else happens on the GPU after that
@pytest.mark.parametrize('data_gen', eq_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn)
def test_filter_with_project(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : two_col_df(spark, BooleanGen(), data_gen).filter(f.col('a')).selectExpr('*', 'a as a2'))
lambda spark : two_col_df(spark, BooleanGen(), data_gen).filter(f.col('a')).selectExpr('*', 'a as a2'),
conf=allow_negative_scale_of_decimal_conf)

@pytest.mark.parametrize('expr', [f.lit(True), f.lit(False), f.lit(None).cast('boolean')], ids=idfn)
def test_filter_with_lit(expr):
@@ -146,21 +151,27 @@ def test_filter_with_lit(expr):

# Spark supports two different versions of 'IN', and it depends on the spark.sql.optimizer.inSetConversionThreshold conf
# This is to test entries under that value.
@pytest.mark.parametrize('data_gen', eq_gens, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen, ids=idfn)
def test_in(data_gen):
# nulls are not supported for in on the GPU yet
num_entries = int(with_cpu_session(lambda spark: spark.conf.get('spark.sql.optimizer.inSetConversionThreshold'))) - 1
scalars = list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen)))
# we have to make the scalars in a session so negative scales in decimals are supported
scalars = with_cpu_session(lambda spark: list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen))),
conf=allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)))
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)),
conf=allow_negative_scale_of_decimal_conf)

# Spark supports two different versions of 'IN', and it depends on the spark.sql.optimizer.inSetConversionThreshold conf
# This is to test entries over that value.
@pytest.mark.parametrize('data_gen', eq_gens, ids=idfn)
@pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen, ids=idfn)
def test_in_set(data_gen):
# nulls are not supported for in on the GPU yet
num_entries = int(with_cpu_session(lambda spark: spark.conf.get('spark.sql.optimizer.inSetConversionThreshold'))) + 1
scalars = list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen)))
# we have to make the scalars in a session so negative scales in decimals are supported
scalars = with_cpu_session(lambda spark: list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen))),
conf=allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)))
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)),
conf=allow_negative_scale_of_decimal_conf)
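
The test_in and test_in_set changes follow one pattern: build the literal scalars while a CPU session holding the legacy conf is active, because constructing a negative-scale decimal literal consults the active session's conf, then run the CPU/GPU comparison under the same conf. A self-contained sketch of that pattern; with_cpu_session_sketch is a hypothetical stand-in for the real with_cpu_session helper:

    from decimal import Decimal
    import pyspark.sql.functions as f
    from pyspark.sql import SparkSession
    from pyspark.sql.types import DecimalType

    def with_cpu_session_sketch(func, conf=None):
        # Run func against a (CPU) session with the given confs applied.
        spark = SparkSession.builder.master('local[1]').getOrCreate()
        for k, v in (conf or {}).items():
            spark.conf.set(k, v)
        try:
            return func(spark)
        finally:
            for k in (conf or {}):
                spark.conf.unset(k)

    conf = {'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'}
    # decimal(7, -3) can only be built while the legacy conf is active, which is
    # why the scalars are created inside the session rather than ahead of it.
    scalars = with_cpu_session_sketch(
        lambda spark: [f.lit(Decimal('1234000')).cast(DecimalType(7, -3))],
        conf=conf)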

13 changes: 9 additions & 4 deletions integration_tests/src/main/python/data_gen.py
@@ -589,11 +589,15 @@ def gen_df(spark, data_gen, length=2048, seed=0):
data = [src.gen() for index in range(0, length)]
return spark.createDataFrame(data, src.data_type)

def _mark_as_lit(data):
def _mark_as_lit(data, data_type = None):
# Sadly you cannot create a literal from just an array in pyspark
if isinstance(data, list):
return f.array([_mark_as_lit(x) for x in data])
return f.lit(data)
if data_type is None:
return f.lit(data)
else:
# lit does not take a data type so we might have to cast it
return f.lit(data).cast(data_type)

def _gen_scalars_common(data_gen, count, seed=0):
if isinstance(data_gen, list):
@@ -614,7 +618,8 @@ def gen_scalars(data_gen, count, seed=0, force_no_nulls=False):
if force_no_nulls:
assert(not isinstance(data_gen, NullGen))
src = _gen_scalars_common(data_gen, count, seed=seed)
return (_mark_as_lit(src.gen(force_no_nulls=force_no_nulls)) for i in range(0, count))
data_type = src.data_type
return (_mark_as_lit(src.gen(force_no_nulls=force_no_nulls), data_type) for i in range(0, count))

def gen_scalar(data_gen, seed=0, force_no_nulls=False):
"""Generate a single scalar value."""
@@ -761,7 +766,7 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):

boolean_gens = [boolean_gen]

single_level_array_gens = [ArrayGen(sub_gen) for sub_gen in all_basic_gens]
single_level_array_gens = [ArrayGen(sub_gen) for sub_gen in all_basic_gens + decimal_gens + [null_gen]]

# Be careful to not make these too large or data generation takes forever
# This is only a few nested array gens, because nesting can be very deep
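Two details carry the decimal support here. First, _mark_as_lit takes an optional data_type because f.lit infers a type from the value itself, so a generated decimal can come back narrower than the type its generator declared, and the cast pins the column to the intended precision and scale. Second, single_level_array_gens now folds in the decimal and null generators. A small illustration of the lit-versus-cast distinction; it assumes an active SparkSession, and the inferred type shown is an assumption to verify against your Spark version:

    from decimal import Decimal
    import pyspark.sql.functions as f
    from pyspark.sql.types import DecimalType

    # A bare lit() derives the decimal type from this one value, roughly decimal(2,1).
    inferred = f.lit(Decimal('1.2'))
    # Casting pins the column to the generator's declared type instead, so every
    # generated value lands in one consistent decimal(7,3) column.
    pinned = f.lit(Decimal('1.2')).cast(DecimalType(7, 3))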
4 changes: 3 additions & 1 deletion integration_tests/src/main/python/generate_expr_test.py
@@ -29,7 +29,9 @@ def four_op_df(spark, gen, length=2048, seed=0):
('d', gen)], nullable=False), length=length, seed=seed)

all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
FloatGen(), DoubleGen(), BooleanGen(), DateGen(), TimestampGen()]
FloatGen(), DoubleGen(), BooleanGen(), DateGen(), TimestampGen(),
decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision,
decimal_gen_64bit]

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
@ignore_order(local=True)
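Appending the four decimal generators to all_gen means every explode and posexplode test in this file now also runs over decimal data. Per the supported_ops.md note above, GenerateExec only handles literal arrays and the output of the array function, and the sketch below stays within that shape (it assumes an active session named spark):

    # Exploding the output of the array function over decimals, the case the
    # GenerateExec changes in this commit enable on the GPU.
    spark.sql(
        "SELECT explode(array(CAST(1.23 AS DECIMAL(4,2)), "
        "CAST(4.56 AS DECIMAL(4,2)))) AS d").show()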
3 changes: 2 additions & 1 deletion integration_tests/src/main/python/struct_test.py
@@ -23,7 +23,8 @@
@pytest.mark.parametrize('data_gen', [StructGen([["first", boolean_gen], ["second", byte_gen], ["third", float_gen]]),
StructGen([["first", short_gen], ["second", int_gen], ["third", long_gen]]),
StructGen([["first", double_gen], ["second", date_gen], ["third", timestamp_gen]]),
StructGen([["first", string_gen], ["second", ArrayGen(byte_gen)], ["third", simple_string_to_string_map_gen]])], ids=idfn)
StructGen([["first", string_gen], ["second", ArrayGen(byte_gen)], ["third", simple_string_to_string_map_gen]]),
StructGen([["first", decimal_gen_default], ["second", decimal_gen_scale_precision], ["third", decimal_gen_same_scale_precision]])], ids=idfn)
def test_struct_get_item(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr(
15 changes: 12 additions & 3 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuInSet.scala
@@ -80,9 +80,18 @@ case class GpuInSet(
ColumnVector.timestampMicroSecondsFromBoxedLongs(timestamps:_*)
case StringType =>
val strings = values.asInstanceOf[Seq[UTF8String]]
val builder = HostColumnVector.builder(DType.STRING, strings.size)
strings.foreach(s => builder.appendUTF8String(s.getBytes))
builder.buildAndPutOnDevice()
withResource(HostColumnVector.builder(DType.STRING, strings.size)) { builder =>
strings.foreach(s => builder.appendUTF8String(s.getBytes))
builder.buildAndPutOnDevice()
}
case t: DecimalType =>
val decs = values.asInstanceOf[Seq[Decimal]]
// When we support DECIMAL32 this will need to change to support that
withResource(HostColumnVector.builder(DType.create(DType.DTypeEnum.DECIMAL64, - t.scale),
decs.size)) { builder =>
decs.foreach(d => builder.appendUnscaledDecimal(d.toUnscaledLong))
builder.buildAndPutOnDevice()
}
case _ =>
throw new UnsupportedOperationException(s"Unsupported list type: ${child.dataType}")
}
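Besides wrapping the host-side builders in withResource so host memory is released even if building fails, the new DecimalType arm encodes each Spark Decimal as its unscaled 64-bit value plus a negated scale: Spark's scale counts fraction digits, while cudf's decimal scale is a power-of-ten exponent (value = unscaled * 10^scale), hence the minus sign on t.scale. The sign flip in plain Python, as a sanity check rather than plugin code:

    from decimal import Decimal

    d = Decimal('12.34')
    spark_scale = -d.as_tuple().exponent   # Spark-style scale: 2 fraction digits
    unscaled = int(d.scaleb(spark_scale))  # 1234, what appendUnscaledDecimal receives
    cudf_scale = -spark_scale              # -2, what is passed to DType.create
    # Reconstructing from (unscaled, cudf_scale) round-trips to the original value.
    assert Decimal(unscaled).scaleb(cudf_scale) == d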
24 changes: 14 additions & 10 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -969,7 +969,7 @@ object GpuOverrides {
"Checks if number of non null/Nan values is greater than a given value",
ExprChecks.projectNotLambda(TypeSig.BOOLEAN, TypeSig.BOOLEAN,
repeatingParamCheck = Some(RepeatingParamCheck("input",
(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.MAP + TypeSig.ARRAY +
(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.MAP + TypeSig.ARRAY +
TypeSig.STRUCT).nested(),
TypeSig.all))),
(a, conf, p, r) => new ExprMeta[AtLeastNNonNulls](a, conf, p, r) {
@@ -1466,8 +1466,10 @@ object GpuOverrides {
expr[In](
"IN operator",
ExprChecks.projectNotLambda(TypeSig.BOOLEAN, TypeSig.BOOLEAN,
Seq(ParamCheck("value", TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all)),
Some(RepeatingParamCheck("list", TypeSig.commonCudfTypes.withAllLit(), TypeSig.all))),
Seq(ParamCheck("value", TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL,
TypeSig.all)),
Some(RepeatingParamCheck("list", (TypeSig.commonCudfTypes + TypeSig.DECIMAL).withAllLit(),
TypeSig.all))),
(in, conf, p, r) => new ExprMeta[In](in, conf, p, r) {
override def tagExprForGpu(): Unit = {
val unaliased = in.list.map(extractLit)
@@ -1485,7 +1487,7 @@ object GpuOverrides {
expr[InSet](
"INSET operator",
ExprChecks.unaryProjectNotLambda(TypeSig.BOOLEAN, TypeSig.BOOLEAN,
TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(in, conf, p, r) => new ExprMeta[InSet](in, conf, p, r) {
override def tagExprForGpu(): Unit = {
if (in.hset.contains(null)) {
@@ -1864,10 +1866,11 @@ object GpuOverrides {
expr[GetStructField](
"Gets the named field of the struct",
ExprChecks.unaryProjectNotLambda(
(TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP).nested(),
(TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP + TypeSig.NULL +
TypeSig.DECIMAL).nested(),
TypeSig.all,
TypeSig.STRUCT.nested(TypeSig.commonCudfTypes + TypeSig.ARRAY +
TypeSig.STRUCT + TypeSig.MAP),
TypeSig.STRUCT + TypeSig.MAP + TypeSig.NULL + TypeSig.DECIMAL),
TypeSig.STRUCT.nested(TypeSig.all)),
(expr, conf, p, r) => new UnaryExprMeta[GetStructField](expr, conf, p, r) {
override def convertToGpu(arr: Expression): GpuExpression =
expr[GetArrayItem](
"Gets the field at `ordinal` in the Array",
ExprChecks.binaryProjectNotLambda(
(TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.NULL).nested(),
(TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.NULL +
TypeSig.DECIMAL + TypeSig.MAP).nested(),
TypeSig.all,
("array", TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.ARRAY +
TypeSig.STRUCT + TypeSig.NULL),
TypeSig.STRUCT + TypeSig.NULL + TypeSig.DECIMAL + TypeSig.MAP),
TypeSig.ARRAY.nested(TypeSig.all)),
("ordinal", TypeSig.lit(TypeEnum.INT), TypeSig.INT)),
(in, conf, p, r) => new GpuGetArrayItemMeta(in, conf, p, r)),
@@ -2165,10 +2169,10 @@ object GpuOverrides {
exec[GenerateExec] (
"The backend for operations that generate more output rows than input rows like explode",
ExecChecks(
TypeSig.commonCudfTypes
TypeSig.commonCudfTypes + TypeSig.DECIMAL
.withPsNote(TypeEnum.ARRAY,
"Only literal arrays and the output of the array function are supported")
.nested(TypeSig.commonCudfTypes),
.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL),
TypeSig.all),
(gen, conf, p, r) => new GpuGenerateExecSparkPlanMeta(gen, conf, p, r)),
exec[ProjectExec](
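All of the GpuOverrides edits are one idea applied repeatedly: each expression registers a type signature describing what the GPU version accepts, and adding TypeSig.DECIMAL to a check is exactly what flips the matching cell in docs/supported_ops.md from NS to S* or drops DECIMAL from a PS* note. A toy model of that idea, with plain sets standing in for the plugin's real TypeSig class; the member list mirrors what commonCudfTypes appears to cover and is an assumption:

    # Toy model only: the real TypeSig also tracks literal-only restrictions,
    # nesting, and the doc notes rendered into supported_ops.md.
    COMMON_CUDF_TYPES = frozenset({
        'BOOLEAN', 'BYTE', 'SHORT', 'INT', 'LONG', 'FLOAT', 'DOUBLE',
        'DATE', 'TIMESTAMP', 'STRING'})

    in_value_before = COMMON_CUDF_TYPES | {'NULL'}
    in_value_after = COMMON_CUDF_TYPES | {'NULL', 'DECIMAL'}  # this commit's edit

    def falls_back_to_cpu(spark_type, accepted):
        # The plugin keeps an expression on the CPU when an input type is not accepted.
        return spark_type not in accepted

    assert falls_back_to_cpu('DECIMAL', in_value_before)
    assert not falls_back_to_cpu('DECIMAL', in_value_after)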
