feat: Use bloom filter when reading parquet to skip row groups #7821

Merged: 11 commits, Oct 25, 2023
datafusion/core/src/datasource/physical_plan/parquet.rs: 45 changes (40 additions, 5 deletions)
@@ -82,6 +82,9 @@ pub struct ParquetExec {
/// Override for `Self::with_enable_page_index`. If None, uses
/// values from base_config
enable_page_index: Option<bool>,
/// Override for `Self::with_enable_bloom_filter`. If None, uses
/// values from base_config
enable_bloom_filter: Option<bool>,
/// Base configuration for this scan
base_config: FileScanConfig,
projected_statistics: Statistics,
@@ -151,6 +154,7 @@ impl ParquetExec {
pushdown_filters: None,
reorder_filters: None,
enable_page_index: None,
enable_bloom_filter: None,
base_config,
projected_schema,
projected_statistics,
@@ -244,6 +248,18 @@ impl ParquetExec {
.unwrap_or(config_options.execution.parquet.enable_page_index)
}

/// If enabled, the reader will use parquet bloom filters to skip row groups
pub fn with_enable_bloom_filter(mut self, enable_bloom_filter: bool) -> Self {
self.enable_bloom_filter = Some(enable_bloom_filter);
self
}

/// Return the value described in [`Self::with_enable_bloom_filter`]
fn enable_bloom_filter(&self, config_options: &ConfigOptions) -> bool {
self.enable_bloom_filter
.unwrap_or(config_options.execution.parquet.bloom_filter_enabled)
}
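
As a usage sketch (not part of this diff): the override above is per scan, while the fallback is the session-level option referenced in the getter, `datafusion.execution.parquet.bloom_filter_enabled`. A minimal example of the session-level route, assuming `SessionConfig::set_bool` accepts that key; the table name, file path, and query are hypothetical stand-ins:

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Session-wide default; a per-scan `with_enable_bloom_filter` call
    // would override this for a single ParquetExec.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.bloom_filter_enabled", true);
    let ctx = SessionContext::with_config(config);

    // "t" and the path are hypothetical.
    ctx.register_parquet("t", "data/example.parquet", ParquetReadOptions::default())
        .await?;
    // Point lookups are where bloom filters help most: they can rule out
    // row groups that min/max statistics alone cannot.
    ctx.sql("SELECT * FROM t WHERE id = 42").await?.show().await?;
    Ok(())
}

Note that this pruning can only kick in when the files were written with bloom filters in the first place (e.g. via the parquet crate's `WriterProperties` bloom filter settings).
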

/// Redistribute files across partitions according to their size
/// See comments on `get_file_groups_repartitioned()` for more detail.
pub fn get_repartitioned(
@@ -373,6 +389,7 @@ impl ExecutionPlan for ParquetExec {
pushdown_filters: self.pushdown_filters(config_options),
reorder_filters: self.reorder_filters(config_options),
enable_page_index: self.enable_page_index(config_options),
enable_bloom_filter: self.enable_bloom_filter(config_options),
};

let stream =
@@ -406,6 +423,7 @@ struct ParquetOpener {
pushdown_filters: bool,
reorder_filters: bool,
enable_page_index: bool,
enable_bloom_filter: bool,
}

impl FileOpener for ParquetOpener {
@@ -440,6 +458,7 @@ impl FileOpener for ParquetOpener {
self.enable_page_index,
&self.page_pruning_predicate,
);
let enable_bloom_filter = self.enable_bloom_filter;
let limit = self.limit;

Ok(Box::pin(async move {
@@ -482,16 +501,32 @@ impl FileOpener for ParquetOpener {
};
};

- // Row group pruning: attempt to skip entire row_groups
+ // Row group pruning by statistics: attempt to skip entire row_groups
// using metadata on the row groups
- let file_metadata = builder.metadata();
- let row_groups = row_groups::prune_row_groups(
+ let file_metadata = builder.metadata().clone();
+ let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
+ let mut row_groups = row_groups::prune_row_groups_by_statistics(
file_metadata.row_groups(),
file_range,
- pruning_predicate.as_ref().map(|p| p.as_ref()),
+ predicate,
&file_metrics,
);

// Bloom filter pruning: if bloom filters are enabled, attempt to skip
// entire row_groups using their bloom filters
if enable_bloom_filter && !row_groups.is_empty() {
if let Some(predicate) = predicate {
row_groups = row_groups::prune_row_groups_by_bloom_filters(
&mut builder,
&row_groups,
file_metadata.row_groups(),
predicate,
&file_metrics,
)
.await;
}
}

// page index pruning: if all data on individual pages can
// be ruled out using page metadata, rows from other columns
// with that range can be skipped as well
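
To make the new bloom-filter step concrete, here is a standalone sketch of the per-row-group check that `prune_row_groups_by_bloom_filters` builds on, written against the parquet crate's synchronous API (`get_column_bloom_filter` / `Sbbf::check`) rather than the async reader used above; the path, column index, and looked-up value are hypothetical:

use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs::File;

/// Return the indices of row groups that may contain `value` in column
/// `col_idx`, according to their split-block bloom filters (Sbbf).
fn candidate_row_groups(
    path: &str,
    col_idx: usize,
    value: &str,
) -> Result<Vec<usize>, Box<dyn std::error::Error>> {
    let reader = SerializedFileReader::new(File::open(path)?)?;
    let mut keep = Vec::new();
    for rg_idx in 0..reader.metadata().num_row_groups() {
        let rg = reader.get_row_group(rg_idx)?;
        let may_contain = match rg.get_column_bloom_filter(col_idx) {
            // Bloom filters give false positives but never false negatives,
            // so `false` means the row group is provably safe to skip.
            Some(sbbf) => sbbf.check(value),
            // No bloom filter written for this column: must keep the group.
            None => true,
        };
        if may_contain {
            keep.push(rg_idx);
        }
    }
    Ok(keep)
}

The PR's `prune_row_groups_by_bloom_filters` does the equivalent asynchronously, and only on the row groups that survived statistics pruning, which is why it runs second and only when `enable_bloom_filter` is set and some row groups remain.
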
@@ -567,7 +602,7 @@ impl DefaultParquetFileReaderFactory {
}

/// Implements [`AsyncFileReader`] for a parquet file in object storage
- struct ParquetFileReader {
+ pub(crate) struct ParquetFileReader {
file_metrics: ParquetFileMetrics,
inner: ParquetObjectReader,
}