sum(distinct) support #2405

Merged
merged 5 commits into from
May 4, 2022
Changes from 4 commits
57 changes: 57 additions & 0 deletions datafusion/core/tests/sql/aggregates.rs
@@ -1236,6 +1236,63 @@ async fn simple_avg() -> Result<()> {
     Ok(())
 }

+#[tokio::test]
+async fn query_sum_distinct() -> Result<()> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Int64, true),
+        Field::new("c2", DataType::Int64, true),
+    ]));
+
+    let data = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int64Array::from(vec![
+                Some(0),
+                Some(1),
+                None,
+                Some(3),
+                Some(3),
+            ])),
+            Arc::new(Int64Array::from(vec![
+                None,
+                Some(1),
+                Some(1),
+                Some(2),
+                Some(2),
+            ])),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![data]])?;
+    let ctx = SessionContext::new();
+    ctx.register_table("test", Arc::new(table))?;
+
+    // 2 different aggregate functions: avg and sum(distinct)
+    let sql = "SELECT AVG(c1), SUM(DISTINCT c2) FROM test";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+--------------+-----------------------+",
+        "| AVG(test.c1) | SUM(DISTINCT test.c2) |",
+        "+--------------+-----------------------+",
+        "| 1.75         | 3                     |",
Contributor commented:

👍

+        "+--------------+-----------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+
+    // 2 sum(distinct) functions
+    let sql = "SELECT SUM(DISTINCT c1), SUM(DISTINCT c2) FROM test";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+-----------------------+-----------------------+",
+        "| SUM(DISTINCT test.c1) | SUM(DISTINCT test.c2) |",
+        "+-----------------------+-----------------------+",
+        "| 4                     | 3                     |",
+        "+-----------------------+-----------------------+",
+    ];
+    assert_batches_eq!(expected, &actual);
+    Ok(())
+}
+
 #[tokio::test]
 async fn query_count_distinct() -> Result<()> {
     let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
10 changes: 5 additions & 5 deletions datafusion/physical-expr/src/aggregate/build_in.rs
@@ -87,11 +87,11 @@ pub fn create_aggregate_expr(
             name,
             return_type,
         )),
-        (AggregateFunction::Sum, true) => {
-            return Err(DataFusionError::NotImplemented(
-                "SUM(DISTINCT) aggregations are not available".to_string(),
-            ));
-        }
+        (AggregateFunction::Sum, true) => Arc::new(expressions::DistinctSum::new(
+            vec![coerced_phy_exprs[0].clone()],
+            name,
+            return_type,
+        )),
         (AggregateFunction::ApproxDistinct, _) => {
             Arc::new(expressions::ApproxDistinct::new(
                 coerced_phy_exprs[0].clone(),
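
The hunk above replaces the `NotImplemented` error with a call to `expressions::DistinctSum`, whose source (`sum_distinct.rs`) is not shown in this view. As a rough illustration of the semantics the new test expects, and not the PR's actual implementation, the sketch below deduplicates non-null values and then sums them. The helper name `sum_distinct` is hypothetical, and returning `None` for all-NULL input is an assumption based on standard SQL `SUM` behavior.

```rust
use std::collections::HashSet;

/// Hypothetical helper (not from the PR): sum the distinct non-null values.
/// Returns `None` when there is no non-null input (assumed SQL SUM behavior).
fn sum_distinct(values: &[Option<i64>]) -> Option<i64> {
    // `flatten` skips the `None` entries; the set drops duplicates.
    let distinct: HashSet<i64> = values.iter().flatten().copied().collect();
    if distinct.is_empty() {
        None
    } else {
        Some(distinct.iter().sum())
    }
}

fn main() {
    // Same data as columns c1 and c2 in `query_sum_distinct`.
    let c1: [Option<i64>; 5] = [Some(0), Some(1), None, Some(3), Some(3)];
    let c2: [Option<i64>; 5] = [None, Some(1), Some(1), Some(2), Some(2)];
    println!("{:?}", sum_distinct(&c1)); // Some(4): 0 + 1 + 3
    println!("{:?}", sum_distinct(&c2)); // Some(3): 1 + 2
}
```

These results match the `4` and `3` in the expected output of the test added by this PR.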
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/aggregate/mod.rs
@@ -42,6 +42,7 @@ mod hyperloglog;
 pub(crate) mod stats;
 pub(crate) mod stddev;
 pub(crate) mod sum;
+pub(crate) mod sum_distinct;
 mod tdigest;
 pub(crate) mod variance;

12 changes: 12 additions & 0 deletions datafusion/physical-expr/src/aggregate/sum.rs
@@ -297,6 +297,18 @@ pub(crate) fn sum(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
         (ScalarValue::Int64(lhs), ScalarValue::Int8(rhs)) => {
             typed_sum!(lhs, rhs, Int64, i64)
         }
+        (ScalarValue::Int64(lhs), ScalarValue::UInt64(rhs)) => {
Contributor commented:

This looks like a fine change in this PR -- though it is strange to me that we have to be doing these casts in sum.rs as it duplicates some non trivial amount of the logic in coercion -- maybe it would be possible to make this code cleaner / consolidate more of the coercion logic.

Again, no changes needed for this PR but I figured I would point it out while reading this

Contributor Author commented:

Thanks @alamb. I'll look into this coercion logic in a follow-up PR.

Member commented:

Coercing u64 to i64 seems irrational to me. Why do we need this kind of coercion in sum distinct?

Contributor commented:

I think @WinkerDu is just following the same pre-existing pattern with this PR

I agree that the pre-existing pattern doesn't make sense to me (it should have already been done by the time the distinct aggregate code is being executed)
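
To make the concern in this thread concrete, here is a minimal sketch of what can go wrong when a `u64` is folded into an `i64` accumulator. The body of `typed_sum!` is not shown in this diff, so the use of an unchecked `as i64` cast is an assumption, and both function names below are hypothetical.

```rust
use std::convert::TryFrom;

/// Assumed behavior (not confirmed by the diff): cast with `as`, which
/// reinterprets the bits, so values above `i64::MAX` turn negative.
fn add_wrapping_cast(lhs: i64, rhs: u64) -> i64 {
    lhs.wrapping_add(rhs as i64)
}

/// A checked alternative: refuse values that do not fit in `i64`
/// and report overflow of the addition itself.
fn add_checked(lhs: i64, rhs: u64) -> Option<i64> {
    let rhs = i64::try_from(rhs).ok()?;
    lhs.checked_add(rhs)
}

fn main() {
    println!("{}", add_wrapping_cast(0, u64::MAX)); // -1
    println!("{:?}", add_checked(0, u64::MAX)); // None
    println!("{:?}", add_checked(1, 2)); // Some(3)
}
```

Doing this conversion once in the coercion layer, as the reviewers suggest, would keep the failure mode in one place rather than in every match arm of `sum`.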

+            typed_sum!(lhs, rhs, Int64, i64)
+        }
+        (ScalarValue::Int64(lhs), ScalarValue::UInt32(rhs)) => {
+            typed_sum!(lhs, rhs, Int64, i64)
+        }
+        (ScalarValue::Int64(lhs), ScalarValue::UInt16(rhs)) => {
+            typed_sum!(lhs, rhs, Int64, i64)
+        }
+        (ScalarValue::Int64(lhs), ScalarValue::UInt8(rhs)) => {
+            typed_sum!(lhs, rhs, Int64, i64)
+        }
         e => {
             return Err(DataFusionError::Internal(format!(
                 "Sum is not expected to receive a scalar {:?}",