Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support using Bloom Filter in parquet reader #5569

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ config_namespace! {
/// to reduce the number of rows decoded.
pub enable_page_index: bool, default = false

pub enable_bloom_fiter:bool, default = false

/// If true, the parquet reader attempts to skip entire row groups based
/// on the predicate in the query and the metadata (min/max values) stored in
/// the parquet file
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl PruningPredicate {
}

/// Returns true if this pruning predicate is "always true" (aka will not prune anything)
pub fn allways_true(&self) -> bool {
pub fn always_true(&self) -> bool {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo fix

is_always_true(&self.predicate_expr)
}

Expand Down
71 changes: 57 additions & 14 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ pub struct ParquetExec {
/// Override for `Self::with_enable_page_index`. If None, uses
/// values from base_config
enable_page_index: Option<bool>,

/// Override for `Self::with_enable_bloom_filter`. If None, uses
/// values from base_config
enable_bloom_filter: Option<bool>,
/// Base configuration for this scan
base_config: FileScanConfig,
projected_statistics: Statistics,
Expand Down Expand Up @@ -135,7 +139,7 @@ impl ParquetExec {
}
}
})
.filter(|p| !p.allways_true());
.filter(|p| !p.always_true());

let page_pruning_predicate = predicate.as_ref().and_then(|predicate_expr| {
match PagePruningPredicate::try_new(predicate_expr, file_schema.clone()) {
Expand All @@ -157,6 +161,7 @@ impl ParquetExec {
pushdown_filters: None,
reorder_filters: None,
enable_page_index: None,
enable_bloom_filter: None,
base_config,
projected_schema,
projected_statistics,
Expand Down Expand Up @@ -245,6 +250,20 @@ impl ParquetExec {
.unwrap_or(config_options.execution.parquet.enable_page_index)
}

/// If enabled, the reader will read the bloom filter
/// This is used to pruning row groups by
/// eliminating unnecessary IO and decoding
pub fn with_enable_bloom_filter(mut self, enable_bloom_filter: bool) -> Self {
self.enable_bloom_filter = Some(enable_bloom_filter);
self
}

/// Return the value described in [`Self::with_enable_bloom_filter`]:
/// the per-exec override if one was set, otherwise the session config value.
///
/// NOTE(review): the config field name `enable_bloom_fiter` contains a
/// typo ("fiter" vs "filter"); it matches the field declared in
/// `datafusion/common/src/config.rs`, so any rename must update both
/// places in the same change.
fn enable_bloom_filter(&self, config_options: &ConfigOptions) -> bool {
    match self.enable_bloom_filter {
        Some(explicit_override) => explicit_override,
        None => config_options.execution.parquet.enable_bloom_fiter,
    }
}

/// Redistribute files across partitions according to their size
pub fn get_repartitioned(
&self,
Expand Down Expand Up @@ -389,6 +408,7 @@ impl ExecutionPlan for ParquetExec {
pushdown_filters: self.pushdown_filters(config_options),
reorder_filters: self.reorder_filters(config_options),
enable_page_index: self.enable_page_index(config_options),
enable_bloom_filter: self.enable_bloom_filter(config_options),
};

let stream =
Expand Down Expand Up @@ -474,6 +494,7 @@ struct ParquetOpener {
pushdown_filters: bool,
reorder_filters: bool,
enable_page_index: bool,
enable_bloom_filter: bool,
}

impl FileOpener for ParquetOpener {
Expand Down Expand Up @@ -504,6 +525,7 @@ impl FileOpener for ParquetOpener {
let reorder_predicates = self.reorder_filters;
let pushdown_filters = self.pushdown_filters;
let enable_page_index = self.enable_page_index;
let enable_bloom_filter = self.enable_bloom_filter;
let limit = self.limit;

Ok(Box::pin(async move {
Expand Down Expand Up @@ -545,23 +567,44 @@ impl FileOpener for ParquetOpener {
};
};

// Row group pruning: attempt to skip entire row_groups
// using metadata on the row groups
let file_metadata = builder.metadata();
let row_groups = row_groups::prune_row_groups(
file_metadata.row_groups(),
file_range,
pruning_predicate.as_ref().map(|p| p.as_ref()),
&file_metrics,
);
let file_metadata = builder.metadata().clone();
let mut row_groups_index: Vec<usize> =
(0..file_metadata.num_row_groups()).collect();

if let Some(pruning_predicate) = pruning_predicate.as_ref() {
// Row group pruning: attempt to skip entire row_groups
// using min max metadata on the row groups
row_groups_index = row_groups::prune_row_groups_by_statistics(
file_metadata.row_groups(),
file_range.as_ref(),
pruning_predicate.as_ref(),
&file_metrics,
);

// Row group pruning: attempt to skip entire row_groups
// using bloom filter metadata on the row groups
if enable_bloom_filter {
row_groups_index = row_groups::prune_row_groups_by_bloom_filter(
reader.clone(),
&row_groups_index,
file_metadata.row_groups(),
file_range.as_ref(),
pruning_predicate.as_ref(),
&file_metrics,
);
}
}

// page index pruning: if all data on individual pages can
// be ruled using page metadata, rows from other columns
// with that range can be skipped as well
if enable_page_index && !row_groups.is_empty() {
if enable_page_index && !row_groups_index.is_empty() {
if let Some(p) = page_pruning_predicate {
let pruned =
p.prune(&row_groups, file_metadata.as_ref(), &file_metrics)?;
let pruned = p.prune(
&row_groups_index,
file_metadata.as_ref(),
&file_metrics,
)?;
if let Some(row_selection) = pruned {
builder = builder.with_row_selection(row_selection);
}
Expand All @@ -575,7 +618,7 @@ impl FileOpener for ParquetOpener {
let stream = builder
.with_projection(mask)
.with_batch_size(batch_size)
.with_row_groups(row_groups)
.with_row_groups(row_groups_index)
.build()?;

let adapted = stream
Expand Down
Loading