diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index ecc2f444ab7f..558e801e8bb4 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -229,6 +229,10 @@ config_namespace! {
         /// to reduce the number of rows decoded.
         pub enable_page_index: bool, default = false
 
+        /// If true, the parquet reader reads available bloom filters and uses
+        /// them to skip row groups that cannot contain matching values
+        pub enable_bloom_filter: bool, default = false
+
         /// If true, the parquet reader attempts to skip entire row groups based
         /// on the predicate in the query and the metadata (min/max values) stored in
         /// the parquet file
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index 80b72e68f6ea..a96429c5989b 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -208,7 +208,7 @@ impl PruningPredicate {
     }
 
     /// Returns true if this pruning predicate is "always true" (aka will not prune anything)
-    pub fn allways_true(&self) -> bool {
+    pub fn always_true(&self) -> bool {
         is_always_true(&self.predicate_expr)
     }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 2c1bfc9caa51..8dc17b478a14 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -90,6 +90,10 @@ pub struct ParquetExec {
     /// Override for `Self::with_enable_page_index`. If None, uses
     /// values from base_config
     enable_page_index: Option<bool>,
+
+    /// Override for `Self::with_enable_bloom_filter`. If None, uses
+    /// values from base_config
+    enable_bloom_filter: Option<bool>,
     /// Base configuraton for this scan
     base_config: FileScanConfig,
     projected_statistics: Statistics,
@@ -135,7 +139,7 @@ impl ParquetExec {
                 }
             }
         })
-        .filter(|p| !p.allways_true());
+        .filter(|p| !p.always_true());
 
         let page_pruning_predicate = predicate.as_ref().and_then(|predicate_expr| {
             match PagePruningPredicate::try_new(predicate_expr, file_schema.clone()) {
@@ -157,6 +161,7 @@ impl ParquetExec {
             pushdown_filters: None,
             reorder_filters: None,
             enable_page_index: None,
+            enable_bloom_filter: None,
             base_config,
             projected_schema,
             projected_statistics,
@@ -245,6 +250,20 @@ impl ParquetExec {
             .unwrap_or(config_options.execution.parquet.enable_page_index)
     }
 
+    /// If enabled, the reader will read row group bloom filters (when
+    /// present) and use them to prune row groups, eliminating
+    /// unnecessary IO and decoding
+    pub fn with_enable_bloom_filter(mut self, enable_bloom_filter: bool) -> Self {
+        self.enable_bloom_filter = Some(enable_bloom_filter);
+        self
+    }
+
+    /// Return the value described in [`Self::with_enable_bloom_filter`]
+    fn enable_bloom_filter(&self, config_options: &ConfigOptions) -> bool {
+        self.enable_bloom_filter
+            .unwrap_or(config_options.execution.parquet.enable_bloom_filter)
+    }
+
     /// Redistribute files across partitions according to their size
     pub fn get_repartitioned(
         &self,
@@ -389,6 +408,7 @@ impl ExecutionPlan for ParquetExec {
             pushdown_filters: self.pushdown_filters(config_options),
             reorder_filters: self.reorder_filters(config_options),
             enable_page_index: self.enable_page_index(config_options),
+            enable_bloom_filter: self.enable_bloom_filter(config_options),
         };
 
         let stream =
@@ -474,6 +494,7 @@ struct ParquetOpener {
     pushdown_filters: bool,
     reorder_filters: bool,
     enable_page_index: bool,
+    enable_bloom_filter: bool,
 }
 
 impl FileOpener for ParquetOpener {
@@ -504,6 +525,7 @@ impl FileOpener for ParquetOpener {
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
         let enable_page_index = self.enable_page_index;
+        let enable_bloom_filter = self.enable_bloom_filter;
         let limit = self.limit;
 
         Ok(Box::pin(async move {
@@ -545,23 +567,44 @@ impl FileOpener for ParquetOpener {
                 };
             };
 
-            // Row group pruning: attempt to skip entire row_groups
-            // using metadata on the row groups
-            let file_metadata = builder.metadata();
-            let row_groups = row_groups::prune_row_groups(
-                file_metadata.row_groups(),
-                file_range,
-                pruning_predicate.as_ref().map(|p| p.as_ref()),
-                &file_metrics,
-            );
+            let file_metadata = builder.metadata().clone();
+            // start from all row groups; each pruning pass below narrows the set
+            let mut row_groups_index: Vec<usize> =
+                (0..file_metadata.num_row_groups()).collect();
+
+            if let Some(pruning_predicate) = pruning_predicate.as_ref() {
+                // Row group pruning: attempt to skip entire row_groups
+                // using min/max metadata on the row groups
+                row_groups_index = row_groups::prune_row_groups_by_statistics(
+                    file_metadata.row_groups(),
+                    file_range.as_ref(),
+                    pruning_predicate.as_ref(),
+                    &file_metrics,
+                );
+
+                // Row group pruning: attempt to skip entire row_groups
+                // using bloom filter metadata on the row groups
+                if enable_bloom_filter {
+                    row_groups_index = row_groups::prune_row_groups_by_bloom_filter(
+                        reader.clone(),
+                        &row_groups_index,
+                        file_metadata.row_groups(),
+                        pruning_predicate.as_ref(),
+                        &file_metrics,
+                    )
+                    .await;
+                }
+            }
 
             // page index pruning: if all data on individual pages can
             // be ruled using page metadata, rows from other columns
             // with that range can be skipped as well
-            if enable_page_index && !row_groups.is_empty() {
+            if enable_page_index && !row_groups_index.is_empty() {
                 if let Some(p) = page_pruning_predicate {
-                    let pruned =
-                        p.prune(&row_groups, file_metadata.as_ref(), &file_metrics)?;
+                    let pruned = p.prune(
+                        &row_groups_index,
+                        file_metadata.as_ref(),
+                        &file_metrics,
+                    )?;
                     if let Some(row_selection) = pruned {
                         builder = builder.with_row_selection(row_selection);
                     }
@@ -575,7 +618,7 @@ impl FileOpener for ParquetOpener {
             let stream = builder
                 .with_projection(mask)
                 .with_batch_size(batch_size)
-                .with_row_groups(row_groups)
+                .with_row_groups(row_groups_index)
                 .build()?;
 
             let adapted = stream
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
index 376ae35c66d9..5819f22674c4 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
@@ -15,14 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
 use arrow::{
     array::ArrayRef,
     datatypes::{DataType, Schema},
 };
 use datafusion_common::Column;
+use datafusion_common::Result;
 use datafusion_common::ScalarValue;
 use log::debug;
-
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::bloom_filter::{chunk_read_bloom_filter_header_and_offset, Sbbf};
+use parquet::file::metadata::ColumnChunkMetaData;
+use parquet::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
+use std::sync::Arc;
+
+use datafusion_expr::Operator;
+use datafusion_physical_expr::expressions as phys_expr;
+use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
 use parquet::file::{
     metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
 };
@@ -44,41 +55,39 @@ use super::ParquetFileMetrics;
 ///
 /// If an index IS present in the returned Vec it means the predicate
 /// did not filter out that row group.
-pub(crate) fn prune_row_groups(
+pub(crate) fn prune_row_groups_by_statistics(
     groups: &[RowGroupMetaData],
-    range: Option<FileRange>,
-    predicate: Option<&PruningPredicate>,
+    range: Option<&FileRange>,
+    predicate: &PruningPredicate,
     metrics: &ParquetFileMetrics,
 ) -> Vec<usize> {
     let mut filtered = Vec::with_capacity(groups.len());
     for (idx, metadata) in groups.iter().enumerate() {
-        if let Some(range) = &range {
+        if let Some(range) = range {
             let offset = metadata.column(0).file_offset();
             if offset < range.start || offset >= range.end {
                 continue;
             }
         }
 
-        if let Some(predicate) = predicate {
-            let pruning_stats = RowGroupPruningStatistics {
-                row_group_metadata: metadata,
-                parquet_schema: predicate.schema().as_ref(),
-            };
-            match predicate.prune(&pruning_stats) {
-                Ok(values) => {
-                    // NB: false means don't scan row group
-                    if !values[0] {
-                        metrics.row_groups_pruned.add(1);
-                        continue;
-                    }
-                }
-                // stats filter array could not be built
-                // return a closure which will not filter out any row groups
-                Err(e) => {
-                    debug!("Error evaluating row group predicate values {e}");
-                    metrics.predicate_evaluation_errors.add(1);
-                }
+        let pruning_stats = RowGroupPruningStatistics {
+            row_group_metadata: metadata,
+            parquet_schema: predicate.schema().as_ref(),
+        };
+        match predicate.prune(&pruning_stats) {
+            Ok(values) => {
+                // NB: false means don't scan row group
+                if !values[0] {
+                    metrics.row_groups_pruned.add(1);
+                    continue;
                 }
             }
+            // stats filter array could not be built; in that case do not
+            // filter out any row groups
+            Err(e) => {
+                debug!("Error evaluating row group predicate values {e}");
+                metrics.predicate_evaluation_errors.add(1);
+            }
+        }
 
         filtered.push(idx)
@@ -86,6 +95,86 @@ pub(crate) fn prune_row_groups(
     filtered
 }
 
+/// Prune row groups using parquet bloom filters: for each row group that
+/// survived earlier pruning, check every `col = constant` conjunct of the
+/// predicate against that column's bloom filter (when one was written).
+/// A negative lookup proves the value is absent, so the row group cannot
+/// contain any matching rows and is skipped.
+pub(crate) async fn prune_row_groups_by_bloom_filter(
+    mut reader: Box<dyn AsyncFileReader + Send>,
+    row_groups_index: &[usize],
+    groups: &[RowGroupMetaData],
+    predicate: &PruningPredicate,
+    metrics: &ParquetFileMetrics,
+) -> Vec<usize> {
+    // all row groups were already pruned, return early
+    if row_groups_index.is_empty() {
+        return row_groups_index.to_vec();
+    }
+
+    let bloom_filter_predicate =
+        match BloomFilterPruningPredicate::try_new(predicate.orig_expr()) {
+            Ok(predicate) => predicate,
+            Err(e) => {
+                // a failure to build the bloom filter predicate must not
+                // prune anything; fall back to scanning all remaining groups
+                debug!("Error building BloomFilterPruningPredicate {e}");
+                metrics.predicate_evaluation_errors.add(1);
+                return row_groups_index.to_vec();
+            }
+        };
+
+    // without any `col = constant` conjunct there is nothing to check
+    if bloom_filter_predicate.predicates.is_empty() {
+        return row_groups_index.to_vec();
+    }
+
+    let mut filtered = Vec::with_capacity(row_groups_index.len());
+    for rg_index in row_groups_index {
+        let rg_meta = groups
+            .get(*rg_index)
+            .expect("RowGroupMetaData out of index");
+        let mut prune = false;
+        for (column, value) in &bloom_filter_predicate.predicates {
+            let column_metadata = rg_meta.column(column.index());
+            if let Some(sbbf) =
+                read_bloom_filter_from_column_chunk(column_metadata, &mut reader).await
+            {
+                // a negative lookup proves `value` is absent from this
+                // column chunk, so no row of this row group can match
+                if !bloom_filter_might_contain(&sbbf, value) {
+                    prune = true;
+                    break;
+                }
+            }
+        }
+        if prune {
+            metrics.row_groups_pruned.add(1);
+            continue;
+        }
+        filtered.push(*rg_index)
+    }
+    filtered
+}
+
+/// Read the [`Sbbf`] bloom filter for a column chunk, if one was written.
+/// Returns `None` (rather than an error) when the filter is missing or
+/// unreadable, since bloom filter pruning is purely an optimization.
+async fn read_bloom_filter_from_column_chunk(
+    column_metadata: &ColumnChunkMetaData,
+    reader: &mut Box<dyn AsyncFileReader + Send>,
+) -> Option<Sbbf> {
+    let offset = if let Some(offset) = column_metadata.bloom_filter_offset() {
+        match u64::try_from(offset) {
+            Ok(v) => v,
+            Err(e) => {
+                debug!("Bloom filter offset is invalid {e}");
+                return None;
+            }
+        }
+    } else {
+        return None;
+    };
+
+    // the header is a small thrift struct; 20 bytes is enough to fetch it
+    // in a single read before decoding
+    const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
+    let header_bytes = reader
+        .get_bytes(offset as usize..offset as usize + SBBF_HEADER_SIZE_ESTIMATE)
+        .await
+        .ok()?;
+    let (header, bitset_offset) =
+        chunk_read_bloom_filter_header_and_offset(offset, header_bytes).ok()?;
+
+    match header.algorithm {
+        BloomFilterAlgorithm::BLOCK(_) => {
+            // this match exists to future proof the singleton algorithm enum
+        }
+    }
+    match header.compression {
+        BloomFilterCompression::UNCOMPRESSED(_) => {
+            // this match exists to future proof the singleton compression enum
+        }
+    }
+    match header.hash {
+        BloomFilterHash::XXHASH(_) => {
+            // this match exists to future proof the singleton hash enum
+        }
+    }
+
+    // length of the bitset in bytes
+    let length = match usize::try_from(header.num_bytes) {
+        Ok(v) => v,
+        Err(_) => {
+            debug!("Bloom filter length is invalid");
+            return None;
+        }
+    };
+    let bitset_offset = usize::try_from(bitset_offset).ok()?;
+    let bitset = reader
+        .get_bytes(bitset_offset..bitset_offset + length)
+        .await
+        .ok()?;
+    Some(Sbbf::new(&bitset))
+}
+
+/// Returns false only when the bloom filter proves `value` cannot be
+/// present; scalar types without a byte representation here conservatively
+/// return true
+fn bloom_filter_might_contain(sbbf: &Sbbf, value: &ScalarValue) -> bool {
+    match value {
+        ScalarValue::Utf8(Some(v)) => sbbf.check(v),
+        ScalarValue::Int32(Some(v)) => sbbf.check(v),
+        ScalarValue::Int64(Some(v)) => sbbf.check(v),
+        ScalarValue::Float32(Some(v)) => sbbf.check(v),
+        ScalarValue::Float64(Some(v)) => sbbf.check(v),
+        // null or unsupported types cannot be ruled out by the filter
+        _ => true,
+    }
+}
+
 /// Wraps parquet statistics in a way
 /// that implements [`PruningStatistics`]
 struct RowGroupPruningStatistics<'a> {
@@ -236,6 +325,49 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
         get_null_count_values!(self, column)
     }
 }
+
+#[derive(Debug)]
+pub(crate) struct BloomFilterPruningPredicate {
+    /// Only conjuncts of the form `col = constant` can be checked
+    /// against bloom filters
+    predicates: Vec<(phys_expr::Column, ScalarValue)>,
+}
+
+impl BloomFilterPruningPredicate {
+    pub fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
+        let mut predicates = vec![];
+        for conjunct in split_conjunction(expr) {
+            if let Some(bin_expr) =
+                conjunct.as_any().downcast_ref::<phys_expr::BinaryExpr>()
+            {
+                if let Some(res) = check_expr_is_col_equal_const(bin_expr) {
+                    predicates.push(res)
+                }
+            }
+        }
+        Ok(Self { predicates })
+    }
+}
+
+fn check_expr_is_col_equal_const(
+    expr: &phys_expr::BinaryExpr,
+) -> Option<(phys_expr::Column, ScalarValue)> {
+    if expr.op() != &Operator::Eq {
+        return None;
+    }
+
+    let left_any = expr.left().as_any();
+    let right_any = expr.right().as_any();
+
+    // accept both `col = literal` and `literal = col`
+    if let (Some(col), Some(literal)) = (
+        left_any.downcast_ref::<phys_expr::Column>(),
+        right_any.downcast_ref::<phys_expr::Literal>(),
+    ) {
+        return Some((col.clone(), literal.value().clone()));
+    }
+
+    if let (Some(literal), Some(col)) = (
+        left_any.downcast_ref::<phys_expr::Literal>(),
+        right_any.downcast_ref::<phys_expr::Column>(),
+    ) {
+        return Some((col.clone(), literal.value().clone()));
+    }
+
+    None
+}
 
 #[cfg(test)]
 mod tests {
@@ -285,7 +417,12 @@ mod tests {
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(
+                &[rgm1, rgm2],
+                None,
+                &pruning_predicate,
+                &metrics
+            ),
             vec![1]
         );
     }
@@ -320,7 +457,12 @@ mod tests {
         // missing statistics for first row group mean that the result from the predicate expression
         // is null / undefined so the first row group can't be filtered out
         assert_eq!(
-            prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(
+                &[rgm1, rgm2],
+                None,
+                &pruning_predicate,
+                &metrics
+            ),
             vec![0, 1]
         );
     }
@@ -364,7 +506,7 @@ mod tests {
         // the first row group is still filtered out because the predicate expression can be partially evaluated
         // when conditions are joined using AND
         assert_eq!(
-            prune_row_groups(groups, None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(groups, None, &pruning_predicate, &metrics),
             vec![1]
         );
 
@@ -379,7 +521,7 @@ mod tests {
         // if conditions in predicate are joined with OR and an unsupported expression is used
         // this bypasses the entire predicate expression and no row groups are filtered out
         assert_eq!(
-            prune_row_groups(groups, None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(groups, None, &pruning_predicate, &metrics),
             vec![0, 1]
         );
     }
@@ -422,7 +564,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         // First row group was filtered out because it contains no null value on "c2".
         assert_eq!(
-            prune_row_groups(&groups, None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(&groups, None, &pruning_predicate, &metrics),
             vec![1]
         );
     }
@@ -448,7 +590,7 @@ mod tests {
         // bool = NULL always evaluates to NULL (and thus will not
         // pass predicates. Ideally these should both be false
         assert_eq!(
-            prune_row_groups(&groups, None, Some(&pruning_predicate), &metrics),
+            prune_row_groups_by_statistics(&groups, None, &pruning_predicate, &metrics),
             vec![1]
         );
     }
@@ -504,10 +646,10 @@ mod tests {
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(
+            prune_row_groups_by_statistics(
                 &[rgm1, rgm2, rgm3],
                 None,
-                Some(&pruning_predicate),
+                &pruning_predicate,
                 &metrics
             ),
             vec![0, 2]
@@ -569,10 +711,10 @@ mod tests {
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(
+            prune_row_groups_by_statistics(
                 &[rgm1, rgm2, rgm3, rgm4],
                 None,
-                Some(&pruning_predicate),
+                &pruning_predicate,
                 &metrics
             ),
             vec![0, 1, 3]
@@ -619,10 +761,10 @@ mod tests {
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(
+            prune_row_groups_by_statistics(
                 &[rgm1, rgm2, rgm3],
                 None,
-                Some(&pruning_predicate),
+                &pruning_predicate,
                 &metrics
             ),
             vec![1, 2]
@@ -691,10 +833,10 @@ mod tests {
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(
+            prune_row_groups_by_statistics(
                 &[rgm1, rgm2, rgm3],
                 None,
-                Some(&pruning_predicate),
+                &pruning_predicate,
                 &metrics
            ),
             vec![1, 2]
@@ -752,10 +894,10 @@ mod tests {
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(
+            prune_row_groups_by_statistics(
                 &[rgm1, rgm2, rgm3],
                 None,
-                Some(&pruning_predicate),
+                &pruning_predicate,
                 &metrics
             ),
             vec![1, 2]
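
Reviewer note: a minimal, self-contained sketch (not part of the diff) of how this knob would be exercised end to end on this branch. It assumes the `config_namespace!` entry above surfaces as the key `datafusion.execution.parquet.enable_bloom_filter` (mirroring `enable_page_index`), and that the file `data.parquet` and its `id` column are hypothetical placeholders for a file written with bloom filters enabled:

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        // opt in to bloom filter pruning for all parquet scans in this session
        let config = SessionConfig::new()
            .set_bool("datafusion.execution.parquet.enable_bloom_filter", true);
        let ctx = SessionContext::with_config(config);

        // an equality predicate such as `id = 4242` is exactly the kind of
        // `col = constant` conjunct that BloomFilterPruningPredicate extracts,
        // so row groups whose bloom filter rules out 4242 are never decoded
        let df = ctx
            .read_parquet("data.parquet", ParquetReadOptions::default())
            .await?
            .filter(col("id").eq(lit(4242)))?;
        df.show().await?;
        Ok(())
    }

Point lookups like this are the intended win: min/max statistics often cannot prune a high-cardinality unsorted column, while a bloom filter can rule a value out per row group.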