Skip to content

Commit

Permalink
Revert "Revert "Write Bloom filters between row groups instead of the…
Browse files Browse the repository at this point in the history
… end (#…"

This reverts commit 22e0b44.
  • Loading branch information
alamb authored Jun 21, 2024
1 parent 22e0b44 commit 5f78af7
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 52 deletions.
8 changes: 8 additions & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ hashbrown = { version = "0.14", default-features = false }
twox-hash = { version = "1.6", default-features = false }
paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
sysinfo = { version = "0.30.12", optional = true, default-features = false }

[dev-dependencies]
base64 = { version = "0.22", default-features = false, features = ["std"] }
Expand Down Expand Up @@ -114,12 +115,19 @@ async = ["futures", "tokio"]
object_store = ["dep:object_store", "async"]
# Group Zstd dependencies
zstd = ["dep:zstd", "zstd-sys"]
# Display memory in example/write_parquet.rs
sysinfo = ["dep:sysinfo"]

[[example]]
name = "read_parquet"
required-features = ["arrow"]
path = "./examples/read_parquet.rs"

[[example]]
name = "write_parquet"
required-features = ["cli", "sysinfo"]
path = "./examples/write_parquet.rs"

[[example]]
name = "async_read_parquet"
required-features = ["arrow", "async"]
Expand Down
131 changes: 131 additions & 0 deletions parquet/examples/write_parquet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fs::File;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};

use arrow::array::{StructArray, UInt64Builder};
use arrow::datatypes::DataType::UInt64;
use arrow::datatypes::{Field, Schema};
use clap::{Parser, ValueEnum};
use parquet::arrow::ArrowWriter as ParquetWriter;
use parquet::basic::Encoding;
use parquet::errors::Result;
use parquet::file::properties::{BloomFilterPosition, WriterProperties};
use sysinfo::{MemoryRefreshKind, Pid, ProcessRefreshKind, RefreshKind, System};

#[derive(ValueEnum, Clone)]
enum BloomFilterPositionArg {
End,
AfterRowGroup,
}

#[derive(Parser)]
#[command(version)]
/// Writes sequences of integers, with a Bloom Filter, while logging timing and memory usage.
struct Args {
#[arg(long, default_value_t = 1000)]
/// Number of batches to write
iterations: u64,

#[arg(long, default_value_t = 1000000)]
/// Number of rows in each batch
batch: u64,

#[arg(long, value_enum, default_value_t=BloomFilterPositionArg::AfterRowGroup)]
/// Where to write Bloom Filters
bloom_filter_position: BloomFilterPositionArg,

/// Path to the file to write
path: PathBuf,
}

fn now() -> String {
chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string()
}

fn mem(system: &mut System) -> String {
let pid = Pid::from(std::process::id() as usize);
system.refresh_process_specifics(pid, ProcessRefreshKind::new().with_memory());
system
.process(pid)
.map(|proc| format!("{}MB", proc.memory() / 1_000_000))
.unwrap_or("N/A".to_string())
}

fn main() -> Result<()> {
let args = Args::parse();

let bloom_filter_position = match args.bloom_filter_position {
BloomFilterPositionArg::End => BloomFilterPosition::End,
BloomFilterPositionArg::AfterRowGroup => BloomFilterPosition::AfterRowGroup,
};

let properties = WriterProperties::builder()
.set_column_bloom_filter_enabled("id".into(), true)
.set_column_encoding("id".into(), Encoding::DELTA_BINARY_PACKED)
.set_bloom_filter_position(bloom_filter_position)
.build();
let schema = Arc::new(Schema::new(vec![Field::new("id", UInt64, false)]));
// Create parquet file that will be read.
let file = File::create(args.path).unwrap();
let mut writer = ParquetWriter::try_new(file, schema.clone(), Some(properties))?;

let mut system =
System::new_with_specifics(RefreshKind::new().with_memory(MemoryRefreshKind::everything()));
eprintln!(
"{} Writing {} batches of {} rows. RSS = {}",
now(),
args.iterations,
args.batch,
mem(&mut system)
);

let mut array_builder = UInt64Builder::new();
let mut last_log = Instant::now();
for i in 0..args.iterations {
if Instant::now() - last_log > Duration::new(10, 0) {
last_log = Instant::now();
eprintln!(
"{} Iteration {}/{}. RSS = {}",
now(),
i + 1,
args.iterations,
mem(&mut system)
);
}
for j in 0..args.batch {
array_builder.append_value(i + j);
}
writer.write(
&StructArray::new(
schema.fields().clone(),
vec![Arc::new(array_builder.finish())],
None,
)
.into(),
)?;
}
writer.flush()?;
writer.close()?;

eprintln!("{} Done. RSS = {}", now(), mem(&mut system));

Ok(())
}
28 changes: 25 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::column::writer::{
};
use crate::data_type::{ByteArray, FixedLenByteArray};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr};
use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
Expand Down Expand Up @@ -199,7 +199,7 @@ impl<W: Write + Send> ArrowWriter<W> {
}

/// Returns metadata for any flushed row groups
pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
self.writer.flushed_row_groups()
}

Expand Down Expand Up @@ -1053,7 +1053,9 @@ mod tests {
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::read_pages_locations;
use crate::file::properties::{EnabledStatistics, ReaderProperties, WriterVersion};
use crate::file::properties::{
BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
};
use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::file::{
reader::{FileReader, SerializedFileReader},
Expand Down Expand Up @@ -1701,6 +1703,7 @@ mod tests {
values: ArrayRef,
schema: SchemaRef,
bloom_filter: bool,
bloom_filter_position: BloomFilterPosition,
}

impl RoundTripOptions {
Expand All @@ -1711,6 +1714,7 @@ mod tests {
values,
schema: Arc::new(schema),
bloom_filter: false,
bloom_filter_position: BloomFilterPosition::AfterRowGroup,
}
}
}
Expand All @@ -1730,6 +1734,7 @@ mod tests {
values,
schema,
bloom_filter,
bloom_filter_position,
} = options;

let encodings = match values.data_type() {
Expand Down Expand Up @@ -1770,6 +1775,7 @@ mod tests {
.set_dictionary_page_size_limit(dictionary_size.max(1))
.set_encoding(*encoding)
.set_bloom_filter_enabled(bloom_filter)
.set_bloom_filter_position(bloom_filter_position)
.build();

files.push(roundtrip_opts(&expected_batch, props))
Expand Down Expand Up @@ -2127,6 +2133,22 @@ mod tests {
values_required::<BinaryViewArray, _>(many_vecs_iter);
}

#[test]
fn i32_column_bloom_filter_at_end() {
let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
let mut options = RoundTripOptions::new(array, false);
options.bloom_filter = true;
options.bloom_filter_position = BloomFilterPosition::End;

let files = one_column_roundtrip_with_options(options);
check_bloom_filter(
files,
"col".to_string(),
(0..SMALL_SIZE as i32).collect(),
(SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
);
}

#[test]
fn i32_column_bloom_filter() {
let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
Expand Down
4 changes: 2 additions & 2 deletions parquet/src/arrow/async_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use crate::{
arrow::arrow_writer::ArrowWriterOptions,
arrow::ArrowWriter,
errors::{ParquetError, Result},
file::{metadata::RowGroupMetaDataPtr, properties::WriterProperties},
file::{metadata::RowGroupMetaData, properties::WriterProperties},
format::{FileMetaData, KeyValue},
};
use arrow_array::RecordBatch;
Expand Down Expand Up @@ -172,7 +172,7 @@ impl<W: AsyncFileWriter> AsyncArrowWriter<W> {
}

/// Returns metadata for any flushed row groups
pub fn flushed_row_groups(&self) -> &[RowGroupMetaDataPtr] {
pub fn flushed_row_groups(&self) -> &[RowGroupMetaData] {
self.sync_writer.flushed_row_groups()
}

Expand Down
5 changes: 5 additions & 0 deletions parquet/src/file/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ impl RowGroupMetaData {
&self.columns
}

/// Returns mutable slice of column chunk metadata.
pub fn columns_mut(&mut self) -> &mut [ColumnChunkMetaData] {
&mut self.columns
}

/// Number of rows in this row group.
pub fn num_rows(&self) -> i64 {
self.num_rows
Expand Down
36 changes: 36 additions & 0 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag
pub const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
/// Default value for [`WriterProperties::max_row_group_size`]
pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
/// Default value for [`WriterProperties::bloom_filter_position`]
pub const DEFAULT_BLOOM_FILTER_POSITION: BloomFilterPosition = BloomFilterPosition::AfterRowGroup;
/// Default value for [`WriterProperties::created_by`]
pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION"));
/// Default value for [`WriterProperties::column_index_truncate_length`]
Expand Down Expand Up @@ -86,6 +88,24 @@ impl FromStr for WriterVersion {
}
}

/// Where in the file [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter) should
/// write Bloom filters
///
/// Basic constant, which is not part of the Thrift definition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BloomFilterPosition {
/// Write Bloom Filters of each row group right after the row group
///
/// This saves memory by writing it as soon as it is computed, at the cost
/// of data locality for readers
AfterRowGroup,
/// Write Bloom Filters at the end of the file
///
/// This allows better data locality for readers, at the cost of memory usage
/// for writers.
End,
}

/// Reference counted writer properties.
pub type WriterPropertiesPtr = Arc<WriterProperties>;

Expand Down Expand Up @@ -130,6 +150,7 @@ pub struct WriterProperties {
data_page_row_count_limit: usize,
write_batch_size: usize,
max_row_group_size: usize,
bloom_filter_position: BloomFilterPosition,
writer_version: WriterVersion,
created_by: String,
pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
Expand Down Expand Up @@ -217,6 +238,11 @@ impl WriterProperties {
self.max_row_group_size
}

/// Returns maximum number of rows in a row group.
pub fn bloom_filter_position(&self) -> BloomFilterPosition {
self.bloom_filter_position
}

/// Returns configured writer version.
pub fn writer_version(&self) -> WriterVersion {
self.writer_version
Expand Down Expand Up @@ -338,6 +364,7 @@ pub struct WriterPropertiesBuilder {
data_page_row_count_limit: usize,
write_batch_size: usize,
max_row_group_size: usize,
bloom_filter_position: BloomFilterPosition,
writer_version: WriterVersion,
created_by: String,
key_value_metadata: Option<Vec<KeyValue>>,
Expand All @@ -357,6 +384,7 @@ impl WriterPropertiesBuilder {
data_page_row_count_limit: usize::MAX,
write_batch_size: DEFAULT_WRITE_BATCH_SIZE,
max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE,
bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION,
writer_version: DEFAULT_WRITER_VERSION,
created_by: DEFAULT_CREATED_BY.to_string(),
key_value_metadata: None,
Expand All @@ -376,6 +404,7 @@ impl WriterPropertiesBuilder {
data_page_row_count_limit: self.data_page_row_count_limit,
write_batch_size: self.write_batch_size,
max_row_group_size: self.max_row_group_size,
bloom_filter_position: self.bloom_filter_position,
writer_version: self.writer_version,
created_by: self.created_by,
key_value_metadata: self.key_value_metadata,
Expand Down Expand Up @@ -487,6 +516,12 @@ impl WriterPropertiesBuilder {
self
}

/// Sets where in the final file Bloom Filters are written (default `AfterRowGroup`)
pub fn set_bloom_filter_position(mut self, value: BloomFilterPosition) -> Self {
self.bloom_filter_position = value;
self
}

/// Sets "created by" property (defaults to `parquet-rs version <VERSION>`).
pub fn set_created_by(mut self, value: String) -> Self {
self.created_by = value;
Expand Down Expand Up @@ -1052,6 +1087,7 @@ mod tests {
);
assert_eq!(props.write_batch_size(), DEFAULT_WRITE_BATCH_SIZE);
assert_eq!(props.max_row_group_size(), DEFAULT_MAX_ROW_GROUP_SIZE);
assert_eq!(props.bloom_filter_position(), DEFAULT_BLOOM_FILTER_POSITION);
assert_eq!(props.writer_version(), DEFAULT_WRITER_VERSION);
assert_eq!(props.created_by(), DEFAULT_CREATED_BY);
assert_eq!(props.key_value_metadata(), None);
Expand Down
Loading

0 comments on commit 5f78af7

Please sign in to comment.