Skip to content

Commit

Permalink
Optimize bulk writing of all blocks of bloom filter (#3340)
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya authored Dec 13, 2022
1 parent 46b2848 commit 2749dcc
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
15 changes: 9 additions & 6 deletions parquet/src/bloom_filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::format::{
};
use bytes::{Buf, Bytes};
use std::hash::Hasher;
use std::io::{BufWriter, Write};
use std::io::Write;
use std::sync::Arc;
use thrift::protocol::{
TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol, TSerializable,
Expand Down Expand Up @@ -220,18 +220,17 @@ impl Sbbf {
Self(data)
}

/// Write the bloom filter data (header and then bitset) to the output
pub(crate) fn write<W: Write>(&self, writer: W) -> Result<(), ParquetError> {
// Use a BufWriter to avoid costs of writing individual blocks
let mut writer = BufWriter::new(writer);
/// Write the bloom filter data (header and then bitset) to the output. This doesn't
/// flush the writer in order to boost performance of bulk writing all blocks. Caller
/// must remember to flush the writer.
pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
    // The compact-protocol wrapper only *borrows* `writer`, so the raw writer
    // is available again for the bitset once the header has been serialized.
    let mut protocol = TCompactOutputProtocol::new(&mut writer);
    let header = self.header();
    header.write_to_out_protocol(&mut protocol).map_err(|e| {
        ParquetError::General(format!("Could not write bloom filter header: {}", e))
    })?;
    // Flush the Thrift protocol so all header bytes are emitted before the raw
    // bitset bytes; per the doc comment above, the underlying writer itself is
    // intentionally NOT flushed here — the caller flushes once after bulk writes.
    protocol.flush()?;
    self.write_bitset(&mut writer)?;
    Ok(())
}

Expand Down Expand Up @@ -330,6 +329,10 @@ impl Sbbf {
let block_index = self.hash_to_block_index(hash);
self.0[block_index].check(hash as u32)
}

pub(crate) fn block_num(&self) -> usize {
self.0.len()
}
}

// per spec we use xxHash with seed=0
Expand Down
9 changes: 7 additions & 2 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use crate::bloom_filter::Sbbf;
use crate::format as parquet;
use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
use std::io::BufWriter;
use std::{io::Write, sync::Arc};
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol, TSerializable};

Expand Down Expand Up @@ -225,23 +226,27 @@ impl<W: Write> SerializedFileWriter<W> {
// iter row group
// iter each column
// write bloom filter to the file
let mut start_offset = self.buf.bytes_written();
let mut writer = BufWriter::new(&mut self.buf);

for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() {
match &self.bloom_filters[row_group_idx][column_idx] {
Some(bloom_filter) => {
let start_offset = self.buf.bytes_written();
bloom_filter.write(&mut self.buf)?;
bloom_filter.write(&mut writer)?;
// set offset and index for bloom filter
column_chunk
.meta_data
.as_mut()
.expect("can't have bloom filter without column metadata")
.bloom_filter_offset = Some(start_offset as i64);
start_offset += bloom_filter.block_num() * 32;
}
None => {}
}
}
}
writer.flush()?;
Ok(())
}

Expand Down

0 comments on commit 2749dcc

Please sign in to comment.