Skip to content

Commit

Permalink
Move Count to functions-aggregate, update MSRV to rust 1.75 (apac…
Browse files Browse the repository at this point in the history
…he#10484)

* mv accumulate indices

Signed-off-by: jayzhan211 <[email protected]>

* complete udaf

Signed-off-by: jayzhan211 <[email protected]>

* register

Signed-off-by: jayzhan211 <[email protected]>

* fix expr

Signed-off-by: jayzhan211 <[email protected]>

* filter distinct count

Signed-off-by: jayzhan211 <[email protected]>

* todo: need to move count distinct too

Signed-off-by: jayzhan211 <[email protected]>

* move code around

Signed-off-by: jayzhan211 <[email protected]>

* move distinct to aggr-crate

Signed-off-by: jayzhan211 <[email protected]>

* replace

Signed-off-by: jayzhan211 <[email protected]>

* backup

Signed-off-by: jayzhan211 <[email protected]>

* fix function name and physical expr

Signed-off-by: jayzhan211 <[email protected]>

* fix physical optimizer

Signed-off-by: jayzhan211 <[email protected]>

* fix all slt

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* fix with args

Signed-off-by: jayzhan211 <[email protected]>

* add label

Signed-off-by: jayzhan211 <[email protected]>

* revert builtin related code back

Signed-off-by: jayzhan211 <[email protected]>

* fix test

Signed-off-by: jayzhan211 <[email protected]>

* fix substrait

Signed-off-by: jayzhan211 <[email protected]>

* fix doc

Signed-off-by: jayzhan211 <[email protected]>

* fmy

Signed-off-by: jayzhan211 <[email protected]>

* fix

Signed-off-by: jayzhan211 <[email protected]>

* fix udaf macro for distinct but not apply

Signed-off-by: jayzhan211 <[email protected]>

* fmt

Signed-off-by: jayzhan211 <[email protected]>

* fix count distinct and use workspace

Signed-off-by: jayzhan211 <[email protected]>

* add reverse

Signed-off-by: jayzhan211 <[email protected]>

* remove old code

Signed-off-by: jayzhan211 <[email protected]>

* backup

Signed-off-by: jayzhan211 <[email protected]>

* use macro

Signed-off-by: jayzhan211 <[email protected]>

* expr builder

Signed-off-by: jayzhan211 <[email protected]>

* introduce expr builder

Signed-off-by: jayzhan211 <[email protected]>

* add example

Signed-off-by: jayzhan211 <[email protected]>

* fmt

Signed-off-by: jayzhan211 <[email protected]>

* clean agg sta

Signed-off-by: jayzhan211 <[email protected]>

* combine agg

Signed-off-by: jayzhan211 <[email protected]>

* limit distinct and fmt

Signed-off-by: jayzhan211 <[email protected]>

* cleanup name

Signed-off-by: jayzhan211 <[email protected]>

* fix ci

Signed-off-by: jayzhan211 <[email protected]>

* fix window

Signed-off-by: jayzhan211 <[email protected]>

* fmt

Signed-off-by: jayzhan211 <[email protected]>

* fix ci

Signed-off-by: jayzhan211 <[email protected]>

* fmt

Signed-off-by: jayzhan211 <[email protected]>

* fix merged

Signed-off-by: jayzhan211 <[email protected]>

* fix

Signed-off-by: jayzhan211 <[email protected]>

* fix rebase

Signed-off-by: jayzhan211 <[email protected]>

* cleanup

Signed-off-by: jayzhan211 <[email protected]>

* use std

Signed-off-by: jayzhan211 <[email protected]>

* update mrsv

Signed-off-by: jayzhan211 <[email protected]>

* upd msrv

Signed-off-by: jayzhan211 <[email protected]>

* revert test

Signed-off-by: jayzhan211 <[email protected]>

* fmt

Signed-off-by: jayzhan211 <[email protected]>

* downgrade to 1.75

Signed-off-by: jayzhan211 <[email protected]>

* 1.76

Signed-off-by: jayzhan211 <[email protected]>

* ahas

Signed-off-by: jayzhan211 <[email protected]>

* revert to 1.75

Signed-off-by: jayzhan211 <[email protected]>

* rm count

Signed-off-by: jayzhan211 <[email protected]>

* fix merge

Signed-off-by: jayzhan211 <[email protected]>

* fmt

Signed-off-by: jayzhan211 <[email protected]>

* clippy

Signed-off-by: jayzhan211 <[email protected]>

* rm sum in test_no_duplicate_name

Signed-off-by: jayzhan211 <[email protected]>

* fix

Signed-off-by: jayzhan211 <[email protected]>

---------

Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 authored Jun 12, 2024
1 parent 908a3a1 commit 8f718dd
Show file tree
Hide file tree
Showing 52 changed files with 822 additions and 1,329 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ homepage = "https://datafusion.apache.org"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/datafusion"
rust-version = "1.73"
rust-version = "1.75"
version = "39.0.0"

[workspace.dependencies]
Expand Down Expand Up @@ -107,7 +107,7 @@ doc-comment = "0.3"
env_logger = "0.11"
futures = "0.3"
half = { version = "2.2.1", default-features = false }
hashbrown = { version = "0.14", features = ["raw"] }
hashbrown = { version = "0.14.5", features = ["raw"] }
indexmap = "2.0.0"
itertools = "0.12"
log = "^0.4"
Expand Down
2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ license = "Apache-2.0"
homepage = "https://datafusion.apache.org"
repository = "https://github.com/apache/datafusion"
# Specify MSRV here as `cargo msrv` doesn't support workspace version
rust-version = "1.73"
rust-version = "1.75"
readme = "README.md"

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ authors = { workspace = true }
# Specify MSRV here as `cargo msrv` doesn't support workspace version and fails with
# "Unable to find key 'package.rust-version' (or 'package.metadata.msrv') in 'arrow-datafusion/Cargo.toml'"
# https://github.com/foresterre/cargo-msrv/issues/590
rust-version = "1.73"
rust-version = "1.75"

[lints]
workspace = true
Expand Down
13 changes: 5 additions & 8 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,11 @@ use datafusion_common::{
};
use datafusion_expr::lit;
use datafusion_expr::{
avg, count, max, min, utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown,
avg, max, min, utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown,
UNNAMED_TABLE,
};
use datafusion_expr::{case, is_null};
use datafusion_functions_aggregate::expr_fn::sum;
use datafusion_functions_aggregate::expr_fn::{median, stddev};
use datafusion_functions_aggregate::expr_fn::{count, median, stddev, sum};

use async_trait::async_trait;

Expand Down Expand Up @@ -854,10 +853,7 @@ impl DataFrame {
/// ```
pub async fn count(self) -> Result<usize> {
let rows = self
.aggregate(
vec![],
vec![datafusion_expr::count(Expr::Literal(COUNT_STAR_EXPANSION))],
)?
.aggregate(vec![], vec![count(Expr::Literal(COUNT_STAR_EXPANSION))])?
.collect()
.await?;
let len = *rows
Expand Down Expand Up @@ -1594,9 +1590,10 @@ mod tests {
use datafusion_common::{Constraint, Constraints};
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::{
array_agg, cast, count_distinct, create_udf, expr, lit, BuiltInWindowFunction,
array_agg, cast, create_udf, expr, lit, BuiltInWindowFunction,
ScalarFunctionImplementation, Volatility, WindowFrame, WindowFunctionDefinition,
};
use datafusion_functions_aggregate::expr_fn::count_distinct;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};

Expand Down
79 changes: 27 additions & 52 deletions datafusion/core/src/physical_optimizer/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,38 +170,6 @@ fn take_optimizable_column_and_table_count(
}
}
}
// TODO: Remove this after revmoing Builtin Count
else if let (&Precision::Exact(num_rows), Some(casted_expr)) = (
&stats.num_rows,
agg_expr.as_any().downcast_ref::<expressions::Count>(),
) {
// TODO implementing Eq on PhysicalExpr would help a lot here
if casted_expr.expressions().len() == 1 {
// TODO optimize with exprs other than Column
if let Some(col_expr) = casted_expr.expressions()[0]
.as_any()
.downcast_ref::<expressions::Column>()
{
let current_val = &col_stats[col_expr.index()].null_count;
if let &Precision::Exact(val) = current_val {
return Some((
ScalarValue::Int64(Some((num_rows - val) as i64)),
casted_expr.name().to_string(),
));
}
} else if let Some(lit_expr) = casted_expr.expressions()[0]
.as_any()
.downcast_ref::<expressions::Literal>()
{
if lit_expr.value() == &COUNT_STAR_EXPANSION {
return Some((
ScalarValue::Int64(Some(num_rows as i64)),
casted_expr.name().to_owned(),
));
}
}
}
}
None
}

Expand Down Expand Up @@ -307,13 +275,12 @@ fn take_optimizable_max(

#[cfg(test)]
pub(crate) mod tests {

use super::*;

use crate::logical_expr::Operator;
use crate::physical_plan::aggregates::PhysicalGroupBy;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::common;
use crate::physical_plan::expressions::Count;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::memory::MemoryExec;
use crate::prelude::SessionContext;
Expand All @@ -322,8 +289,10 @@ pub(crate) mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_int64_array;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::expressions::cast;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::aggregate::create_aggregate_expr;
use datafusion_physical_plan::aggregates::AggregateMode;

/// Mock data using a MemoryExec which has an exact count statistic
Expand Down Expand Up @@ -414,13 +383,19 @@ pub(crate) mod tests {
Self::ColumnA(schema.clone())
}

/// Return appropriate expr depending if COUNT is for col or table (*)
pub(crate) fn count_expr(&self) -> Arc<dyn AggregateExpr> {
Arc::new(Count::new(
self.column(),
// Return appropriate expr depending if COUNT is for col or table (*)
pub(crate) fn count_expr(&self, schema: &Schema) -> Arc<dyn AggregateExpr> {
create_aggregate_expr(
&count_udaf(),
&[self.column()],
&[],
&[],
schema,
self.column_name(),
DataType::Int64,
))
false,
false,
)
.unwrap()
}

/// what argument would this aggregate need in the plan?
Expand Down Expand Up @@ -458,7 +433,7 @@ pub(crate) mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
source,
Arc::clone(&schema),
Expand All @@ -467,7 +442,7 @@ pub(crate) mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
Expand All @@ -488,7 +463,7 @@ pub(crate) mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
source,
Arc::clone(&schema),
Expand All @@ -497,7 +472,7 @@ pub(crate) mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
Expand All @@ -517,7 +492,7 @@ pub(crate) mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
source,
Arc::clone(&schema),
Expand All @@ -529,7 +504,7 @@ pub(crate) mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
Expand All @@ -549,7 +524,7 @@ pub(crate) mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
source,
Arc::clone(&schema),
Expand All @@ -561,7 +536,7 @@ pub(crate) mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
Expand Down Expand Up @@ -592,7 +567,7 @@ pub(crate) mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
filter,
Arc::clone(&schema),
Expand All @@ -601,7 +576,7 @@ pub(crate) mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
Expand Down Expand Up @@ -637,7 +612,7 @@ pub(crate) mod tests {
let partial_agg = AggregateExec::try_new(
AggregateMode::Partial,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
filter,
Arc::clone(&schema),
Expand All @@ -646,7 +621,7 @@ pub(crate) mod tests {
let final_agg = AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![agg.count_expr(&schema)],
vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
Expand Down
47 changes: 26 additions & 21 deletions datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,9 @@ mod tests {
use crate::physical_plan::{displayable, Partitioning};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::expressions::{col, Count};
use datafusion_physical_expr::expressions::col;
use datafusion_physical_plan::udaf::create_aggregate_expr;

/// Runs the CombinePartialFinalAggregate optimizer and asserts the plan against the expected
Expand Down Expand Up @@ -303,15 +304,31 @@ mod tests {
)
}

// Return appropriate expr depending if COUNT is for col or table (*)
fn count_expr(
expr: Arc<dyn PhysicalExpr>,
name: &str,
schema: &Schema,
) -> Arc<dyn AggregateExpr> {
create_aggregate_expr(
&count_udaf(),
&[expr],
&[],
&[],
schema,
name,
false,
false,
)
.unwrap()
}

#[test]
fn aggregations_not_combined() -> Result<()> {
let schema = schema();

let aggr_expr = vec![Arc::new(Count::new(
lit(1i8),
"COUNT(1)".to_string(),
DataType::Int64,
)) as _];
let aggr_expr = vec![count_expr(lit(1i8), "COUNT(1)", &schema)];

let plan = final_aggregate_exec(
repartition_exec(partial_aggregate_exec(
parquet_exec(&schema),
Expand All @@ -330,16 +347,8 @@ mod tests {
];
assert_optimized!(expected, plan);

let aggr_expr1 = vec![Arc::new(Count::new(
lit(1i8),
"COUNT(1)".to_string(),
DataType::Int64,
)) as _];
let aggr_expr2 = vec![Arc::new(Count::new(
lit(1i8),
"COUNT(2)".to_string(),
DataType::Int64,
)) as _];
let aggr_expr1 = vec![count_expr(lit(1i8), "COUNT(1)", &schema)];
let aggr_expr2 = vec![count_expr(lit(1i8), "COUNT(2)", &schema)];

let plan = final_aggregate_exec(
partial_aggregate_exec(
Expand All @@ -365,11 +374,7 @@ mod tests {
#[test]
fn aggregations_combined() -> Result<()> {
let schema = schema();
let aggr_expr = vec![Arc::new(Count::new(
lit(1i8),
"COUNT(1)".to_string(),
DataType::Int64,
)) as _];
let aggr_expr = vec![count_expr(lit(1i8), "COUNT(1)", &schema)];

let plan = final_aggregate_exec(
partial_aggregate_exec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,10 +517,10 @@ mod tests {
let single_agg = AggregateExec::try_new(
AggregateMode::Single,
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![agg.count_expr()], /* aggr_expr */
vec![None], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
vec![agg.count_expr(&schema)], /* aggr_expr */
vec![None], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
let limit_exec = LocalLimitExec::new(
Arc::new(single_agg),
Expand Down Expand Up @@ -554,10 +554,10 @@ mod tests {
let single_agg = AggregateExec::try_new(
AggregateMode::Single,
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![agg.count_expr()], /* aggr_expr */
vec![filter_expr], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
vec![agg.count_expr(&schema)], /* aggr_expr */
vec![filter_expr], /* filter_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
let limit_exec = LocalLimitExec::new(
Arc::new(single_agg),
Expand Down
Loading

0 comments on commit 8f718dd

Please sign in to comment.