Split parquet bloom filter config and enable bloom filter on read by default #10306

Merged · 14 commits · May 2, 2024
8 changes: 6 additions & 2 deletions datafusion/common/src/config.rs
@@ -395,8 +395,11 @@ config_namespace! {
         /// default parquet writer setting
         pub encoding: Option<String>, default = None
 
-        /// Sets if bloom filter is enabled for any column
-        pub bloom_filter_enabled: bool, default = false
+        /// Use any available bloom filters when reading parquet files
+        pub bloom_filter_on_read: bool, default = true
+
+        /// Write bloom filters for all columns when creating parquet files
+        pub bloom_filter_on_write: bool, default = false
 
         /// Sets bloom filter false positive probability. If NULL, uses
         /// default parquet writer setting
@@ -1654,6 +1657,7 @@ config_namespace! {
 }
 
 #[derive(Debug, Clone, PartialEq)]
+#[allow(clippy::large_enum_variant)]
 pub enum FormatOptions {
     CSV(CsvOptions),
     JSON(JsonOptions),
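With the flag split in two, read-side pruning and write-side generation can now be toggled independently. A minimal session-level sketch (assuming the current `SessionConfig`/`SessionContext` APIs; the option keys are the ones defined above):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Readers use bloom filters by default after this change; only the
    // write side has to be opted into explicitly.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.bloom_filter_on_write", true);
    let ctx = SessionContext::new_with_config(config);

    let parquet = ctx.state().config_options().execution.parquet.clone();
    assert!(parquet.bloom_filter_on_read); // default = true
    assert!(parquet.bloom_filter_on_write); // set above
}
```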
2 changes: 1 addition & 1 deletion datafusion/common/src/file_options/mod.rs
@@ -67,7 +67,7 @@ mod tests {
             "format.data_page_row_count_limit".to_owned(),
             "123".to_owned(),
         );
-        option_map.insert("format.bloom_filter_enabled".to_owned(), "true".to_owned());
+        option_map.insert("format.bloom_filter_on_write".to_owned(), "true".to_owned());
         option_map.insert("format.encoding".to_owned(), "plain".to_owned());
         option_map.insert("format.dictionary_enabled".to_owned(), "true".to_owned());
         option_map.insert("format.compression".to_owned(), "zstd(4)".to_owned());
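The rename applies anywhere options are set from strings. A small sketch using `ConfigOptions` from `datafusion-common` (assuming `set` still rejects unknown keys, the retired `bloom_filter_enabled` key should now fail):

```rust
use datafusion_common::config::ConfigOptions;

fn main() -> datafusion_common::Result<()> {
    let mut options = ConfigOptions::new();
    options.set("datafusion.execution.parquet.bloom_filter_on_write", "true")?;
    options.set("datafusion.execution.parquet.bloom_filter_on_read", "false")?;

    assert!(options.execution.parquet.bloom_filter_on_write);
    assert!(!options.execution.parquet.bloom_filter_on_read);

    // The old single flag is gone, so setting it returns an error.
    assert!(options
        .set("datafusion.execution.parquet.bloom_filter_enabled", "true")
        .is_err());
    Ok(())
}
```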
5 changes: 3 additions & 2 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -62,7 +62,7 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
             created_by,
             column_index_truncate_length,
             data_page_row_count_limit,
-            bloom_filter_enabled,
+            bloom_filter_on_write,
             encoding,
             dictionary_enabled,
             compression,
@@ -80,6 +80,7 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
             allow_single_file_parallelism: _,
             maximum_parallel_row_group_writers: _,
             maximum_buffered_record_batches_per_stream: _,
+            bloom_filter_on_read: _,
         } = &parquet_options.global;
 
         let key_value_metadata = if !parquet_options.key_value_metadata.is_empty() {
@@ -104,7 +105,7 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions {
             .set_created_by(created_by.clone())
             .set_column_index_truncate_length(*column_index_truncate_length)
             .set_data_page_row_count_limit(*data_page_row_count_limit)
-            .set_bloom_filter_enabled(*bloom_filter_enabled)
+            .set_bloom_filter_enabled(*bloom_filter_on_write)
             .set_key_value_metadata(key_value_metadata);
 
         if let Some(encoding) = &encoding {
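For reference, the write side of this conversion bottoms out in the parquet crate's `WriterProperties` builder. A sketch of the mapping (the fpp/ndv values are arbitrary placeholders):

```rust
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;

fn main() {
    let props = WriterProperties::builder()
        .set_bloom_filter_enabled(true) // driven by bloom_filter_on_write
        .set_bloom_filter_fpp(0.05)     // driven by bloom_filter_fpp, if set
        .set_bloom_filter_ndv(10_000)   // driven by bloom_filter_ndv, if set
        .build();

    // With the flag on, every column gets default bloom filter properties.
    let col = ColumnPath::from("col1");
    assert!(props.bloom_filter_properties(&col).is_some());
}
```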
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
@@ -1603,7 +1603,7 @@ mod tests {
             "50".into(),
         );
         config_map.insert(
-            "datafusion.execution.parquet.bloom_filter_enabled".into(),
+            "datafusion.execution.parquet.bloom_filter_on_write".into(),
             "true".into(),
         );
         config_map.insert(
@@ -1681,7 +1681,7 @@
             "delta_binary_packed".into(),
         );
         config_map.insert(
-            "datafusion.execution.parquet.bloom_filter_enabled".into(),
+            "datafusion.execution.parquet.bloom_filter_on_write".into(),
             "true".into(),
         );
         config_map.insert(
22 changes: 16 additions & 6 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -243,14 +243,24 @@ impl ParquetExec {
     }
 
     /// If enabled, the reader will use available bloom filters to skip row groups
-    pub fn with_enable_bloom_filter(mut self, enable_bloom_filter: bool) -> Self {
-        self.table_parquet_options.global.bloom_filter_enabled = enable_bloom_filter;
+    pub fn with_bloom_filter_on_read(mut self, bloom_filter_on_read: bool) -> Self {
+        self.table_parquet_options.global.bloom_filter_on_read = bloom_filter_on_read;
         self
     }
 
-    /// Return the value described in [`Self::with_enable_bloom_filter`]
-    fn enable_bloom_filter(&self) -> bool {
-        self.table_parquet_options.global.bloom_filter_enabled
+    /// If enabled, the writer will write bloom filters for all columns
+    pub fn with_bloom_filter_on_write(
+        mut self,
+        enable_bloom_filter_on_write: bool,
+    ) -> Self {
+        self.table_parquet_options.global.bloom_filter_on_write =
+            enable_bloom_filter_on_write;
+        self
+    }
+
+    /// Return the value described in [`Self::with_bloom_filter_on_read`]
+    fn bloom_filter_on_read(&self) -> bool {
+        self.table_parquet_options.global.bloom_filter_on_read
     }
 
     fn output_partitioning_helper(file_config: &FileScanConfig) -> Partitioning {
@@ -407,7 +417,7 @@ impl ExecutionPlan for ParquetExec {
             pushdown_filters: self.pushdown_filters(),
             reorder_filters: self.reorder_filters(),
             enable_page_index: self.enable_page_index(),
-            enable_bloom_filter: self.enable_bloom_filter(),
+            enable_bloom_filter: self.bloom_filter_on_read(),
         };
 
         let stream =
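To make the split visible without constructing a full `ParquetExec` (which needs a `FileScanConfig`, an object store URL, and a schema), here is a self-contained sketch of the same builder pattern; `Scan` and `ScanOptions` are hypothetical stand-ins, not DataFusion types:

```rust
/// Hypothetical stand-in for `TableParquetOptions.global`.
#[derive(Debug, Default)]
struct ScanOptions {
    bloom_filter_on_read: bool,
    bloom_filter_on_write: bool,
}

/// Hypothetical stand-in for `ParquetExec`, showing only the two toggles.
#[derive(Debug, Default)]
struct Scan {
    options: ScanOptions,
}

impl Scan {
    /// Mirrors `ParquetExec::with_bloom_filter_on_read`.
    fn with_bloom_filter_on_read(mut self, on: bool) -> Self {
        self.options.bloom_filter_on_read = on;
        self
    }

    /// Mirrors `ParquetExec::with_bloom_filter_on_write`.
    fn with_bloom_filter_on_write(mut self, on: bool) -> Self {
        self.options.bloom_filter_on_write = on;
        self
    }
}

fn main() {
    // Prune row groups on read, but do not emit filters on write.
    let scan = Scan::default()
        .with_bloom_filter_on_read(true)
        .with_bloom_filter_on_write(false);
    assert!(scan.options.bloom_filter_on_read);
    assert!(!scan.options.bloom_filter_on_write);
}
```

Note that only the read-side flag feeds the scan itself: as the `execute` hunk above shows, `enable_bloom_filter` is populated from `self.bloom_filter_on_read()`, while the write-side flag rides along in `table_parquet_options` for writers.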
4 changes: 2 additions & 2 deletions datafusion/execution/src/config.rs
@@ -344,12 +344,12 @@ impl SessionConfig {

     /// Returns true if bloom filter should be used to skip parquet row groups
     pub fn parquet_bloom_filter_pruning(&self) -> bool {
-        self.options.execution.parquet.bloom_filter_enabled
+        self.options.execution.parquet.bloom_filter_on_read
     }
 
     /// Enables or disables the use of bloom filter for parquet readers to skip row groups
     pub fn with_parquet_bloom_filter_pruning(mut self, enabled: bool) -> Self {
-        self.options.execution.parquet.bloom_filter_enabled = enabled;
+        self.options.execution.parquet.bloom_filter_on_read = enabled;
         self
     }

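These helpers make the read-side flag the one that "pruning" refers to. A short usage sketch, with the helper names taken straight from the diff above:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Disable bloom-filter pruning for readers; writers are unaffected.
    let config = SessionConfig::new().with_parquet_bloom_filter_pruning(false);
    assert!(!config.parquet_bloom_filter_pruning());

    let ctx = SessionContext::new_with_config(config);
    assert!(!ctx.copied_config().parquet_bloom_filter_pruning());
}
```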
4 changes: 3 additions & 1 deletion datafusion/proto/proto/datafusion.proto
@@ -1215,10 +1215,12 @@ message ParquetOptions {
   uint64 data_pagesize_limit = 7; // default = 1024 * 1024
   uint64 write_batch_size = 8; // default = 1024
   string writer_version = 9; // default = "1.0"
-  bool bloom_filter_enabled = 20; // default = false
+  // bool bloom_filter_enabled = 20; // default = false
   bool allow_single_file_parallelism = 23; // default = true
   uint64 maximum_parallel_row_group_writers = 24; // default = 1
   uint64 maximum_buffered_record_batches_per_stream = 25; // default = 2
+  bool bloom_filter_on_read = 26; // default = true
+  bool bloom_filter_on_write = 27; // default = false
 
   oneof metadata_size_hint_opt {
     uint64 metadata_size_hint = 4;
54 changes: 36 additions & 18 deletions datafusion/proto/src/generated/pbjson.rs

(generated file; diff not rendered)

11 changes: 8 additions & 3 deletions datafusion/proto/src/generated/prost.rs

(generated file; diff not rendered)

3 changes: 2 additions & 1 deletion datafusion/proto/src/physical_plan/from_proto.rs
@@ -879,7 +879,8 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
                 protobuf::parquet_options::EncodingOpt::Encoding(v) => Some(v),
             })
             .unwrap_or(None),
-            bloom_filter_enabled: value.bloom_filter_enabled,
+            bloom_filter_on_read: value.bloom_filter_on_read,
+            bloom_filter_on_write: value.bloom_filter_on_write,
             bloom_filter_fpp: value.clone()
                 .bloom_filter_fpp_opt
                 .map(|opt| match opt {
3 changes: 2 additions & 1 deletion datafusion/proto/src/physical_plan/to_proto.rs
@@ -911,7 +911,8 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
             column_index_truncate_length_opt: value.column_index_truncate_length.map(|v| protobuf::parquet_options::ColumnIndexTruncateLengthOpt::ColumnIndexTruncateLength(v as u64)),
             data_page_row_count_limit: value.data_page_row_count_limit as u64,
             encoding_opt: value.encoding.clone().map(protobuf::parquet_options::EncodingOpt::Encoding),
-            bloom_filter_enabled: value.bloom_filter_enabled,
+            bloom_filter_on_read: value.bloom_filter_on_read,
+            bloom_filter_on_write: value.bloom_filter_on_write,
             bloom_filter_fpp_opt: value.bloom_filter_fpp.map(protobuf::parquet_options::BloomFilterFppOpt::BloomFilterFpp),
             bloom_filter_ndv_opt: value.bloom_filter_ndv.map(protobuf::parquet_options::BloomFilterNdvOpt::BloomFilterNdv),
             allow_single_file_parallelism: value.allow_single_file_parallelism,
2 changes: 1 addition & 1 deletion datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -344,7 +344,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
         TableOptions::default_from_session_config(ctx.state().config_options());
     let mut parquet_format = table_options.parquet;
 
-    parquet_format.global.bloom_filter_enabled = true;
+    parquet_format.global.bloom_filter_on_read = true;
     parquet_format.global.created_by = "DataFusion Test".to_string();
     parquet_format.global.writer_version = "PARQUET_2_0".to_string();
     parquet_format.global.write_batch_size = 111;
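Distilling the updated test into a standalone snippet: table-level parquet options are seeded from the session, then overridden using the new field names (a sketch mirroring the test setup above):

```rust
use datafusion::prelude::SessionContext;
use datafusion_common::config::TableOptions;

fn main() {
    let ctx = SessionContext::new();
    let mut table_options =
        TableOptions::default_from_session_config(ctx.state().config_options());

    // The old `bloom_filter_enabled` field no longer exists; each side is
    // set independently.
    table_options.parquet.global.bloom_filter_on_read = true;
    table_options.parquet.global.bloom_filter_on_write = true;

    assert!(table_options.parquet.global.bloom_filter_on_read);
}
```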
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/copy.slt
@@ -271,7 +271,7 @@ OPTIONS (
 'format.created_by' 'DF copy.slt',
 'format.column_index_truncate_length' 123,
 'format.data_page_row_count_limit' 1234,
-'format.bloom_filter_enabled' true,
+'format.bloom_filter_on_read' true,
 'format.bloom_filter_enabled::col1' false,
 'format.bloom_filter_fpp::col2' 0.456,
 'format.bloom_filter_ndv::col2' 456,
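End to end, the renamed write-side key is what `COPY` users will reach for. A sketch driving it through SQL (assuming the `COPY ... OPTIONS` syntax from the test file above, with the output format inferred from the `.parquet` extension; note the per-column overrides still use the `bloom_filter_enabled::<column>` spelling):

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b')").await?;

    // Opt the writer into bloom filters with the new key.
    ctx.sql(
        "COPY t TO 'out.parquet' \
         OPTIONS ('format.bloom_filter_on_write' 'true')",
    )
    .await?
    .collect()
    .await?;
    Ok(())
}
```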