From ba4a12584461376a8e9398a556c0f4ab956f6e44 Mon Sep 17 00:00:00 2001 From: Chunchun <14298407+appletreeisyellow@users.noreply.github.com> Date: Mon, 12 Feb 2024 17:36:56 -0600 Subject: [PATCH 1/7] chore: add test cases for predicate is_null and is_not_null --- .../core/src/physical_optimizer/pruning.rs | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 648b1f70c58b..f5f8622631cd 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -2052,6 +2052,32 @@ mod tests { Ok(()) } + #[test] + fn row_group_predicate_is_null() -> Result<()> { + let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); + let expected_expr = "c1_null_count@0 > 0"; + + let expr = col("c1").is_null(); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); + assert_eq!(predicate_expr.to_string(), expected_expr); + + Ok(()) + } + + #[test] + fn row_group_predicate_is_not_null() -> Result<()> { + let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); + let expected_expr = "true"; + + let expr = col("c1").is_not_null(); + let predicate_expr = + test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new()); + assert_eq!(predicate_expr.to_string(), expected_expr); + + Ok(()) + } + #[test] fn row_group_predicate_required_columns() -> Result<()> { let schema = Schema::new(vec![ From df7284c37f6f0e744d8f8f800b0547bf8b23b2b9 Mon Sep 17 00:00:00 2001 From: Chunchun <14298407+appletreeisyellow@users.noreply.github.com> Date: Mon, 12 Feb 2024 17:44:24 -0600 Subject: [PATCH 2/7] feat(pruning): support predicate build for is_not_null expression --- datafusion/core/Cargo.toml | 2 +- .../core/src/physical_optimizer/pruning.rs | 38 ++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 429cf35fcd3b..7f99c6af82e4 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -90,7 +90,7 @@ pin-project-lite = "^0.2.7" rand = { workspace = true } sqlparser = { workspace = true } tempfile = { workspace = true } -tokio = { workspace = true, feature = ["rt-multi-thread", "parking_lot"] } +tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot", "fs"] } tokio-util = { version = "0.7.4", features = ["io"], optional = true } url = { workspace = true } uuid = { version = "1.0", features = ["v4"] } diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index f5f8622631cd..5f9d0ca8434d 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -1120,6 +1120,34 @@ fn build_is_null_column_expr( } } +/// Given an expression reference to `expr`, if `expr` is a column expression, +/// returns a pruning expression in terms of IsNotNull that will evaluate to true +/// if the column does NOT contain null, and false if it may contain null +fn build_is_not_null_column_expr( + expr: &Arc, + schema: &Schema, + required_columns: &mut RequiredColumns, +) -> Option> { + if let Some(col) = expr.as_any().downcast_ref::() { + let field = schema.field_with_name(col.name()).ok()?; + + let null_count_field = &Field::new(field.name(), DataType::UInt64, true); + required_columns + .null_count_column_expr(col, expr, null_count_field) + .map(|null_count_column_expr| { + // IsNotNull(column) => null_count = 0 + Arc::new(phys_expr::BinaryExpr::new( + null_count_column_expr, + Operator::Eq, + Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))), + )) as _ + }) + .ok() + } else { + None + } +} + /// The maximum number of entries in an `InList` that might be rewritten into /// an OR chain const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20; @@ -1146,6 +1174,14 @@ fn build_predicate_expression( return build_is_null_column_expr(is_null.arg(), schema, required_columns) .unwrap_or(unhandled); } + if let Some(is_not_null) = expr_any.downcast_ref::() { + return build_is_not_null_column_expr( + is_not_null.arg(), + schema, + required_columns, + ) + .unwrap_or(unhandled); + } if let Some(col) = expr_any.downcast_ref::() { return build_single_column_expr(col, schema, required_columns, false) .unwrap_or(unhandled); @@ -2068,7 +2104,7 @@ mod tests { #[test] fn row_group_predicate_is_not_null() -> Result<()> { let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); - let expected_expr = "true"; + let expected_expr = "c1_null_count@0 = 0"; let expr = col("c1").is_not_null(); let predicate_expr = From 852f30befbef4cee11315bc767d6bdf478467b14 Mon Sep 17 00:00:00 2001 From: Chunchun <14298407+appletreeisyellow@users.noreply.github.com> Date: Mon, 12 Feb 2024 18:05:47 -0600 Subject: [PATCH 3/7] doc: add example in doc for `IS NOT NULL` --- datafusion/core/src/physical_optimizer/pruning.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 5f9d0ca8434d..4303c4038927 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -315,6 +315,7 @@ pub trait PruningStatistics { /// `x < 5` | `x_max < 5` /// `x = 5 AND y = 10` | `x_min <= 5 AND 5 <= x_max AND y_min <= 10 AND 10 <= y_max` /// `x IS NULL` | `x_null_count > 0` +/// `x IS NOT NULL` | `x_null_count = 0 /// /// ## Predicate Evaluation /// The PruningPredicate works in two passes From a6dfec9074c4eb59d2204e7316785ff3e4a3967a Mon Sep 17 00:00:00 2001 From: Chunchun <14298407+appletreeisyellow@users.noreply.github.com> Date: Mon, 12 Feb 2024 18:09:36 -0600 Subject: [PATCH 4/7] chore: remove edit on cargo file --- datafusion/core/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 7f99c6af82e4..429cf35fcd3b 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -90,7 +90,7 @@ pin-project-lite = "^0.2.7" rand = { workspace = true } sqlparser = { workspace = true } tempfile = { workspace = true } -tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot", "fs"] } +tokio = { workspace = true, feature = ["rt-multi-thread", "parking_lot"] } tokio-util = { version = "0.7.4", features = ["io"], optional = true } url = { workspace = true } uuid = { version = "1.0", features = ["v4"] } From 5040cf836554fb6eae1920260302b6263be7829e Mon Sep 17 00:00:00 2001 From: Chunchun <14298407+appletreeisyellow@users.noreply.github.com> Date: Tue, 13 Feb 2024 09:34:29 -0600 Subject: [PATCH 5/7] chore: add `IS NOT NULL` test for row group pruning chore: remove Debug derive --- .../physical_plan/parquet/row_groups.rs | 49 +++++++++++++++++-- 1 file changed, 44 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index fa9523a76380..e21547705a70 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -620,13 +620,20 @@ mod tests { ParquetStatistics::boolean(Some(false), Some(true), None, 1, false), ], ); - vec![rgm1, rgm2] + let rgm3 = get_row_group_meta_data( + &schema_descr, + vec![ + ParquetStatistics::int32(Some(17), Some(30), None, 1, false), + ParquetStatistics::boolean(Some(false), Some(true), None, 0, false), + ], + ); + vec![rgm1, rgm2, rgm3] } #[test] fn row_group_pruning_predicate_null_expr() { use datafusion_expr::{col, lit}; - // int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0 + // c1 > 15 and IsNull(c2) => c1_max > 15 and c2_null_count > 0 let schema = Arc::new(Schema::new(vec![ Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Boolean, false), @@ -657,7 +664,7 @@ mod tests { use datafusion_expr::{col, lit}; // test row group predicate with an unknown (Null) expr // - // int > 1 and bool = NULL => c1_max > 1 and null + // c1 > 15 and c2 = NULL => c1_max > 15 and null let schema = Arc::new(Schema::new(vec![ Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Boolean, false), @@ -672,7 +679,7 @@ mod tests { let metrics = parquet_file_metrics(); // bool = NULL always evaluates to NULL (and thus will not - // pass predicates. Ideally these should both be false + // pass predicates. Ideally these should all be false assert_eq!( prune_row_groups_by_statistics( &schema, @@ -682,7 +689,39 @@ mod tests { Some(&pruning_predicate), &metrics ), - vec![1] + vec![1, 2] + ); + } + + #[test] + fn row_group_pruning_predicate_not_null_expr() { + use datafusion_expr::{col, lit}; + // c1 > 15 and IsNotNull(c2) => c1_max > 15 and c2_null_count = 0 + let schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::Int32, false), + Field::new("c2", DataType::Boolean, false), + ])); + let schema_descr = arrow_to_parquet_schema(&schema).unwrap(); + let expr = col("c1").gt(lit(15)).and(col("c2").is_not_null()); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); + let groups = gen_row_group_meta_data_for_pruning_predicate(); + + let metrics = parquet_file_metrics(); + assert_eq!( + prune_row_groups_by_statistics( + &schema, + &schema_descr, + &groups, + None, + Some(&pruning_predicate), + &metrics + ), + // The first row group was filtered out because c1_max is 10, which is smaller than 15. + // The second row group was filtered out because it contains null value on "c2". + // The third row group is kept because c1_max is 30, which is greater than 15 AND + // it does NOT contain any null value on "c2". + vec![2] ); } From d7a4f0cfde383c20a160e67ae12baf278e2e16d2 Mon Sep 17 00:00:00 2001 From: Chunchun Ye <14298407+appletreeisyellow@users.noreply.github.com> Date: Tue, 13 Feb 2024 11:08:03 -0500 Subject: [PATCH 6/7] chore: update comment null --> NULL Co-authored-by: Liang-Chi Hsieh --- .../core/src/datasource/physical_plan/parquet/row_groups.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index e21547705a70..c876694db1ff 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -664,7 +664,7 @@ mod tests { use datafusion_expr::{col, lit}; // test row group predicate with an unknown (Null) expr // - // c1 > 15 and c2 = NULL => c1_max > 15 and null + // c1 > 15 and c2 = NULL => c1_max > 15 and NULL let schema = Arc::new(Schema::new(vec![ Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Boolean, false), From 4858717d363f8995e07b92f0148ac6c3cda8fdd5 Mon Sep 17 00:00:00 2001 From: Chunchun Ye <14298407+appletreeisyellow@users.noreply.github.com> Date: Tue, 13 Feb 2024 11:08:21 -0500 Subject: [PATCH 7/7] chore: update comment Co-authored-by: Liang-Chi Hsieh --- datafusion/core/src/physical_optimizer/pruning.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 4303c4038927..e1b52c3837cc 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -315,7 +315,7 @@ pub trait PruningStatistics { /// `x < 5` | `x_max < 5` /// `x = 5 AND y = 10` | `x_min <= 5 AND 5 <= x_max AND y_min <= 10 AND 10 <= y_max` /// `x IS NULL` | `x_null_count > 0` -/// `x IS NOT NULL` | `x_null_count = 0 +/// `x IS NOT NULL` | `x_null_count = 0` /// /// ## Predicate Evaluation /// The PruningPredicate works in two passes