From cc34b9f14f402792e340b2751a0284faac892197 Mon Sep 17 00:00:00 2001
From: hengfeiyang
Date: Fri, 13 Oct 2023 17:15:06 +0800
Subject: [PATCH 01/11] feat: implement read bloom filter support

---
 .../src/datasource/physical_plan/parquet.rs   |  41 ++-
 .../physical_plan/parquet/row_groups.rs       | 322 ++++++++++++++++--
 2 files changed, 334 insertions(+), 29 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 6cab27b0846c..8502d060fed9 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -82,6 +82,9 @@ pub struct ParquetExec {
     /// Override for `Self::with_enable_page_index`. If None, uses
     /// values from base_config
     enable_page_index: Option<bool>,
+    /// Override for `Self::with_enable_bloom_filter`. If None, uses
+    /// values from base_config
+    enable_bloom_filter: Option<bool>,
     /// Base configuration for this scan
     base_config: FileScanConfig,
     projected_statistics: Statistics,
@@ -151,6 +154,7 @@ impl ParquetExec {
             pushdown_filters: None,
             reorder_filters: None,
             enable_page_index: None,
+            enable_bloom_filter: None,
             base_config,
             projected_schema,
             projected_statistics,
@@ -244,6 +248,18 @@ impl ParquetExec {
             .unwrap_or(config_options.execution.parquet.enable_page_index)
     }
 
+    /// If enabled, the reader will read the bloom filter
+    pub fn with_enable_bloom_filter(mut self, enable_bloom_filter: bool) -> Self {
+        self.enable_bloom_filter = Some(enable_bloom_filter);
+        self
+    }
+
+    /// Return the value described in [`Self::with_enable_bloom_filter`]
+    fn enable_bloom_filter(&self, config_options: &ConfigOptions) -> bool {
+        self.enable_bloom_filter
+            .unwrap_or(config_options.execution.parquet.bloom_filter_enabled)
+    }
+
     /// Redistribute files across partitions according to their size
     /// See comments on `get_file_groups_repartitioned()` for more detail.
     pub fn get_repartitioned(
@@ -373,6 +389,7 @@ impl ExecutionPlan for ParquetExec {
             pushdown_filters: self.pushdown_filters(config_options),
             reorder_filters: self.reorder_filters(config_options),
             enable_page_index: self.enable_page_index(config_options),
+            enable_bloom_filter: self.enable_bloom_filter(config_options),
         };
 
         let stream =
@@ -406,6 +423,7 @@ struct ParquetOpener {
     pushdown_filters: bool,
     reorder_filters: bool,
     enable_page_index: bool,
+    enable_bloom_filter: bool,
 }
 
 impl FileOpener for ParquetOpener {
@@ -440,6 +458,7 @@ impl FileOpener for ParquetOpener {
             self.enable_page_index,
             &self.page_pruning_predicate,
         );
+        let enable_bloom_filter = self.enable_bloom_filter;
         let limit = self.limit;
 
         Ok(Box::pin(async move {
@@ -482,16 +501,30 @@ impl FileOpener for ParquetOpener {
                 };
             };
 
-            // Row group pruning: attempt to skip entire row_groups
+            // Row group pruning by statistics: attempt to skip entire row_groups
             // using metadata on the row groups
-            let file_metadata = builder.metadata();
-            let row_groups = row_groups::prune_row_groups(
+            let file_metadata = builder.metadata().clone();
+            let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
+            let mut row_groups = row_groups::prune_row_groups_by_statistics(
                 file_metadata.row_groups(),
                 file_range,
-                pruning_predicate.as_ref().map(|p| p.as_ref()),
+                predicate,
                 &file_metrics,
             );
 
+            // Bloom filter pruning: if bloom filters are enabled, attempt to skip
+            // entire row_groups using the bloom filters on those row groups
+            if enable_bloom_filter && !row_groups.is_empty() && predicate.is_some() {
+                row_groups = row_groups::prune_row_groups_by_bloom_filters(
+                    &mut builder,
+                    &row_groups,
+                    file_metadata.row_groups(),
+                    predicate.unwrap(),
+                    &file_metrics,
+                )
+                .await;
+            }
+
             // page index pruning: if all data on individual pages can
             // be ruled out using page metadata, rows from other columns
             // with that range can be skipped as well
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index c6e2c68d0211..7b6f11980f0d 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -19,24 +19,30 @@ use arrow::{
     array::ArrayRef,
     datatypes::{DataType, Schema},
 };
-use datafusion_common::Column;
-use datafusion_common::ScalarValue;
-use log::debug;
-
-use parquet::file::{
-    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
+use datafusion_common::{Column, DataFusionError, Result, ScalarValue};
+use parquet::{
+    arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
+    bloom_filter::Sbbf,
+    file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
 };
-
-use crate::datasource::physical_plan::parquet::{
-    from_bytes_to_i128, parquet_to_arrow_decimal_type,
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
 };
-use crate::{
-    datasource::listing::FileRange,
-    physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
+
+use crate::datasource::{
+    listing::FileRange,
+    physical_plan::parquet::{from_bytes_to_i128, parquet_to_arrow_decimal_type},
 };
+use crate::logical_expr::Operator;
+use crate::physical_expr::expressions as phys_expr;
+use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
+use crate::physical_plan::PhysicalExpr;
 
 use super::ParquetFileMetrics;
 
+/// Prune row groups based on statistics
+///
 /// Returns a vector of indexes into `groups` which should be scanned.
 ///
 /// If an index is NOT present in the returned Vec it means the
 /// predicate filtered out that row group.
 ///
 /// If an index IS present in the returned Vec it means the predicate
 /// did not filter out that row group.
-pub(crate) fn prune_row_groups(
+pub(crate) fn prune_row_groups_by_statistics(
     groups: &[RowGroupMetaData],
     range: Option<FileRange>,
     predicate: Option<&PruningPredicate>,
@@ -81,7 +87,7 @@ pub(crate) fn prune_row_groups(
                 // stats filter array could not be built
                 // return a closure which will not filter out any row groups
                 Err(e) => {
-                    debug!("Error evaluating row group predicate values {e}");
+                    log::debug!("Error evaluating row group predicate values {e}");
                     metrics.predicate_evaluation_errors.add(1);
                 }
             }
@@ -92,6 +98,242 @@ pub(crate) fn prune_row_groups(
     filtered
 }
 
+/// Prune row groups by bloom filters
+///
+/// Returns a vector of indexes into `groups` which should be scanned.
+///
+/// If an index is NOT present in the returned Vec it means the
+/// predicate filtered out that row group.
+///
+/// If an index IS present in the returned Vec it means the predicate
+/// did not filter out that row group.
+pub(crate) async fn prune_row_groups_by_bloom_filters<
+    T: AsyncFileReader + Send + 'static,
+>(
+    builder: &mut ParquetRecordBatchStreamBuilder<T>,
+    row_groups: &[usize],
+    groups: &[RowGroupMetaData],
+    predicate: &PruningPredicate,
+    metrics: &ParquetFileMetrics,
+) -> Vec<usize> {
+    let bf_predicates = match BloomFilterPruningPredicate::try_new(predicate.orig_expr())
+    {
+        Ok(predicates) => predicates,
+        Err(_) => {
+            return row_groups.to_vec();
+        }
+    };
+    let mut filtered = Vec::with_capacity(groups.len());
+    for idx in row_groups {
+        let rg_metadata = &groups[*idx];
+        // get all columns' bloom filters
+        let mut column_sbbf =
+            HashMap::with_capacity(bf_predicates.required_columns.len());
+        for column_name in bf_predicates.required_columns.iter() {
+            let column_idx = match rg_metadata
+                .columns()
+                .iter()
+                .enumerate()
+                .find(|(_, column)| column.column_path().string().eq(column_name))
+            {
+                Some((column_idx, _)) => column_idx,
+                None => continue,
+            };
+            let bf = match builder
+                .get_row_group_column_bloom_filter(*idx, column_idx)
+                .await
+            {
+                Ok(bf) => match bf {
+                    Some(bf) => bf,
+                    None => {
+                        continue;
+                    }
+                },
+                Err(e) => {
+                    log::error!("Error evaluating row group predicate values when using BloomFilterPruningPredicate {e}");
+                    metrics.predicate_evaluation_errors.add(1);
+                    continue;
+                }
+            };
+            column_sbbf.insert(column_name.to_owned(), bf);
+        }
+        if bf_predicates.prune(&column_sbbf) {
+            metrics.row_groups_pruned.add(1);
+            continue;
+        }
+        filtered.push(*idx);
+    }
+    filtered
+}
+
+struct BloomFilterPruningPredicate {
+    /// Actual pruning predicate (rewritten in terms of column min/max statistics)
+    predicate_expr: Option<phys_expr::BinaryExpr>,
+    /// The statistics required to evaluate this predicate
+    required_columns: Vec<String>,
+}
+
+impl BloomFilterPruningPredicate {
+    fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
+        let expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
+        match Self::get_predicate_columns(expr) {
+            Some(columns) => Ok(Self {
+                predicate_expr: expr.cloned(),
+                required_columns: columns.into_iter().collect(),
+            }),
+            None => Err(DataFusionError::Execution(
+                "BloomFilterPruningPredicate only supports binary expressions".to_string(),
+            )),
+        }
+    }
+
+    fn prune(&self, column_sbbf: &HashMap<String, Sbbf>) -> bool {
+        Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(), column_sbbf)
+    }
+
+    /// filter the expr with bloom filter return true if the expr can be pruned
+    fn prune_expr_with_bloom_filter(
+        expr: Option<&phys_expr::BinaryExpr>,
+        column_sbbf: &HashMap<String, Sbbf>,
+    ) -> bool {
+        if expr.is_none() {
+            return false;
+        }
+        let expr = expr.unwrap();
+        match expr.op() {
+            Operator::And => {
+                let left = Self::prune_expr_with_bloom_filter(
+                    expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
+                    column_sbbf,
+                );
+                let right = Self::prune_expr_with_bloom_filter(
+                    expr.right()
+                        .as_any()
+                        .downcast_ref::<phys_expr::BinaryExpr>(),
+                    column_sbbf,
+                );
+                left || right
+            }
+            Operator::Or => {
+                let left = Self::prune_expr_with_bloom_filter(
+                    expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
+                    column_sbbf,
+                );
+                let right = Self::prune_expr_with_bloom_filter(
+                    expr.right()
+                        .as_any()
+                        .downcast_ref::<phys_expr::BinaryExpr>(),
+                    column_sbbf,
+                );
+                left && right
+            }
+            Operator::Eq => {
+                if let Some((col, val)) = Self::check_expr_is_col_equal_const(expr) {
+                    if let Some(sbbf) = column_sbbf.get(col.name()) {
+                        match val {
+                            ScalarValue::Utf8(Some(v)) => !sbbf.check(&v.as_str()),
+                            ScalarValue::Boolean(Some(v)) => !sbbf.check(&v),
+                            ScalarValue::Float64(Some(v)) => !sbbf.check(&v),
+                            ScalarValue::Float32(Some(v)) => !sbbf.check(&v),
+                            ScalarValue::Int64(Some(v)) => !sbbf.check(&v),
+                            ScalarValue::Int32(Some(v)) => !sbbf.check(&v),
+                            ScalarValue::Int16(Some(v)) => !sbbf.check(&v),
+                            ScalarValue::Int8(Some(v)) => !sbbf.check(&v),
+                            _ => false,
+                        }
+                    } else {
+                        false
+                    }
+                } else {
+                    false
+                }
+            }
+            _ => false,
+        }
+    }
+
+    fn get_predicate_columns(
+        expr: Option<&phys_expr::BinaryExpr>,
+    ) -> Option<HashSet<String>> {
+        match expr {
+            None => None,
+            Some(expr) => match expr.op() {
+                Operator::And => {
+                    let left = Self::get_predicate_columns(
+                        expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
+                    );
+                    let right = Self::get_predicate_columns(
+                        expr.right()
+                            .as_any()
+                            .downcast_ref::<phys_expr::BinaryExpr>(),
+                    );
+                    match (left, right) {
+                        (Some(left), Some(right)) => {
+                            let mut columns = left;
+                            columns.extend(right);
+                            Some(columns)
+                        }
+                        (Some(left), None) => Some(left),
+                        (None, Some(right)) => Some(right),
+                        _ => None,
+                    }
+                }
+                Operator::Or => {
+                    let left = Self::get_predicate_columns(
+                        expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
+                    );
+                    let right = Self::get_predicate_columns(
+                        expr.right()
+                            .as_any()
+                            .downcast_ref::<phys_expr::BinaryExpr>(),
+                    );
+                    match (left, right) {
+                        (Some(left), Some(right)) => {
+                            let mut columns = left;
+                            columns.extend(right);
+                            Some(columns)
+                        }
+                        _ => None,
+                    }
+                }
+                Operator::Eq => match Self::check_expr_is_col_equal_const(expr) {
+                    Some((column, _)) => {
+                        let mut columns = HashSet::new();
+                        columns.insert(column.name().to_string());
+                        Some(columns)
+                    }
+                    None => None,
+                },
+                _ => None,
+            },
+        }
+    }
+
+    fn check_expr_is_col_equal_const(
+        expr: &phys_expr::BinaryExpr,
+    ) -> Option<(phys_expr::Column, ScalarValue)> {
+        if Operator::Eq.ne(expr.op()) {
+            return None;
+        }
+
+        let left_any = expr.left().as_any();
+        let right_any = expr.right().as_any();
+        if let (Some(col), Some(literal)) = (
+            left_any.downcast_ref::<phys_expr::Column>(),
+            right_any.downcast_ref::<phys_expr::Literal>(),
+        ) {
+            return Some((col.clone(), literal.value().clone()));
+        }
+        if let (Some(literal), Some(col)) = (
+            left_any.downcast_ref::<phys_expr::Literal>(),
+            right_any.downcast_ref::<phys_expr::Column>(),
+        ) {
+            return Some((col.clone(), literal.value().clone()));
+        }
+        None
+    }
+}
+
 /// Wraps parquet statistics in a way
 /// that implements [`PruningStatistics`]
 struct RowGroupPruningStatistics<'a> {
@@ -329,7 +571,12 @@ mod tests {
 
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(
+                &[rgm1, rgm2],
+                None,
+                Some(&pruning_predicate),
+                &metrics
+            ),
             vec![1]
        );
     }
@@ -358,7 +605,12 @@ mod tests {
         // missing statistics for first row group mean that the result from the predicate expression
         // is null / undefined so the first row group can't be filtered out
         assert_eq!(
-            prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(
+                &[rgm1, rgm2],
+                None,
+                Some(&pruning_predicate),
+                &metrics
+            ),
             vec![0, 1]
         );
     }
@@ -400,7 +652,12 @@ mod tests {
         // the first row group is still filtered out because the predicate expression can be partially evaluated
         // when conditions are joined using AND
         assert_eq!(
-            prune_row_groups(groups, None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(
+                groups,
+                None,
+                Some(&pruning_predicate),
+                &metrics
+            ),
             vec![1]
         );
 
@@ -413,7 +670,12 @@ mod tests {
         // if conditions in predicate are joined with OR and an unsupported expression is used
         // this bypasses the entire predicate expression and no row groups are filtered out
         assert_eq!(
-            prune_row_groups(groups, None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(
+                groups,
+                None,
+                Some(&pruning_predicate),
+                &metrics
+            ),
             vec![0, 1]
         );
     }
@@ -456,7 +718,12 @@ mod tests {
         let metrics = parquet_file_metrics();
         // First row group was filtered out because it contains no null value on "c2".
         assert_eq!(
-            prune_row_groups(&groups, None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(
+                &groups,
+                None,
+                Some(&pruning_predicate),
+                &metrics
+            ),
             vec![1]
         );
     }
@@ -482,7 +749,12 @@ mod tests {
         // bool = NULL always evaluates to NULL (and thus will not
        // pass predicates). Ideally these should both be false
         assert_eq!(
-            prune_row_groups(&groups, None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(
+                &groups,
+                None,
+                Some(&pruning_predicate),
+                &metrics
+            ),
             vec![1]
         );
     }
@@ -535,7 +807,7 @@ mod tests {
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(
+            prune_row_groups_by_statistics(
                 &[rgm1, rgm2, rgm3],
                 None,
                 Some(&pruning_predicate),
@@ -598,7 +870,7 @@ mod tests {
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(
+            prune_row_groups_by_statistics(
                 &[rgm1, rgm2, rgm3, rgm4],
                 None,
                 Some(&pruning_predicate),
@@ -645,7 +917,7 @@ mod tests {
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(
+            prune_row_groups_by_statistics(
                 &[rgm1, rgm2, rgm3],
                 None,
                 Some(&pruning_predicate),
@@ -715,7 +987,7 @@ mod tests {
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(
+            prune_row_groups_by_statistics(
                 &[rgm1, rgm2, rgm3],
                 None,
                 Some(&pruning_predicate),
@@ -774,7 +1046,7 @@ mod tests {
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(
+            prune_row_groups_by_statistics(
                 &[rgm1, rgm2, rgm3],
                 None,
                 Some(&pruning_predicate),
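
Before the test commits, it helps to see how the knob added in patch 01 is reached from user
code. The config key below is the one read by `ParquetExec::enable_bloom_filter` above; the
`SessionConfig`/`SessionContext` calls are the standard DataFusion APIs of this period, and the
file path assumes a local parquet-testing checkout — a minimal sketch, not part of the patch:

    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        // Turn on the option that ParquetExec::enable_bloom_filter falls back to.
        let config = SessionConfig::new()
            .set_bool("datafusion.execution.parquet.bloom_filter_enabled", true);
        let ctx = SessionContext::with_config(config);

        ctx.register_parquet(
            "t",
            "parquet-testing/data/data_index_bloom_encoding_stats.parquet",
            ParquetReadOptions::default(),
        )
        .await?;

        // 'foo' is absent from the file, so the bloom filter proves the predicate
        // false and the row group is skipped before any page is decoded.
        ctx.sql(r#"SELECT * FROM t WHERE "String" = 'foo'"#)
            .await?
            .show()
            .await?;
        Ok(())
    }
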
From cffb26e668daafad3d13a884f4a44aee3fb03833 Mon Sep 17 00:00:00 2001
From: hengfeiyang
Date: Tue, 17 Oct 2023 18:16:52 +0800
Subject: [PATCH 02/11] test: add unit test for read bloom filter

---
 .../src/datasource/physical_plan/parquet.rs   |   2 +-
 .../physical_plan/parquet/row_groups.rs       | 183 ++++++++++++++++++
 2 files changed, 184 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 8502d060fed9..f5b907624d5a 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -600,7 +600,7 @@ impl DefaultParquetFileReaderFactory {
 }
 
 /// Implements [`AsyncFileReader`] for a parquet file in object storage
-struct ParquetFileReader {
+pub(crate) struct ParquetFileReader {
     file_metrics: ParquetFileMetrics,
     inner: ParquetObjectReader,
 }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 7b6f11980f0d..9082871ce5af 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -488,6 +488,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::datasource::physical_plan::parquet::ParquetFileReader;
     use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
     use arrow::datatypes::DataType::Decimal128;
     use arrow::datatypes::Schema;
@@ -496,6 +497,7 @@ mod tests {
     use datafusion_expr::{cast, col, lit, Expr};
     use datafusion_physical_expr::execution_props::ExecutionProps;
     use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
+    use parquet::arrow::async_reader::ParquetObjectReader;
     use parquet::basic::LogicalType;
     use parquet::data_type::{ByteArray, FixedLenByteArray};
     use parquet::{
@@ -1118,4 +1120,185 @@ mod tests {
         let execution_props = ExecutionProps::new();
         create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
     }
+
+    #[tokio::test]
+    async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
+        // load parquet file
+        let testdata = datafusion_common::test_util::parquet_test_data();
+        let file_name = "data_index_bloom_encoding_stats.parquet";
+        let path = format!("{testdata}/{file_name}");
+        let data = bytes::Bytes::from(std::fs::read(path).unwrap());
+
+        // generate pruning predicate
+        let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
+        let expr = col(r#""String""#).eq(lit("Hello_Not_Exists"));
+        let expr = logical2physical(&expr, &schema);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+
+        let row_groups = vec![0];
+        let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
+            file_name,
+            data,
+            &pruning_predicate,
+            &row_groups,
+        )
+        .await
+        .unwrap();
+        assert!(pruned_row_groups.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_row_group_bloom_filter_pruning_predicate_partial_expr() {
+        // load parquet file
+        let testdata = datafusion_common::test_util::parquet_test_data();
+        let file_name = "data_index_bloom_encoding_stats.parquet";
+        let path = format!("{testdata}/{file_name}");
+        let data = bytes::Bytes::from(std::fs::read(path).unwrap());
+
+        // generate pruning predicate
+        let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
+        let expr = col(r#""String""#)
+            .eq(lit("Hello_Not_Exists"))
+            .and(lit("1").eq(lit("1")));
+        let expr = logical2physical(&expr, &schema);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+
+        let row_groups = vec![0];
+        let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
+            file_name,
+            data,
+            &pruning_predicate,
+            &row_groups,
+        )
+        .await
+        .unwrap();
+        assert!(pruned_row_groups.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_row_group_bloom_filter_pruning_predicate_multiple_expr() {
+        // load parquet file
+        let testdata = datafusion_common::test_util::parquet_test_data();
+        let file_name = "data_index_bloom_encoding_stats.parquet";
+        let path = format!("{testdata}/{file_name}");
+        let data = bytes::Bytes::from(std::fs::read(path).unwrap());
+
+        // generate pruning predicate
+        let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
+        let expr = col(r#""String""#)
+            .eq(lit("Hello_Not_Exists"))
+            .or(col(r#""String""#).eq(lit("Hello_Not_Exists2")));
+        let expr = logical2physical(&expr, &schema);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+
+        let row_groups = vec![0];
+        let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
+            file_name,
+            data,
+            &pruning_predicate,
+            &row_groups,
+        )
+        .await
+        .unwrap();
+        assert!(pruned_row_groups.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_row_group_bloom_filter_pruning_predicate_with_exists_value() {
+        // load parquet file
+        let testdata = datafusion_common::test_util::parquet_test_data();
+        let file_name = "data_index_bloom_encoding_stats.parquet";
+        let path = format!("{testdata}/{file_name}");
+        let data = bytes::Bytes::from(std::fs::read(path).unwrap());
+
+        // generate pruning predicate
+        let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
+        let expr = col(r#""String""#).eq(lit("Hello"));
+        let expr = logical2physical(&expr, &schema);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+
+        let row_groups = vec![0];
+        let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
+            file_name,
+            data,
+            &pruning_predicate,
+            &row_groups,
+        )
+        .await
+        .unwrap();
+        assert_eq!(pruned_row_groups, row_groups);
+    }
+
+    #[tokio::test]
+    async fn test_row_group_bloom_filter_pruning_predicate_without_bloom_filter() {
+        // load parquet file
+        let testdata = datafusion_common::test_util::parquet_test_data();
+        let file_name = "alltypes_plain.parquet";
+        let path = format!("{testdata}/{file_name}");
+        let data = bytes::Bytes::from(std::fs::read(path).unwrap());
+
+        // generate pruning predicate
+        let schema = Schema::new(vec![Field::new("string_col", DataType::Utf8, false)]);
+        let expr = col(r#""string_col""#).eq(lit("0"));
+        let expr = logical2physical(&expr, &schema);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+
+        let row_groups = vec![0];
+        let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
+            file_name,
+            data,
+            &pruning_predicate,
+            &row_groups,
+        )
+        .await
+        .unwrap();
+        assert_eq!(pruned_row_groups, row_groups);
+    }
+
+    async fn test_row_group_bloom_filter_pruning_predicate(
+        file_name: &str,
+        data: bytes::Bytes,
+        pruning_predicate: &PruningPredicate,
+        row_groups: &[usize],
+    ) -> Result<Vec<usize>> {
+        use object_store::{ObjectMeta, ObjectStore};
+
+        let object_meta = ObjectMeta {
+            location: object_store::path::Path::parse(file_name).expect("creating path"),
+            last_modified: chrono::DateTime::from(std::time::SystemTime::now()),
+            size: data.len(),
+            e_tag: None,
+        };
+        let in_memory = object_store::memory::InMemory::new();
+        in_memory
+            .put(&object_meta.location, data)
+            .await
+            .expect("put parquet file into in memory object store");
+
+        let metrics = ExecutionPlanMetricsSet::new();
+        let file_metrics =
+            ParquetFileMetrics::new(0, &object_meta.location.to_string(), &metrics);
+        let reader = ParquetFileReader {
+            inner: ParquetObjectReader::new(Arc::new(in_memory), object_meta),
+            file_metrics: file_metrics.clone(),
+        };
+        let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
+
+        let metadata = builder.metadata().clone();
+        let pruned_row_group = prune_row_groups_by_bloom_filters(
+            &mut builder,
+            row_groups,
+            &metadata.row_groups(),
+            pruning_predicate,
+            &file_metrics,
+        )
+        .await;
+
+        Ok(pruned_row_group)
+    }
 }

From 0d59bb3323bcd61f9d21f85f4d5324663d9cb5fb Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 19 Oct 2023 13:37:10 -0400
Subject: [PATCH 03/11] Simplify bloom filter application

---
 .../physical_plan/parquet/row_groups.rs       | 87 ++++++-------------
 1 file changed, 27 insertions(+), 60 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 9082871ce5af..1065e8f0dae4 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -19,6 +19,7 @@ use arrow::{
     array::ArrayRef,
     datatypes::{DataType, Schema},
 };
+use datafusion_common::tree_node::{TreeNode, VisitRecursion};
 use datafusion_common::{Column, DataFusionError, Result, ScalarValue};
 use parquet::{
     arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
@@ -175,12 +176,15 @@ struct BloomFilterPruningPredicate {
 
 impl BloomFilterPruningPredicate {
     fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
-        let expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
-        match Self::get_predicate_columns(expr) {
-            Some(columns) => Ok(Self {
-                predicate_expr: expr.cloned(),
-                required_columns: columns.into_iter().collect(),
-            }),
+        let binary_expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
+        match binary_expr {
+            Some(binary_expr) => {
+                let columns = Self::get_predicate_columns(expr);
+                Ok(Self {
+                    predicate_expr: Some(binary_expr.clone()),
+                    required_columns: columns.into_iter().collect(),
+                })
+            }
             None => Err(DataFusionError::Execution(
                 "BloomFilterPruningPredicate only supports binary expressions".to_string(),
             )),
@@ -252,61 +256,24 @@ impl BloomFilterPruningPredicate {
         }
     }
 
-    fn get_predicate_columns(
-        expr: Option<&phys_expr::BinaryExpr>,
-    ) -> Option<HashSet<String>> {
-        match expr {
-            None => None,
-            Some(expr) => match expr.op() {
-                Operator::And => {
-                    let left = Self::get_predicate_columns(
-                        expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
-                    );
-                    let right = Self::get_predicate_columns(
-                        expr.right()
-                            .as_any()
-                            .downcast_ref::<phys_expr::BinaryExpr>(),
-                    );
-                    match (left, right) {
-                        (Some(left), Some(right)) => {
-                            let mut columns = left;
-                            columns.extend(right);
-                            Some(columns)
-                        }
-                        (Some(left), None) => Some(left),
-                        (None, Some(right)) => Some(right),
-                        _ => None,
-                    }
-                }
-                Operator::Or => {
-                    let left = Self::get_predicate_columns(
-                        expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
-                    );
-                    let right = Self::get_predicate_columns(
-                        expr.right()
-                            .as_any()
-                            .downcast_ref::<phys_expr::BinaryExpr>(),
-                    );
-                    match (left, right) {
-                        (Some(left), Some(right)) => {
-                            let mut columns = left;
-                            columns.extend(right);
-                            Some(columns)
-                        }
-                        _ => None,
-                    }
-                }
-                Operator::Eq => match Self::check_expr_is_col_equal_const(expr) {
-                    Some((column, _)) => {
-                        let mut columns = HashSet::new();
-                        columns.insert(column.name().to_string());
-                        Some(columns)
-                    }
-                    None => None,
-                },
-                _ => None,
-            },
-        }
-    }
+    fn get_predicate_columns(expr: &Arc<dyn PhysicalExpr>) -> HashSet<String> {
+        let mut columns = HashSet::new();
+        expr.apply(&mut |expr| {
+            if let Some(binary_expr) =
+                expr.as_any().downcast_ref::<phys_expr::BinaryExpr>()
+            {
+                if let Some((column, _)) =
+                    Self::check_expr_is_col_equal_const(binary_expr)
+                {
+                    columns.insert(column.name().to_string());
+                }
+            }
+            Ok(VisitRecursion::Continue)
+        })
+        // no way to fail as only Ok(VisitRecursion::Continue) is returned
+        .unwrap();
+
+        columns
+    }
 
     fn check_expr_is_col_equal_const(
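
The pruning algebra this series keeps refining is easy to state: an `Eq` leaf is prunable when
the bloom filter proves the constant absent, an `And` is prunable when either side is, and an
`Or` only when both sides are. A self-contained sketch of just that algebra — plain Rust, no
DataFusion types; `Node` and `can_prune` are illustrative names invented for this note:

    /// Toy model of prune_expr_with_bloom_filter: `true` means "the bloom
    /// filter proves this sub-expression can never be true for the row group".
    enum Node {
        /// a `col = const` leaf; true when the filter proved the constant absent
        Eq { definitely_absent: bool },
        And(Box<Node>, Box<Node>),
        Or(Box<Node>, Box<Node>),
    }

    fn can_prune(n: &Node) -> bool {
        match n {
            Node::Eq { definitely_absent } => *definitely_absent,
            // an AND is false if either conjunct is provably false
            Node::And(l, r) => can_prune(l) || can_prune(r),
            // an OR is false only if every disjunct is provably false
            Node::Or(l, r) => can_prune(l) && can_prune(r),
        }
    }

    fn main() {
        let absent = || Box::new(Node::Eq { definitely_absent: true });
        let maybe = || Box::new(Node::Eq { definitely_absent: false });

        assert!(can_prune(&Node::And(absent(), maybe()))); // skip the row group
        assert!(!can_prune(&Node::Or(absent(), maybe()))); // must still scan it
        println!("pruning algebra ok");
    }
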
From 566423571b3010b1d0fa4a8792a23dd3213d993f Mon Sep 17 00:00:00 2001
From: hengfeiyang
Date: Fri, 20 Oct 2023 15:39:56 +0800
Subject: [PATCH 04/11] test: add unit test for bloom filter with sql `in`

---
 .../physical_plan/parquet/row_groups.rs       | 135 +++++++++++++++++-
 1 file changed, 133 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 1065e8f0dae4..b537dbd58a57 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -460,10 +460,14 @@ mod tests {
     use arrow::datatypes::DataType::Decimal128;
     use arrow::datatypes::Schema;
     use arrow::datatypes::{DataType, Field};
-    use datafusion_common::ToDFSchema;
-    use datafusion_expr::{cast, col, lit, Expr};
+    use datafusion_common::{config::ConfigOptions, TableReference, ToDFSchema};
+    use datafusion_expr::{
+        builder::LogicalTableSource, cast, col, lit, AggregateUDF, Expr, ScalarUDF,
+        TableSource, WindowUDF,
+    };
     use datafusion_physical_expr::execution_props::ExecutionProps;
     use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
+    use datafusion_sql::planner::ContextProvider;
     use parquet::arrow::async_reader::ParquetObjectReader;
     use parquet::basic::LogicalType;
     use parquet::data_type::{ByteArray, FixedLenByteArray};
     use parquet::{
@@ -1173,6 +1177,37 @@ mod tests {
         assert!(pruned_row_groups.is_empty());
     }
 
+    #[tokio::test]
+    async fn test_row_group_bloom_filter_pruning_predicate_sql_in() {
+        // load parquet file
+        let testdata = datafusion_common::test_util::parquet_test_data();
+        let file_name = "data_index_bloom_encoding_stats.parquet";
+        let path = format!("{testdata}/{file_name}");
+        let data = bytes::Bytes::from(std::fs::read(path).unwrap());
+
+        // generate pruning predicate
+        let schema = Schema::new(vec![
+            Field::new("String", DataType::Utf8, false),
+            Field::new("String3", DataType::Utf8, false),
+        ]);
+        let sql =
+            "SELECT * FROM tbl WHERE \"String\" IN ('Hello_Not_Exists', 'Hello_Not_Exists2')";
+        let expr = sql_to_physical_plan(sql).unwrap();
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+
+        let row_groups = vec![0];
+        let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
+            file_name,
+            data,
+            &pruning_predicate,
+            &row_groups,
+        )
+        .await
+        .unwrap();
+        assert!(pruned_row_groups.is_empty());
+    }
+
     #[tokio::test]
     async fn test_row_group_bloom_filter_pruning_predicate_with_exists_value() {
@@ -1268,4 +1303,100 @@ mod tests {
 
         Ok(pruned_row_group)
     }
+
+    fn sql_to_physical_plan(sql: &str) -> Result<Arc<dyn PhysicalExpr>> {
+        use datafusion_optimizer::{
+            analyzer::Analyzer, optimizer::Optimizer, OptimizerConfig, OptimizerContext,
+        };
+        use datafusion_sql::{
+            planner::SqlToRel,
+            sqlparser::{ast::Statement, parser::Parser},
+        };
+        use sqlparser::dialect::GenericDialect;
+
+        // parse the SQL
+        let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ...
+        let ast: Vec<Statement> = Parser::parse_sql(&dialect, sql).unwrap();
+        let statement = &ast[0];
+
+        // create a logical query plan
+        let schema_provider = TestSchemaProvider::new();
+        let sql_to_rel = SqlToRel::new(&schema_provider);
+        let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();
+
+        // analyze and optimize the logical plan
+        let config = OptimizerContext::new().with_skip_failing_rules(false);
+        let analyzer = Analyzer::new();
+        let optimizer = Optimizer::new();
+        let plan = analyzer.execute_and_check(&plan, config.options(), |_, _| {})?;
+        let plan = optimizer.optimize(&plan, &config, |_, _| {})?;
+
+        // convert the logical plan into a physical expression
+        let exprs = plan.expressions();
+        let expr = &exprs[0];
+        let df_schema = plan.schema().as_ref().to_owned();
+        let tb_schema: Schema = df_schema.into();
+        let execution_props = ExecutionProps::new();
+        create_physical_expr(expr, &df_schema, &tb_schema, &execution_props)
+    }
+
+    struct TestSchemaProvider {
+        options: ConfigOptions,
+        tables: HashMap<String, Arc<dyn TableSource>>,
+    }
+
+    impl TestSchemaProvider {
+        pub fn new() -> Self {
+            let mut tables = HashMap::new();
+            tables.insert(
+                "tbl".to_string(),
+                create_table_source(vec![Field::new(
+                    "String".to_string(),
+                    DataType::Utf8,
+                    false,
+                )]),
+            );
+
+            Self {
+                options: Default::default(),
+                tables,
+            }
+        }
+    }
+
+    impl ContextProvider for TestSchemaProvider {
+        fn get_table_provider(
+            &self,
+            name: TableReference,
+        ) -> Result<Arc<dyn TableSource>> {
+            match self.tables.get(name.table()) {
+                Some(table) => Ok(table.clone()),
+                _ => datafusion_common::plan_err!("Table not found: {}", name.table()),
+            }
+        }
+
+        fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
+            None
+        }
+
+        fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
+            None
+        }
+
+        fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
+            None
+        }
+
+        fn options(&self) -> &ConfigOptions {
+            &self.options
+        }
+
+        fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
+            None
+        }
+    }
+
+    fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {
+        Arc::new(LogicalTableSource::new(Arc::new(Schema::new(fields))))
+    }
 }
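
Patch 04's `IN`-list test works because nothing in the pruner handles `InList` directly: by the
time the physical predicate is built, DataFusion's expression simplification has expanded a
small `IN` list into OR'd equalities — exactly the shape `prune_expr_with_bloom_filter`
understands. A sketch of the equivalent logical expression, built with the `datafusion_expr`
helpers already used in these tests (the exact rewrite-rule name is beside the point here):

    use datafusion_expr::{col, lit};

    fn main() {
        // `"String" IN ('Hello_Not_Exists', 'Hello_Not_Exists2')` is equivalent,
        // after simplification, to two OR'd equality comparisons:
        let rewritten = col("String")
            .eq(lit("Hello_Not_Exists"))
            .or(col("String").eq(lit("Hello_Not_Exists2")));
        // e.g. prints: String = Utf8("Hello_Not_Exists") OR String = Utf8("Hello_Not_Exists2")
        println!("{rewritten}");
    }
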
From d22eeaa197fd1d834d69e8bf0acdc8c87a5706f7 Mon Sep 17 00:00:00 2001
From: hengfeiyang
Date: Fri, 20 Oct 2023 15:52:26 +0800
Subject: [PATCH 05/11] fix: improve bloom filter match expression

---
 .../physical_plan/parquet/row_groups.rs       | 23 ++++++-------------
 1 file changed, 7 insertions(+), 16 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index b537dbd58a57..cdb822dacd0b 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -195,7 +195,7 @@ impl BloomFilterPruningPredicate {
         Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(), column_sbbf)
     }
 
-    /// filter the expr with bloom filter return true if the expr can be pruned
+    /// Return true if the expr can be pruned by the bloom filter
     fn prune_expr_with_bloom_filter(
         expr: Option<&phys_expr::BinaryExpr>,
         column_sbbf: &HashMap<String, Sbbf>,
     ) -> bool {
@@ -205,7 +205,7 @@ impl BloomFilterPruningPredicate {
         }
         let expr = expr.unwrap();
         match expr.op() {
-            Operator::And => {
+            Operator::And | Operator::Or => {
                 let left = Self::prune_expr_with_bloom_filter(
                     expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
                     column_sbbf,
                 );
@@ -216,20 +216,11 @@ impl BloomFilterPruningPredicate {
                         .downcast_ref::<phys_expr::BinaryExpr>(),
                     column_sbbf,
                 );
-                left || right
-            }
-            Operator::Or => {
-                let left = Self::prune_expr_with_bloom_filter(
-                    expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
-                    column_sbbf,
-                );
-                let right = Self::prune_expr_with_bloom_filter(
-                    expr.right()
-                        .as_any()
-                        .downcast_ref::<phys_expr::BinaryExpr>(),
-                    column_sbbf,
-                );
-                left && right
+                match expr.op() {
+                    Operator::And => left || right,
+                    Operator::Or => left && right,
+                    _ => false,
+                }
             }
             Operator::Eq => {
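
The `Eq` arm that survives patch 05 leans on `check_expr_is_col_equal_const`, which accepts the
literal on either side of the `=`. A small sketch of the expression shape it matches, built by
hand with the standard `datafusion_physical_expr` constructors (the column index 0 is an
arbitrary choice for this note):

    use std::sync::Arc;

    use datafusion_common::ScalarValue;
    use datafusion_expr::Operator;
    use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};

    fn main() {
        // `"String"@0 = 'Hello'` — the literal may equally appear on the left,
        // and check_expr_is_col_equal_const handles both orders.
        let expr = BinaryExpr::new(
            Arc::new(Column::new("String", 0)),
            Operator::Eq,
            Arc::new(Literal::new(ScalarValue::from("Hello"))),
        );
        println!("{expr}");
    }
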
From 42c1d076afbd17cf1014bfa81e4a9886671afd9f Mon Sep 17 00:00:00 2001
From: hengfeiyang
Date: Fri, 20 Oct 2023 16:48:26 +0800
Subject: [PATCH 06/11] fix: add more test for bloom filter

---
 .../src/datasource/physical_plan/parquet.rs   |  2 +-
 .../physical_plan/parquet/row_groups.rs       | 53 ++++++------------
 .../sqllogictest/test_files/predicates.slt    | 40 ++++++++++++++
 3 files changed, 56 insertions(+), 39 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index f5b907624d5a..f3f73ced5f25 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -248,7 +248,7 @@ impl ParquetExec {
             .unwrap_or(config_options.execution.parquet.enable_page_index)
     }
 
-    /// If enabled, the reader will read the bloom filter
+    /// If enabled, the reader will use the bloom filter to skip row groups
     pub fn with_enable_bloom_filter(mut self, enable_bloom_filter: bool) -> Self {
         self.enable_bloom_filter = Some(enable_bloom_filter);
         self
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index cdb822dacd0b..afe4beef835e 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -168,7 +168,7 @@ pub(crate) async fn prune_row_groups_by_bloom_filters<
 }
 
 struct BloomFilterPruningPredicate {
-    /// Actual pruning predicate (rewritten in terms of column min/max statistics)
+    /// Actual pruning predicate
     predicate_expr: Option<phys_expr::BinaryExpr>,
     /// The statistics required to evaluate this predicate
     required_columns: Vec<String>,
 }
@@ -195,15 +195,19 @@ impl BloomFilterPruningPredicate {
         Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(), column_sbbf)
     }
 
-    /// Return true if the expr can be pruned by the bloom filter
+    /// Return true if the `expr` can be proved to never be `true`
+    /// based on the bloom filter.
+    ///
+    /// We only check `BinaryExpr` here, but that also covers `InList`,
+    /// because the optimizer rewrites `InList` into a chain of `BinaryExpr`s.
     fn prune_expr_with_bloom_filter(
         expr: Option<&phys_expr::BinaryExpr>,
         column_sbbf: &HashMap<String, Sbbf>,
     ) -> bool {
-        if expr.is_none() {
+        let Some(expr) = expr else {
+            // unsupported predicate
             return false;
-        }
-        let expr = expr.unwrap();
+        };
         match expr.op() {
@@ -1110,35 +1114,6 @@ mod tests {
         assert!(pruned_row_groups.is_empty());
     }
 
-    #[tokio::test]
-    async fn test_row_group_bloom_filter_pruning_predicate_partial_expr() {
-        // load parquet file
-        let testdata = datafusion_common::test_util::parquet_test_data();
-        let file_name = "data_index_bloom_encoding_stats.parquet";
-        let path = format!("{testdata}/{file_name}");
-        let data = bytes::Bytes::from(std::fs::read(path).unwrap());
-
-        // generate pruning predicate
-        let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
-        let expr = col(r#""String""#)
-            .eq(lit("Hello_Not_Exists"))
-            .and(lit("1").eq(lit("1")));
-        let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
-
-        let row_groups = vec![0];
-        let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
-            file_name,
-            data,
-            &pruning_predicate,
-            &row_groups,
-        )
-        .await
-        .unwrap();
-        assert!(pruned_row_groups.is_empty());
-    }
-
     #[tokio::test]
     async fn test_row_group_bloom_filter_pruning_predicate_multiple_expr() {
         // load parquet file
@@ -1149,9 +1124,11 @@ mod tests {
 
         // generate pruning predicate
         let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
-        let expr = col(r#""String""#)
-            .eq(lit("Hello_Not_Exists"))
-            .or(col(r#""String""#).eq(lit("Hello_Not_Exists2")));
+        let expr = lit("1").eq(lit("1")).and(
+            col(r#""String""#)
+                .eq(lit("Hello_Not_Exists"))
+                .or(col(r#""String""#).eq(lit("Hello_Not_Exists2"))),
+        );
         let expr = logical2physical(&expr, &schema);
         let pruning_predicate =
             PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
@@ -1326,7 +1303,7 @@ mod tests {
         let exprs = plan.expressions();
         let expr = &exprs[0];
         let df_schema = plan.schema().as_ref().to_owned();
-        let tb_schema: Schema = df_schema.into();
+        let tb_schema: Schema = df_schema.clone().into();
         let execution_props = ExecutionProps::new();
         create_physical_expr(expr, &df_schema, &tb_schema, &execution_props)
diff --git a/datafusion/sqllogictest/test_files/predicates.slt b/datafusion/sqllogictest/test_files/predicates.slt
index 937b4c2eccf6..d22b2ff953b7 100644
--- a/datafusion/sqllogictest/test_files/predicates.slt
+++ b/datafusion/sqllogictest/test_files/predicates.slt
@@ -480,3 +480,43 @@ select * from t where (i & 3) = 1;
 ########
 statement ok
 DROP TABLE t;
+
+
+########
+# Test query with bloom filter
+# Refer to https://github.com/apache/arrow-datafusion/pull/7821#pullrequestreview-1688062599
+########
+
+statement ok
+CREATE EXTERNAL TABLE data_index_bloom_encoding_stats STORED AS PARQUET LOCATION '../../parquet-testing/data/data_index_bloom_encoding_stats.parquet';
+
+statement ok
+set datafusion.execution.parquet.bloom_filter_enabled=true;
+
+query T
+SELECT * FROM data_index_bloom_encoding_stats WHERE "String" = 'foo';
+
+query T
+SELECT * FROM data_index_bloom_encoding_stats WHERE "String" = 'test';
+----
+test
+
+query T
+SELECT * FROM data_index_bloom_encoding_stats WHERE "String" like '%e%';
+----
+Hello
+test
+are you
+the quick
+over
+the lazy
+
+statement ok
+set datafusion.execution.parquet.bloom_filter_enabled=false;
+
+
+########
+# Clean up after the test
+########
+
+statement ok
+DROP TABLE data_index_bloom_encoding_stats;
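
The slt test above only exercises files that already carry bloom filters, and note that only
the equality predicates can prune — the `like '%e%'` query must still scan. To poke at one of
those filters directly, the same `get_row_group_column_bloom_filter` call the pruner relies on
works from a plain tokio `File`; a sketch, assuming a local parquet-testing checkout:

    use parquet::arrow::ParquetRecordBatchStreamBuilder;
    use tokio::fs::File;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let file =
            File::open("parquet-testing/data/data_index_bloom_encoding_stats.parquet").await?;
        let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;

        // Row group 0, column 0 is the "String" column in this test file.
        if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).await? {
            // A bloom filter can return false positives but never false
            // negatives, so `check == false` is proof of absence.
            println!("maybe 'Hello': {}", sbbf.check(&"Hello"));
            println!("maybe 'foo':   {}", sbbf.check(&"foo"));
        }
        Ok(())
    }
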
From ab29381dc1b8cae0a17e43b9165eba4241bfaf26 Mon Sep 17 00:00:00 2001
From: hengfeiyang
Date: Tue, 24 Oct 2023 07:46:16 +0800
Subject: [PATCH 07/11] ci: rollback dependencies

---
 datafusion/sqllogictest/test_files/json.slt | 2 --
 1 file changed, 2 deletions(-)

diff --git a/datafusion/sqllogictest/test_files/json.slt b/datafusion/sqllogictest/test_files/json.slt
index a06565f42cdb..095642b562c9 100644
--- a/datafusion/sqllogictest/test_files/json.slt
+++ b/datafusion/sqllogictest/test_files/json.slt
@@ -60,8 +60,6 @@ AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)]
 
 query ?
 SELECT mycol FROM single_nan
-----
-NULL
 
 statement ok
 DROP TABLE json_test

From a664a2e2bef11ed6ad113a5cc8b25211e24b5f45 Mon Sep 17 00:00:00 2001
From: hengfeiyang
Date: Tue, 24 Oct 2023 07:55:14 +0800
Subject: [PATCH 08/11] ci: merge main branch

---
 datafusion/sqllogictest/test_files/json.slt | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/datafusion/sqllogictest/test_files/json.slt b/datafusion/sqllogictest/test_files/json.slt
index 095642b562c9..a06565f42cdb 100644
--- a/datafusion/sqllogictest/test_files/json.slt
+++ b/datafusion/sqllogictest/test_files/json.slt
@@ -60,6 +60,8 @@ AggregateExec: mode=Final, gby=[], aggr=[COUNT(*)]
 
 query ?
 SELECT mycol FROM single_nan
+----
+NULL
 
 statement ok
 DROP TABLE json_test

From 4ca9da7279ec633bf1f7469dcf9254be484a6245 Mon Sep 17 00:00:00 2001
From: hengfeiyang
Date: Tue, 24 Oct 2023 08:10:37 +0800
Subject: [PATCH 09/11] fix: unit tests for bloom filter

---
 .../core/src/datasource/physical_plan/parquet/row_groups.rs | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index afe4beef835e..0014f995d678 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -1333,10 +1333,7 @@ mod tests {
     }
 
     impl ContextProvider for TestSchemaProvider {
-        fn get_table_provider(
-            &self,
-            name: TableReference,
-        ) -> Result<Arc<dyn TableSource>> {
+        fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
             match self.tables.get(name.table()) {
                 Some(table) => Ok(table.clone()),
                 _ => datafusion_common::plan_err!("Table not found: {}", name.table()),

From 26824045a7e632d06bbfdb1088eb8ca779167f8f Mon Sep 17 00:00:00 2001
From: hengfeiyang
Date: Tue, 24 Oct 2023 08:29:51 +0800
Subject: [PATCH 10/11] ci: cargo clippy

---
 .../src/datasource/physical_plan/parquet.rs   | 20 ++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index f3f73ced5f25..3a2459bec817 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -514,15 +514,17 @@ impl FileOpener for ParquetOpener {
 
             // Bloom filter pruning: if bloom filters are enabled, attempt to skip
             // entire row_groups using the bloom filters on those row groups
-            if enable_bloom_filter && !row_groups.is_empty() && predicate.is_some() {
-                row_groups = row_groups::prune_row_groups_by_bloom_filters(
-                    &mut builder,
-                    &row_groups,
-                    file_metadata.row_groups(),
-                    predicate.unwrap(),
-                    &file_metrics,
-                )
-                .await;
+            if enable_bloom_filter && !row_groups.is_empty() {
+                if let Some(predicate) = predicate {
+                    row_groups = row_groups::prune_row_groups_by_bloom_filters(
+                        &mut builder,
+                        &row_groups,
+                        file_metadata.row_groups(),
+                        predicate,
+                        &file_metrics,
+                    )
+                    .await;
+                }
             }
 
             // page index pruning: if all data on individual pages can

From e4442fceb9e4200c3639697dfb0f800a35b28bec Mon Sep 17 00:00:00 2001
From: hengfeiyang
Date: Tue, 24 Oct 2023 08:46:25 +0800
Subject: [PATCH 11/11] ci: cargo clippy

---
 .../core/src/datasource/physical_plan/parquet/row_groups.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 0014f995d678..91bceed91602 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -1252,7 +1252,7 @@ mod tests {
 
         let metrics = ExecutionPlanMetricsSet::new();
         let file_metrics =
-            ParquetFileMetrics::new(0, &object_meta.location.to_string(), &metrics);
+            ParquetFileMetrics::new(0, object_meta.location.as_ref(), &metrics);
         let reader = ParquetFileReader {
            inner: ParquetObjectReader::new(Arc::new(in_memory), object_meta),
             file_metrics: file_metrics.clone(),
@@ -1263,7 +1263,7 @@ mod tests {
         let pruned_row_group = prune_row_groups_by_bloom_filters(
             &mut builder,
             row_groups,
-            &metadata.row_groups(),
+            metadata.row_groups(),
             pruning_predicate,
             &file_metrics,
         )
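
One practical footnote to the whole series: the pruner can only fire on files that were written
with bloom filters in the first place (the `without_bloom_filter` test shows the graceful
fallback). With the `parquet` crate that is a writer property; a sketch of producing such a
file — the output path and column values are arbitrary choices for this note:

    use std::{fs::File, sync::Arc};

    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use parquet::file::properties::WriterProperties;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![Field::new("String", DataType::Utf8, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(StringArray::from(vec!["Hello", "test"]))],
        )?;

        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true) // write an Sbbf per column chunk
            .build();
        let mut writer =
            ArrowWriter::try_new(File::create("/tmp/bloom.parquet")?, schema, Some(props))?;
        writer.write(&batch)?;
        writer.close()?;
        Ok(())
    }
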