diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 6cab27b0846c..3a2459bec817 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -82,6 +82,9 @@ pub struct ParquetExec {
     /// Override for `Self::with_enable_page_index`. If None, uses
     /// values from base_config
     enable_page_index: Option<bool>,
+    /// Override for `Self::with_enable_bloom_filter`. If None, uses
+    /// values from base_config
+    enable_bloom_filter: Option<bool>,
     /// Base configuration for this scan
     base_config: FileScanConfig,
     projected_statistics: Statistics,
@@ -151,6 +154,7 @@ impl ParquetExec {
             pushdown_filters: None,
             reorder_filters: None,
             enable_page_index: None,
+            enable_bloom_filter: None,
             base_config,
             projected_schema,
             projected_statistics,
@@ -244,6 +248,18 @@ impl ParquetExec {
             .unwrap_or(config_options.execution.parquet.enable_page_index)
     }
 
+    /// If enabled, the reader will use bloom filters to prune row groups
+    pub fn with_enable_bloom_filter(mut self, enable_bloom_filter: bool) -> Self {
+        self.enable_bloom_filter = Some(enable_bloom_filter);
+        self
+    }
+
+    /// Return the value described in [`Self::with_enable_bloom_filter`]
+    fn enable_bloom_filter(&self, config_options: &ConfigOptions) -> bool {
+        self.enable_bloom_filter
+            .unwrap_or(config_options.execution.parquet.bloom_filter_enabled)
+    }
+
     /// Redistribute files across partitions according to their size
     /// See comments on `get_file_groups_repartitioned()` for more detail.
     pub fn get_repartitioned(
@@ -373,6 +389,7 @@ impl ExecutionPlan for ParquetExec {
             pushdown_filters: self.pushdown_filters(config_options),
             reorder_filters: self.reorder_filters(config_options),
             enable_page_index: self.enable_page_index(config_options),
+            enable_bloom_filter: self.enable_bloom_filter(config_options),
         };
 
         let stream =
@@ -406,6 +423,7 @@ struct ParquetOpener {
     pushdown_filters: bool,
     reorder_filters: bool,
     enable_page_index: bool,
+    enable_bloom_filter: bool,
 }
 
 impl FileOpener for ParquetOpener {
@@ -440,6 +458,7 @@ impl FileOpener for ParquetOpener {
             self.enable_page_index,
             &self.page_pruning_predicate,
         );
+        let enable_bloom_filter = self.enable_bloom_filter;
         let limit = self.limit;
 
         Ok(Box::pin(async move {
@@ -482,16 +501,32 @@ impl FileOpener for ParquetOpener {
                 };
             };
 
-            // Row group pruning: attempt to skip entire row_groups
+            // Row group pruning by statistics: attempt to skip entire row_groups
             // using metadata on the row groups
-            let file_metadata = builder.metadata();
-            let row_groups = row_groups::prune_row_groups(
+            let file_metadata = builder.metadata().clone();
+            let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
+            let mut row_groups = row_groups::prune_row_groups_by_statistics(
                 file_metadata.row_groups(),
                 file_range,
-                pruning_predicate.as_ref().map(|p| p.as_ref()),
+                predicate,
                 &file_metrics,
            );
 
+            // Bloom filter pruning: if bloom filters are enabled, attempt to skip entire
+            // row_groups using the bloom filters on the row groups
+            if enable_bloom_filter && !row_groups.is_empty() {
+                if let Some(predicate) = predicate {
+                    row_groups = row_groups::prune_row_groups_by_bloom_filters(
+                        &mut builder,
+                        &row_groups,
+                        file_metadata.row_groups(),
+                        predicate,
+                        &file_metrics,
+                    )
+                    .await;
+                }
+            }
+
             // page index pruning: if all data on individual pages can
             // be ruled using page metadata, rows from other columns
             // with that range can be skipped as well
@@ -567,7 +602,7 @@ impl DefaultParquetFileReaderFactory {
 }
 
 /// Implements [`AsyncFileReader`] for a parquet file in object storage
-struct ParquetFileReader {
+pub(crate) struct ParquetFileReader {
     file_metrics: ParquetFileMetrics,
     inner: ParquetObjectReader,
 }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index c6e2c68d0211..91bceed91602 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -19,24 +19,31 @@ use arrow::{
     array::ArrayRef,
     datatypes::{DataType, Schema},
 };
-use datafusion_common::Column;
-use datafusion_common::ScalarValue;
-use log::debug;
-
-use parquet::file::{
-    metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
+use datafusion_common::tree_node::{TreeNode, VisitRecursion};
+use datafusion_common::{Column, DataFusionError, Result, ScalarValue};
+use parquet::{
+    arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
+    bloom_filter::Sbbf,
+    file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
 };
-
-use crate::datasource::physical_plan::parquet::{
-    from_bytes_to_i128, parquet_to_arrow_decimal_type,
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
 };
-use crate::{
-    datasource::listing::FileRange,
-    physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
+
+use crate::datasource::{
+    listing::FileRange,
+    physical_plan::parquet::{from_bytes_to_i128, parquet_to_arrow_decimal_type},
 };
+use crate::logical_expr::Operator;
+use crate::physical_expr::expressions as phys_expr;
+use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
+use crate::physical_plan::PhysicalExpr;
 
 use super::ParquetFileMetrics;
 
+/// Prune row groups based on statistics
+///
 /// Returns a vector of indexes into `groups` which should be scanned.
 ///
 /// If an index is NOT present in the returned Vec it means the
@@ -44,7 +51,7 @@ use super::ParquetFileMetrics;
 ///
 /// If an index IS present in the returned Vec it means the predicate
 /// did not filter out that row group.
-pub(crate) fn prune_row_groups(
+pub(crate) fn prune_row_groups_by_statistics(
     groups: &[RowGroupMetaData],
     range: Option<FileRange>,
     predicate: Option<&PruningPredicate>,
@@ -81,7 +88,7 @@ pub(crate) fn prune_row_groups(
                 // stats filter array could not be built
                 // return a closure which will not filter out any row groups
                 Err(e) => {
-                    debug!("Error evaluating row group predicate values {e}");
+                    log::debug!("Error evaluating row group predicate values {e}");
                     metrics.predicate_evaluation_errors.add(1);
                 }
             }
@@ -92,6 +99,203 @@ pub(crate) fn prune_row_groups(
     filtered
 }
 
+/// Prune row groups by bloom filters
+///
+/// Returns a vector of indexes into `groups` which should be scanned.
+///
+/// If an index is NOT present in the returned Vec it means the
+/// predicate filtered out all rows in that row group.
+///
+/// If an index IS present in the returned Vec it means the predicate
+/// did not filter out that row group.
+pub(crate) async fn prune_row_groups_by_bloom_filters<
+    T: AsyncFileReader + Send + 'static,
+>(
+    builder: &mut ParquetRecordBatchStreamBuilder<T>,
+    row_groups: &[usize],
+    groups: &[RowGroupMetaData],
+    predicate: &PruningPredicate,
+    metrics: &ParquetFileMetrics,
+) -> Vec<usize> {
+    let bf_predicates = match BloomFilterPruningPredicate::try_new(predicate.orig_expr())
+    {
+        Ok(predicates) => predicates,
+        Err(_) => {
+            return row_groups.to_vec();
+        }
+    };
+    let mut filtered = Vec::with_capacity(groups.len());
+    for idx in row_groups {
+        let rg_metadata = &groups[*idx];
+        // get all columns bloom filter
+        let mut column_sbbf =
+            HashMap::with_capacity(bf_predicates.required_columns.len());
+        for column_name in bf_predicates.required_columns.iter() {
+            let column_idx = match rg_metadata
+                .columns()
+                .iter()
+                .enumerate()
+                .find(|(_, column)| column.column_path().string().eq(column_name))
+            {
+                Some((column_idx, _)) => column_idx,
+                None => continue,
+            };
+            let bf = match builder
+                .get_row_group_column_bloom_filter(*idx, column_idx)
+                .await
+            {
+                Ok(bf) => match bf {
+                    Some(bf) => bf,
+                    None => {
+                        continue;
+                    }
+                },
+                Err(e) => {
+                    log::error!("Error evaluating row group predicate values when using BloomFilterPruningPredicate {e}");
+                    metrics.predicate_evaluation_errors.add(1);
+                    continue;
+                }
+            };
+            column_sbbf.insert(column_name.to_owned(), bf);
+        }
+        if bf_predicates.prune(&column_sbbf) {
+            metrics.row_groups_pruned.add(1);
+            continue;
+        }
+        filtered.push(*idx);
+    }
+    filtered
+}
+
+struct BloomFilterPruningPredicate {
+    /// Actual pruning predicate
+    predicate_expr: Option<phys_expr::BinaryExpr>,
+    /// The column names required to evaluate this predicate
+    required_columns: Vec<String>,
+}
+
+impl BloomFilterPruningPredicate {
+    fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
+        let binary_expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
+        match binary_expr {
+            Some(binary_expr) => {
+                let columns = Self::get_predicate_columns(expr);
+                Ok(Self {
+                    predicate_expr: Some(binary_expr.clone()),
+                    required_columns: columns.into_iter().collect(),
+                })
+            }
+            None => Err(DataFusionError::Execution(
+                "BloomFilterPruningPredicate only supports binary expressions".to_string(),
+            )),
+        }
+    }
+
+    fn prune(&self, column_sbbf: &HashMap<String, Sbbf>) -> bool {
+        Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(), column_sbbf)
+    }
+
+    /// Return true if the `expr` can be proven to never be `true`
+    /// based on the bloom filters.
+    ///
+    /// We only check `BinaryExpr`, but this also covers `InList`,
+    /// because the optimizer converts `InList` to `BinaryExpr`.
+    fn prune_expr_with_bloom_filter(
+        expr: Option<&phys_expr::BinaryExpr>,
+        column_sbbf: &HashMap<String, Sbbf>,
+    ) -> bool {
+        let Some(expr) = expr else {
+            // unsupported predicate
+            return false;
+        };
+        match expr.op() {
+            Operator::And | Operator::Or => {
+                let left = Self::prune_expr_with_bloom_filter(
+                    expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
+                    column_sbbf,
+                );
+                let right = Self::prune_expr_with_bloom_filter(
+                    expr.right()
+                        .as_any()
+                        .downcast_ref::<phys_expr::BinaryExpr>(),
+                    column_sbbf,
+                );
+                match expr.op() {
+                    Operator::And => left || right,
+                    Operator::Or => left && right,
+                    _ => false,
+                }
+            }
+            Operator::Eq => {
+                if let Some((col, val)) = Self::check_expr_is_col_equal_const(expr) {
+                    if let Some(sbbf) = column_sbbf.get(col.name()) {
+                        match val {
+                            ScalarValue::Utf8(Some(v)) => !sbbf.check(&v.as_str()),
+                            ScalarValue::Boolean(Some(v)) => !sbbf.check(&v),
+                            ScalarValue::Float64(Some(v)) => !sbbf.check(&v),
+                            ScalarValue::Float32(Some(v)) => !sbbf.check(&v),
+                            ScalarValue::Int64(Some(v)) => !sbbf.check(&v),
+                            ScalarValue::Int32(Some(v)) => !sbbf.check(&v),
+                            ScalarValue::Int16(Some(v)) => !sbbf.check(&v),
+                            ScalarValue::Int8(Some(v)) => !sbbf.check(&v),
+                            _ => false,
+                        }
+                    } else {
+                        false
+                    }
+                } else {
+                    false
+                }
+            }
+            _ => false,
+        }
+    }
+
+    fn get_predicate_columns(expr: &Arc<dyn PhysicalExpr>) -> HashSet<String> {
+        let mut columns = HashSet::new();
+        expr.apply(&mut |expr| {
+            if let Some(binary_expr) =
+                expr.as_any().downcast_ref::<phys_expr::BinaryExpr>()
+            {
+                if let Some((column, _)) =
+                    Self::check_expr_is_col_equal_const(binary_expr)
+                {
+                    columns.insert(column.name().to_string());
+                }
+            }
+            Ok(VisitRecursion::Continue)
+        })
+        // no way to fail as only Ok(VisitRecursion::Continue) is returned
+        .unwrap();
+
+        columns
+    }
+
+    fn check_expr_is_col_equal_const(
+        expr: &phys_expr::BinaryExpr,
+    ) -> Option<(phys_expr::Column, ScalarValue)> {
+        if Operator::Eq.ne(expr.op()) {
+            return None;
+        }
+
+        let left_any = expr.left().as_any();
+        let right_any = expr.right().as_any();
+        if let (Some(col), Some(liter)) = (
+            left_any.downcast_ref::<phys_expr::Column>(),
+            right_any.downcast_ref::<phys_expr::Literal>(),
+        ) {
+            return Some((col.clone(), liter.value().clone()));
+        }
+        if let (Some(liter), Some(col)) = (
+            left_any.downcast_ref::<phys_expr::Literal>(),
+            right_any.downcast_ref::<phys_expr::Column>(),
+        ) {
+            return Some((col.clone(), liter.value().clone()));
+        }
+        None
+    }
+}
+
 /// Wraps parquet statistics in a way
 /// that implements [`PruningStatistics`]
 struct RowGroupPruningStatistics<'a> {
@@ -246,14 +450,20 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::datasource::physical_plan::parquet::ParquetFileReader;
     use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
     use arrow::datatypes::DataType::Decimal128;
     use arrow::datatypes::Schema;
     use arrow::datatypes::{DataType, Field};
-    use datafusion_common::ToDFSchema;
-    use datafusion_expr::{cast, col, lit, Expr};
+    use datafusion_common::{config::ConfigOptions, TableReference, ToDFSchema};
+    use datafusion_expr::{
+        builder::LogicalTableSource, cast, col, lit, AggregateUDF, Expr, ScalarUDF,
+        TableSource, WindowUDF,
+    };
     use datafusion_physical_expr::execution_props::ExecutionProps;
     use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
+    use datafusion_sql::planner::ContextProvider;
+    use parquet::arrow::async_reader::ParquetObjectReader;
     use parquet::basic::LogicalType;
     use parquet::data_type::{ByteArray, FixedLenByteArray};
     use parquet::{
@@ -329,7 +539,12 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(
+                &[rgm1, rgm2],
+                None,
+                Some(&pruning_predicate),
+                &metrics
+            ),
             vec![1]
         );
     }
@@ -358,7 +573,12 @@
     // missing statistics for first row group mean that the result from the predicate expression
     // is null / undefined so the first row group can't be filtered out
         assert_eq!(
-            prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(
+                &[rgm1, rgm2],
+                None,
+                Some(&pruning_predicate),
+                &metrics
+            ),
             vec![0, 1]
         );
     }
@@ -400,7 +620,12 @@
     // the first row group is still filtered out because the predicate expression can be partially evaluated
     // when conditions are joined using AND
         assert_eq!(
-            prune_row_groups(groups, None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(
+                groups,
+                None,
+                Some(&pruning_predicate),
+                &metrics
+            ),
             vec![1]
         );
 
@@ -413,7 +638,12 @@
     // if conditions in predicate are joined with OR and an unsupported expression is used
     // this bypasses the entire predicate expression and no row groups are filtered out
         assert_eq!(
-            prune_row_groups(groups, None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(
+                groups,
+                None,
+                Some(&pruning_predicate),
+                &metrics
+            ),
             vec![0, 1]
         );
     }
@@ -456,7 +686,12 @@
         let metrics = parquet_file_metrics();
         // First row group was filtered out because it contains no null value on "c2".
         assert_eq!(
-            prune_row_groups(&groups, None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(
+                &groups,
+                None,
+                Some(&pruning_predicate),
+                &metrics
+            ),
             vec![1]
         );
     }
@@ -482,7 +717,12 @@
         // bool = NULL always evaluates to NULL (and thus will not
         // pass predicates. Ideally these should both be false
         assert_eq!(
-            prune_row_groups(&groups, None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(
+                &groups,
+                None,
+                Some(&pruning_predicate),
+                &metrics
+            ),
             vec![1]
         );
     }
@@ -535,7 +775,7 @@
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(
+            prune_row_groups_by_statistics(
                 &[rgm1, rgm2, rgm3],
                 None,
                 Some(&pruning_predicate),
@@ -598,7 +838,7 @@
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(
+            prune_row_groups_by_statistics(
                 &[rgm1, rgm2, rgm3, rgm4],
                 None,
                 Some(&pruning_predicate),
@@ -645,7 +885,7 @@
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(
+            prune_row_groups_by_statistics(
                 &[rgm1, rgm2, rgm3],
                 None,
                 Some(&pruning_predicate),
@@ -715,7 +955,7 @@
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(
+            prune_row_groups_by_statistics(
                 &[rgm1, rgm2, rgm3],
                 None,
                 Some(&pruning_predicate),
@@ -774,7 +1014,7 @@
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(
+            prune_row_groups_by_statistics(
                 &[rgm1, rgm2, rgm3],
                 None,
                 Some(&pruning_predicate),
@@ -846,4 +1086,282 @@ mod tests {
         let execution_props = ExecutionProps::new();
         create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
     }
+
+    #[tokio::test]
+    async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
+        // load parquet file
+        let testdata = datafusion_common::test_util::parquet_test_data();
+        let file_name = "data_index_bloom_encoding_stats.parquet";
+        let path = format!("{testdata}/{file_name}");
+        let data = bytes::Bytes::from(std::fs::read(path).unwrap());
+
+        // generate pruning predicate
+        let schema =
+            Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
+        let expr = col(r#""String""#).eq(lit("Hello_Not_Exists"));
+        let expr = logical2physical(&expr, &schema);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+
+        let row_groups = vec![0];
+        let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
+            file_name,
+            data,
+            &pruning_predicate,
+            &row_groups,
+        )
+        .await
+        .unwrap();
+        assert!(pruned_row_groups.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_row_group_bloom_filter_pruning_predicate_multiple_expr() {
+        // load parquet file
+        let testdata = datafusion_common::test_util::parquet_test_data();
+        let file_name = "data_index_bloom_encoding_stats.parquet";
+        let path = format!("{testdata}/{file_name}");
+        let data = bytes::Bytes::from(std::fs::read(path).unwrap());
+
+        // generate pruning predicate
+        let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
+        let expr = lit("1").eq(lit("1")).and(
+            col(r#""String""#)
+                .eq(lit("Hello_Not_Exists"))
+                .or(col(r#""String""#).eq(lit("Hello_Not_Exists2"))),
+        );
+        let expr = logical2physical(&expr, &schema);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+
+        let row_groups = vec![0];
+        let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
+            file_name,
+            data,
+            &pruning_predicate,
+            &row_groups,
+        )
+        .await
+        .unwrap();
+        assert!(pruned_row_groups.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_row_group_bloom_filter_pruning_predicate_sql_in() {
+        // load parquet file
+        let testdata = datafusion_common::test_util::parquet_test_data();
+        let file_name = "data_index_bloom_encoding_stats.parquet";
+        let path = format!("{testdata}/{file_name}");
+        let data = bytes::Bytes::from(std::fs::read(path).unwrap());
+
+        // generate pruning predicate
+        let schema = Schema::new(vec![
+            Field::new("String", DataType::Utf8, false),
+            Field::new("String3", DataType::Utf8, false),
+        ]);
+        let sql =
+            "SELECT * FROM tbl WHERE \"String\" IN ('Hello_Not_Exists', 'Hello_Not_Exists2')";
+        let expr = sql_to_physical_plan(sql).unwrap();
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+
+        let row_groups = vec![0];
+        let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
+            file_name,
+            data,
+            &pruning_predicate,
+            &row_groups,
+        )
+        .await
+        .unwrap();
+        assert!(pruned_row_groups.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_row_group_bloom_filter_pruning_predicate_with_exists_value() {
+        // load parquet file
+        let testdata = datafusion_common::test_util::parquet_test_data();
+        let file_name = "data_index_bloom_encoding_stats.parquet";
+        let path = format!("{testdata}/{file_name}");
+        let data = bytes::Bytes::from(std::fs::read(path).unwrap());
+
+        // generate pruning predicate
+        let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
+        let expr = col(r#""String""#).eq(lit("Hello"));
+        let expr = logical2physical(&expr, &schema);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+
+        let row_groups = vec![0];
+        let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
+            file_name,
+            data,
+            &pruning_predicate,
+            &row_groups,
+        )
+        .await
+        .unwrap();
+        assert_eq!(pruned_row_groups, row_groups);
+    }
+
+    #[tokio::test]
+    async fn test_row_group_bloom_filter_pruning_predicate_without_bloom_filter() {
+        // load parquet file
+        let testdata = datafusion_common::test_util::parquet_test_data();
let file_name = "alltypes_plain.parquet"; + let path = format!("{testdata}/{file_name}"); + let data = bytes::Bytes::from(std::fs::read(path).unwrap()); + + // generate pruning predicate + let schema = Schema::new(vec![Field::new("string_col", DataType::Utf8, false)]); + let expr = col(r#""string_col""#).eq(lit("0")); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = + PruningPredicate::try_new(expr, Arc::new(schema)).unwrap(); + + let row_groups = vec![0]; + let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate( + file_name, + data, + &pruning_predicate, + &row_groups, + ) + .await + .unwrap(); + assert_eq!(pruned_row_groups, row_groups); + } + + async fn test_row_group_bloom_filter_pruning_predicate( + file_name: &str, + data: bytes::Bytes, + pruning_predicate: &PruningPredicate, + row_groups: &[usize], + ) -> Result> { + use object_store::{ObjectMeta, ObjectStore}; + + let object_meta = ObjectMeta { + location: object_store::path::Path::parse(file_name).expect("creating path"), + last_modified: chrono::DateTime::from(std::time::SystemTime::now()), + size: data.len(), + e_tag: None, + }; + let in_memory = object_store::memory::InMemory::new(); + in_memory + .put(&object_meta.location, data) + .await + .expect("put parquet file into in memory object store"); + + let metrics = ExecutionPlanMetricsSet::new(); + let file_metrics = + ParquetFileMetrics::new(0, object_meta.location.as_ref(), &metrics); + let reader = ParquetFileReader { + inner: ParquetObjectReader::new(Arc::new(in_memory), object_meta), + file_metrics: file_metrics.clone(), + }; + let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap(); + + let metadata = builder.metadata().clone(); + let pruned_row_group = prune_row_groups_by_bloom_filters( + &mut builder, + row_groups, + metadata.row_groups(), + pruning_predicate, + &file_metrics, + ) + .await; + + Ok(pruned_row_group) + } + + fn sql_to_physical_plan(sql: &str) -> Result> { + use datafusion_optimizer::{ + analyzer::Analyzer, optimizer::Optimizer, OptimizerConfig, OptimizerContext, + }; + use datafusion_sql::{ + planner::SqlToRel, + sqlparser::{ast::Statement, parser::Parser}, + }; + use sqlparser::dialect::GenericDialect; + + // parse the SQL + let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ... 
+        let ast: Vec<Statement> = Parser::parse_sql(&dialect, sql).unwrap();
+        let statement = &ast[0];
+
+        // create a logical query plan
+        let schema_provider = TestSchemaProvider::new();
+        let sql_to_rel = SqlToRel::new(&schema_provider);
+        let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();
+
+        // set up the analyzer and optimizer
+        let config = OptimizerContext::new().with_skip_failing_rules(false);
+        let analyzer = Analyzer::new();
+        let optimizer = Optimizer::new();
+        // analyze and optimize the logical plan
+        let plan = analyzer.execute_and_check(&plan, config.options(), |_, _| {})?;
+        let plan = optimizer.optimize(&plan, &config, |_, _| {})?;
+        // convert the logical plan into a physical plan
+        let exprs = plan.expressions();
+        let expr = &exprs[0];
+        let df_schema = plan.schema().as_ref().to_owned();
+        let tb_schema: Schema = df_schema.clone().into();
+        let execution_props = ExecutionProps::new();
+        create_physical_expr(expr, &df_schema, &tb_schema, &execution_props)
+    }
+
+    struct TestSchemaProvider {
+        options: ConfigOptions,
+        tables: HashMap<String, Arc<dyn TableSource>>,
+    }
+
+    impl TestSchemaProvider {
+        pub fn new() -> Self {
+            let mut tables = HashMap::new();
+            tables.insert(
+                "tbl".to_string(),
+                create_table_source(vec![Field::new(
+                    "String".to_string(),
+                    DataType::Utf8,
+                    false,
+                )]),
+            );
+
+            Self {
+                options: Default::default(),
+                tables,
+            }
+        }
+    }
+
+    impl ContextProvider for TestSchemaProvider {
+        fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
+            match self.tables.get(name.table()) {
+                Some(table) => Ok(table.clone()),
+                _ => datafusion_common::plan_err!("Table not found: {}", name.table()),
+            }
+        }
+
+        fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
+            None
+        }
+
+        fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
+            None
+        }
+
+        fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
+            None
+        }
+
+        fn options(&self) -> &ConfigOptions {
+            &self.options
+        }
+
+        fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
+            None
+        }
+    }
+
+    fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {
+        Arc::new(LogicalTableSource::new(Arc::new(Schema::new(fields))))
+    }
 }
diff --git a/datafusion/sqllogictest/test_files/predicates.slt b/datafusion/sqllogictest/test_files/predicates.slt
index 937b4c2eccf6..d22b2ff953b7 100644
--- a/datafusion/sqllogictest/test_files/predicates.slt
+++ b/datafusion/sqllogictest/test_files/predicates.slt
@@ -480,3 +480,43 @@ select * from t where (i & 3) = 1;
 ########
 statement ok
 DROP TABLE t;
+
+
+########
+# Test query with bloom filter
+# Refer to https://github.com/apache/arrow-datafusion/pull/7821#pullrequestreview-1688062599
+########
+
+statement ok
+CREATE EXTERNAL TABLE data_index_bloom_encoding_stats STORED AS PARQUET LOCATION '../../parquet-testing/data/data_index_bloom_encoding_stats.parquet';
+
+statement ok
+set datafusion.execution.parquet.bloom_filter_enabled=true;
+
+query T
+SELECT * FROM data_index_bloom_encoding_stats WHERE "String" = 'foo';
+
+query T
+SELECT * FROM data_index_bloom_encoding_stats WHERE "String" = 'test';
+----
+test
+
+query T
+SELECT * FROM data_index_bloom_encoding_stats WHERE "String" like '%e%';
+----
+Hello
+test
+are you
+the quick
+over
+the lazy
+
+statement ok
+set datafusion.execution.parquet.bloom_filter_enabled=false;
+
+
+########
+# Clean up after the test
+########
+statement ok
+DROP TABLE data_index_bloom_encoding_stats;
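
Note (not part of the diff): the `AND`/`OR` handling in `prune_expr_with_bloom_filter` above (`Operator::And => left || right`, `Operator::Or => left && right`) is easy to read backwards, so here is a minimal, self-contained Rust sketch of just that decision rule. The `Pred` enum, the `can_prune` function, and the `HashSet` standing in for `Sbbf::check` are illustrative assumptions for this sketch, not DataFusion or parquet crate APIs.

```rust
use std::collections::{HashMap, HashSet};

/// Simplified predicate shape: only `col = const`, AND, and OR.
enum Pred {
    Eq { column: &'static str, value: &'static str },
    And(Box<Pred>, Box<Pred>),
    Or(Box<Pred>, Box<Pred>),
}

/// Returns true only when the "bloom filters" prove the predicate can
/// never be true for a row group. A HashSet plays the role of Sbbf:
/// `contains` may report false positives but never false negatives.
fn can_prune(pred: &Pred, filters: &HashMap<&str, HashSet<&str>>) -> bool {
    match pred {
        // `col = const` can be pruned only if the column has a filter
        // and the filter definitely does not contain the constant.
        Pred::Eq { column, value } => filters
            .get(column)
            .map(|f| !f.contains(value))
            .unwrap_or(false),
        // AND: ruling out either side rules out the whole predicate.
        Pred::And(l, r) => can_prune(l, filters) || can_prune(r, filters),
        // OR: every side must be ruled out before the row group can be skipped.
        Pred::Or(l, r) => can_prune(l, filters) && can_prune(r, filters),
    }
}

fn main() {
    let mut filters = HashMap::new();
    filters.insert("s", HashSet::from(["Hello", "test"]));

    // s = 'missing' OR s = 'Hello'  =>  cannot prune (right side may match)
    let pred = Pred::Or(
        Box::new(Pred::Eq { column: "s", value: "missing" }),
        Box::new(Pred::Eq { column: "s", value: "Hello" }),
    );
    assert!(!can_prune(&pred, &filters));

    // s = 'missing' AND s = 'Hello'  =>  can prune (left side cannot match)
    let pred = Pred::And(
        Box::new(Pred::Eq { column: "s", value: "missing" }),
        Box::new(Pred::Eq { column: "s", value: "Hello" }),
    );
    assert!(can_prune(&pred, &filters));
}
```

The apparent inversion mirrors the pruning semantics: one conjunct that cannot match rules out an entire `AND`, while every disjunct must be ruled out before an `OR` (and therefore the row group) can be skipped.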