From ee7b99f5b5f11b570d727c59156b289ab15ca712 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 20 Oct 2023 00:31:59 -0700 Subject: [PATCH] Support Decimal256 in Min/Max aggregate expressions --- .../physical-expr/src/aggregate/min_max.rs | 34 +++++++++++++++++++ .../sqllogictest/test_files/decimal.slt | 10 ++++++ 2 files changed, 44 insertions(+) diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index 5c4c48b15803..f5b708e8894e 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -53,6 +53,9 @@ use crate::aggregate::utils::down_cast_any_ref; use crate::expressions::format_state_name; use arrow::array::Array; use arrow::array::Decimal128Array; +use arrow::array::Decimal256Array; +use arrow::datatypes::i256; +use arrow::datatypes::Decimal256Type; use super::moving_min_max; @@ -183,6 +186,7 @@ impl AggregateExpr for Max { | Float32 | Float64 | Decimal128(_, _) + | Decimal256(_, _) | Date32 | Date64 | Time32(_) @@ -239,6 +243,9 @@ impl AggregateExpr for Max { Decimal128(_, _) => { instantiate_max_accumulator!(self, i128, Decimal128Type) } + Decimal256(_, _) => { + instantiate_max_accumulator!(self, i256, Decimal256Type) + } // It would be nice to have a fast implementation for Strings as well // https://github.com/apache/arrow-datafusion/issues/6906 @@ -318,6 +325,16 @@ macro_rules! min_max_batch { scale ) } + DataType::Decimal256(precision, scale) => { + typed_min_max_batch!( + $VALUES, + Decimal256Array, + Decimal256, + $OP, + precision, + scale + ) + } // all types that have a natural order DataType::Float64 => { typed_min_max_batch!($VALUES, Float64Array, Float64, $OP) @@ -522,6 +539,19 @@ macro_rules! min_max { ); } } + ( + lhs @ ScalarValue::Decimal256(lhsv, lhsp, lhss), + rhs @ ScalarValue::Decimal256(rhsv, rhsp, rhss) + ) => { + if lhsp.eq(rhsp) && lhss.eq(rhss) { + typed_min_max!(lhsv, rhsv, Decimal256, $OP, lhsp, lhss) + } else { + return internal_err!( + "MIN/MAX is not expected to receive scalars of incompatible types {:?}", + (lhs, rhs) + ); + } + } (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => { typed_min_max!(lhs, rhs, Boolean, $OP) } @@ -880,6 +910,7 @@ impl AggregateExpr for Min { | Float32 | Float64 | Decimal128(_, _) + | Decimal256(_, _) | Date32 | Date64 | Time32(_) @@ -935,6 +966,9 @@ impl AggregateExpr for Min { Decimal128(_, _) => { instantiate_min_accumulator!(self, i128, Decimal128Type) } + Decimal256(_, _) => { + instantiate_min_accumulator!(self, i256, Decimal256Type) + } // This is only reached if groups_accumulator_supported is out of sync _ => internal_err!( "GroupsAccumulator not supported for min({})", diff --git a/datafusion/sqllogictest/test_files/decimal.slt b/datafusion/sqllogictest/test_files/decimal.slt index f968ffb90a1c..87a846c07727 100644 --- a/datafusion/sqllogictest/test_files/decimal.slt +++ b/datafusion/sqllogictest/test_files/decimal.slt @@ -691,5 +691,15 @@ select arrow_typeof(avg(c1)), avg(c1) from decimal256_simple; ---- Decimal256(54, 10) 0.0000366666 +query TR +select arrow_typeof(min(c1)), min(c1) from decimal256_simple where c4=false; +---- +Decimal256(50, 6) 0.00002 + +query TR +select arrow_typeof(max(c1)), max(c1) from decimal256_simple where c4=false; +---- +Decimal256(50, 6) 0.00005 + statement ok drop table decimal256_simple;