diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 6b02da186091..1ee4fa0771a1 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -39,6 +39,7 @@ flate2 = { version = "1.0", optional = true }
 lz4 = { version = "1.23", optional = true }
 zstd = { version = "0.10", optional = true }
 chrono = { version = "0.4", default-features = false }
+num = "0.4"
 num-bigint = "0.4"
 arrow = { path = "../arrow", version = "9.0.0", optional = true, default-features = false, features = ["ipc"] }
 base64 = { version = "0.13", optional = true }
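The new `num` dependency supplies the FromPrimitive and WrappingAdd trait bounds used by the generic delta decoder later in this patch. A minimal standalone sketch (not part of the patch) of the two behaviours the decoder relies on:

    use num::{traits::WrappingAdd, FromPrimitive};

    fn main() {
        // `from_i64` is checked: a header value that does not fit the target
        // physical type yields None, which the decoder maps to an error.
        assert_eq!(<i32 as FromPrimitive>::from_i64(1i64 << 40), None);
        assert_eq!(<i32 as FromPrimitive>::from_i64(7), Some(7));

        // `wrapping_add` wraps on overflow; deltas such as i64::MAX - i64::MIN
        // overflow during encoding and wrap back to the original on decode.
        assert_eq!(WrappingAdd::wrapping_add(&i32::MAX, &1), i32::MIN);
    }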
diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs
index 1bd6fc61a37b..c976ad425413 100644
--- a/parquet/benches/arrow_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -17,15 +17,19 @@
 
 use arrow::array::Array;
 use arrow::datatypes::DataType;
-use criterion::{criterion_group, criterion_main, Criterion};
+use criterion::measurement::WallTime;
+use criterion::{criterion_group, criterion_main, BenchmarkGroup, Criterion};
+use num::FromPrimitive;
+use parquet::basic::Type;
 use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
 use parquet::{
     arrow::array_reader::ArrayReader,
     basic::Encoding,
     column::page::PageIterator,
-    data_type::{ByteArrayType, Int32Type},
+    data_type::{ByteArrayType, Int32Type, Int64Type},
     schema::types::{ColumnDescPtr, SchemaDescPtr},
 };
+use rand::distributions::uniform::SampleUniform;
 use rand::{rngs::StdRng, Rng, SeedableRng};
 use std::{collections::VecDeque, sync::Arc};
 
@@ -37,6 +41,8 @@ fn build_test_schema() -> SchemaDescPtr {
             OPTIONAL INT32 optional_int32_leaf;
             REQUIRED BYTE_ARRAY mandatory_string_leaf (UTF8);
             OPTIONAL BYTE_ARRAY optional_string_leaf (UTF8);
+            REQUIRED INT64 mandatory_int64_leaf;
+            OPTIONAL INT64 optional_int64_leaf;
         }
         ";
     parse_message_type(message_type)
@@ -49,22 +55,27 @@ const NUM_ROW_GROUPS: usize = 1;
 const PAGES_PER_GROUP: usize = 2;
 const VALUES_PER_PAGE: usize = 10_000;
 const BATCH_SIZE: usize = 8192;
+const EXPECTED_VALUE_COUNT: usize = NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE;
 
 pub fn seedable_rng() -> StdRng {
     StdRng::seed_from_u64(42)
 }
 
-fn build_plain_encoded_int32_page_iterator(
+fn build_encoded_primitive_page_iterator<T>(
     schema: SchemaDescPtr,
     column_desc: ColumnDescPtr,
     null_density: f32,
-) -> impl PageIterator + Clone {
+    encoding: Encoding,
+) -> impl PageIterator + Clone
+where
+    T: parquet::data_type::DataType,
+    T::T: SampleUniform + FromPrimitive,
+{
     let max_def_level = column_desc.max_def_level();
     let max_rep_level = column_desc.max_rep_level();
     let rep_levels = vec![0; VALUES_PER_PAGE];
     let mut rng = seedable_rng();
     let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
-    let mut int32_value = 0;
     for _i in 0..NUM_ROW_GROUPS {
         let mut column_chunk_pages = Vec::new();
         for _j in 0..PAGES_PER_GROUP {
@@ -78,8 +89,9 @@ fn build_plain_encoded_int32_page_iterator(
                     max_def_level
                 };
                 if def_level == max_def_level {
-                    int32_value += 1;
-                    values.push(int32_value);
+                    let value =
+                        FromPrimitive::from_usize(rng.gen_range(0..1000)).unwrap();
+                    values.push(value);
                 }
                 def_levels.push(def_level);
             }
@@ -87,7 +99,7 @@ fn build_plain_encoded_int32_page_iterator(
             let mut page_builder =
                 DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
             page_builder.add_rep_levels(max_rep_level, &rep_levels);
             page_builder.add_def_levels(max_def_level, &def_levels);
-            page_builder.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            page_builder.add_values::<T>(encoding, &values);
            column_chunk_pages.push(page_builder.consume());
         }
         pages.push(column_chunk_pages);
     }
@@ -96,27 +108,30 @@ fn build_plain_encoded_int32_page_iterator(
     InMemoryPageIterator::new(schema, column_desc, pages)
 }
 
-fn build_dictionary_encoded_int32_page_iterator(
+fn build_dictionary_encoded_primitive_page_iterator<T>(
     schema: SchemaDescPtr,
     column_desc: ColumnDescPtr,
     null_density: f32,
-) -> impl PageIterator + Clone {
+) -> impl PageIterator + Clone
+where
+    T: parquet::data_type::DataType,
+    T::T: SampleUniform + FromPrimitive + Copy,
+{
     use parquet::encoding::{DictEncoder, Encoder};
     let max_def_level = column_desc.max_def_level();
     let max_rep_level = column_desc.max_rep_level();
     let rep_levels = vec![0; VALUES_PER_PAGE];
     // generate 1% unique values
     const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
-    let unique_values = (0..NUM_UNIQUE_VALUES)
-        .map(|x| (x + 1) as i32)
+    let unique_values: Vec<T::T> = (0..NUM_UNIQUE_VALUES)
+        .map(|x| FromPrimitive::from_usize(x + 1).unwrap())
         .collect::<Vec<_>>();
     let mut rng = seedable_rng();
     let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
     for _i in 0..NUM_ROW_GROUPS {
         let mut column_chunk_pages = VecDeque::new();
         let mem_tracker = Arc::new(parquet::memory::MemTracker::new());
-        let mut dict_encoder =
-            DictEncoder::<Int32Type>::new(column_desc.clone(), mem_tracker);
+        let mut dict_encoder = DictEncoder::<T>::new(column_desc.clone(), mem_tracker);
         // add data pages
         for _j in 0..PAGES_PER_GROUP {
             // generate page
@@ -130,8 +145,8 @@ fn build_dictionary_encoded_int32_page_iterator(
                 };
                 if def_level == max_def_level {
                     // select random value from list of unique values
-                    let int32_value = unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)];
-                    values.push(int32_value);
+                    let value = unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)];
+                    values.push(value);
                 }
                 def_levels.push(def_level);
             }
@@ -288,19 +303,34 @@ fn bench_array_reader(mut array_reader: Box<dyn ArrayReader>) -> usize {
     total_count
 }
 
-fn create_int32_primitive_array_reader(
+fn create_primitive_array_reader(
     page_iterator: impl PageIterator + 'static,
     column_desc: ColumnDescPtr,
 ) -> Box<dyn ArrayReader> {
     use parquet::arrow::array_reader::PrimitiveArrayReader;
-    let reader = PrimitiveArrayReader::<Int32Type>::new_with_options(
-        Box::new(page_iterator),
-        column_desc,
-        None,
-        true,
-    )
-    .unwrap();
-    Box::new(reader)
+    match column_desc.physical_type() {
+        Type::INT32 => {
+            let reader = PrimitiveArrayReader::<Int32Type>::new_with_options(
+                Box::new(page_iterator),
+                column_desc,
+                None,
+                true,
+            )
+            .unwrap();
+            Box::new(reader)
+        }
+        Type::INT64 => {
+            let reader = PrimitiveArrayReader::<Int64Type>::new_with_options(
+                Box::new(page_iterator),
+                column_desc,
+                None,
+                true,
+            )
+            .unwrap();
+            Box::new(reader)
+        }
+        _ => unreachable!(),
+    }
 }
 
 fn create_string_byte_array_reader(
     page_iterator: impl PageIterator + 'static,
     column_desc: ColumnDescPtr,
 ) -> Box<dyn ArrayReader> {
@@ -332,9 +362,7 @@ fn create_complex_object_byte_array_dictionary_reader(
     page_iterator: impl PageIterator + 'static,
     column_desc: ColumnDescPtr,
 ) -> Box<dyn ArrayReader> {
-    use parquet::arrow::array_reader::{
-        make_byte_array_dictionary_reader, ComplexObjectArrayReader,
-    };
+    use parquet::arrow::array_reader::ComplexObjectArrayReader;
     use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
 
     let arrow_type =
         DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
@@ -351,162 +379,224 @@ fn create_complex_object_byte_array_dictionary_reader(
     )
 }
 
-fn add_benches(c: &mut Criterion) {
-    const EXPECTED_VALUE_COUNT: usize =
-        NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE;
-    let mut group = c.benchmark_group("arrow_array_reader");
-
+fn bench_primitive<T>(
+    group: &mut BenchmarkGroup<WallTime>,
+    schema: &SchemaDescPtr,
+    mandatory_column_desc: &ColumnDescPtr,
+    optional_column_desc: &ColumnDescPtr,
+) where
+    T: parquet::data_type::DataType,
+    T::T: SampleUniform + FromPrimitive + Copy,
+{
     let mut count: usize = 0;
-    let schema = build_test_schema();
-    let mandatory_int32_column_desc = schema.column(0);
-    let optional_int32_column_desc = schema.column(1);
-    let mandatory_string_column_desc = schema.column(2);
-    let optional_string_column_desc = schema.column(3);
 
-    // primitive / int32 benchmarks
-    // =============================
-
-    // int32, plain encoded, no NULLs
-    let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(
+    // plain encoded, no NULLs
+    let data = build_encoded_primitive_page_iterator::<T>(
         schema.clone(),
-        mandatory_int32_column_desc.clone(),
+        mandatory_column_desc.clone(),
         0.0,
+        Encoding::PLAIN,
     );
-    group.bench_function("read Int32Array, plain encoded, mandatory, no NULLs", |b| {
+    group.bench_function("plain encoded, mandatory, no NULLs", |b| {
         b.iter(|| {
-            let array_reader = create_int32_primitive_array_reader(
-                plain_int32_no_null_data.clone(),
-                mandatory_int32_column_desc.clone(),
+            let array_reader = create_primitive_array_reader(
+                data.clone(),
+                mandatory_column_desc.clone(),
             );
             count = bench_array_reader(array_reader);
         });
         assert_eq!(count, EXPECTED_VALUE_COUNT);
     });
 
-    let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(
+    let data = build_encoded_primitive_page_iterator::<T>(
         schema.clone(),
-        optional_int32_column_desc.clone(),
+        optional_column_desc.clone(),
         0.0,
+        Encoding::PLAIN,
     );
-    group.bench_function("read Int32Array, plain encoded, optional, no NULLs", |b| {
+    group.bench_function("plain encoded, optional, no NULLs", |b| {
         b.iter(|| {
-            let array_reader = create_int32_primitive_array_reader(
-                plain_int32_no_null_data.clone(),
-                optional_int32_column_desc.clone(),
-            );
+            let array_reader =
+                create_primitive_array_reader(data.clone(), optional_column_desc.clone());
             count = bench_array_reader(array_reader);
         });
         assert_eq!(count, EXPECTED_VALUE_COUNT);
     });
 
-    // int32, plain encoded, half NULLs
-    let plain_int32_half_null_data = build_plain_encoded_int32_page_iterator(
+    // plain encoded, half NULLs
+    let data = build_encoded_primitive_page_iterator::<T>(
         schema.clone(),
-        optional_int32_column_desc.clone(),
+        optional_column_desc.clone(),
         0.5,
+        Encoding::PLAIN,
     );
-    group.bench_function(
-        "read Int32Array, plain encoded, optional, half NULLs",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_int32_primitive_array_reader(
-                    plain_int32_half_null_data.clone(),
-                    optional_int32_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
+    group.bench_function("plain encoded, optional, half NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), optional_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    // binary packed, no NULLs
+    let data = build_encoded_primitive_page_iterator::<T>(
+        schema.clone(),
+        mandatory_column_desc.clone(),
+        0.0,
+        Encoding::DELTA_BINARY_PACKED,
+    );
+    group.bench_function("binary packed, mandatory, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader = create_primitive_array_reader(
+                data.clone(),
+                mandatory_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
-    // int32, dictionary encoded, no NULLs
-    let dictionary_int32_no_null_data = build_dictionary_encoded_int32_page_iterator(
+    let data = build_encoded_primitive_page_iterator::<T>(
         schema.clone(),
-        mandatory_int32_column_desc.clone(),
+        optional_column_desc.clone(),
         0.0,
+        Encoding::DELTA_BINARY_PACKED,
     );
-    group.bench_function(
-        "read Int32Array, dictionary encoded, mandatory, no NULLs",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_int32_primitive_array_reader(
-                    dictionary_int32_no_null_data.clone(),
-                    mandatory_int32_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
+    group.bench_function("binary packed, optional, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), optional_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    // binary packed, half NULLs
+    let data = build_encoded_primitive_page_iterator::<T>(
+        schema.clone(),
+        optional_column_desc.clone(),
+        0.5,
+        Encoding::DELTA_BINARY_PACKED,
     );
+    group.bench_function("binary packed, optional, half NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), optional_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
-    let dictionary_int32_no_null_data = build_dictionary_encoded_int32_page_iterator(
+    // dictionary encoded, no NULLs
+    let data = build_dictionary_encoded_primitive_page_iterator::<T>(
         schema.clone(),
-        optional_int32_column_desc.clone(),
+        mandatory_column_desc.clone(),
         0.0,
     );
-    group.bench_function(
-        "read Int32Array, dictionary encoded, optional, no NULLs",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_int32_primitive_array_reader(
-                    dictionary_int32_no_null_data.clone(),
-                    optional_int32_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
+    group.bench_function("dictionary encoded, mandatory, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader = create_primitive_array_reader(
+                data.clone(),
+                mandatory_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    let data = build_dictionary_encoded_primitive_page_iterator::<T>(
+        schema.clone(),
+        optional_column_desc.clone(),
+        0.0,
     );
+    group.bench_function("dictionary encoded, optional, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), optional_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
-    // int32, dictionary encoded, half NULLs
-    let dictionary_int32_half_null_data = build_dictionary_encoded_int32_page_iterator(
+    // dictionary encoded, half NULLs
+    let data = build_dictionary_encoded_primitive_page_iterator::<T>(
         schema.clone(),
-        optional_int32_column_desc.clone(),
+        optional_column_desc.clone(),
         0.5,
     );
-    group.bench_function(
-        "read Int32Array, dictionary encoded, optional, half NULLs",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_int32_primitive_array_reader(
-                    dictionary_int32_half_null_data.clone(),
-                    optional_int32_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
+    group.bench_function("dictionary encoded, optional, half NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), optional_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+}
+
+fn add_benches(c: &mut Criterion) {
+    let mut count: usize = 0;
+
+    let schema = build_test_schema();
+    let mandatory_int32_column_desc = schema.column(0);
+    let optional_int32_column_desc = schema.column(1);
+    let mandatory_string_column_desc = schema.column(2);
+    let optional_string_column_desc = schema.column(3);
+    let mandatory_int64_column_desc = schema.column(4);
+    let optional_int64_column_desc = schema.column(5);
+
+    // primitive / int32 benchmarks
+    // =============================
+
+    let mut group = c.benchmark_group("arrow_array_reader/Int32Array");
+    bench_primitive::<Int32Type>(
+        &mut group,
+        &schema,
+        &mandatory_int32_column_desc,
+        &optional_int32_column_desc,
+    );
+    group.finish();
+
+    // primitive / int64 benchmarks
+    // =============================
+
+    let mut group = c.benchmark_group("arrow_array_reader/Int64Array");
+    bench_primitive::<Int64Type>(
+        &mut group,
+        &schema,
+        &mandatory_int64_column_desc,
+        &optional_int64_column_desc,
     );
+    group.finish();
 
     // string benchmarks
     //==============================
 
+    let mut group = c.benchmark_group("arrow_array_reader/StringArray");
+
     // string, plain encoded, no NULLs
     let plain_string_no_null_data = build_plain_encoded_string_page_iterator(
         schema.clone(),
         mandatory_string_column_desc.clone(),
         0.0,
     );
-    group.bench_function(
-        "read StringArray, plain encoded, mandatory, no NULLs",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_string_byte_array_reader(
-                    plain_string_no_null_data.clone(),
-                    mandatory_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
+    group.bench_function("plain encoded, mandatory, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader = create_string_byte_array_reader(
+                plain_string_no_null_data.clone(),
+                mandatory_string_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
     let plain_string_no_null_data = build_plain_encoded_string_page_iterator(
         schema.clone(),
         optional_string_column_desc.clone(),
         0.0,
     );
-    group.bench_function("read StringArray, plain encoded, optional, no NULLs", |b| {
+    group.bench_function("plain encoded, optional, no NULLs", |b| {
         b.iter(|| {
             let array_reader = create_string_byte_array_reader(
                 plain_string_no_null_data.clone(),
@@ -523,19 +613,16 @@ fn add_benches(c: &mut Criterion) {
         optional_string_column_desc.clone(),
         0.5,
     );
-    group.bench_function(
-        "read StringArray, plain encoded, optional, half NULLs",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_string_byte_array_reader(
-                    plain_string_half_null_data.clone(),
-                    optional_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
+    group.bench_function("plain encoded, optional, half NULLs", |b| {
+        b.iter(|| {
+            let array_reader = create_string_byte_array_reader(
+                plain_string_half_null_data.clone(),
+                optional_string_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
     // string, dictionary encoded, no NULLs
     let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator(
@@ -543,38 +630,32 @@ fn add_benches(c: &mut Criterion) {
         mandatory_string_column_desc.clone(),
         0.0,
     );
-    group.bench_function(
-        "read StringArray, dictionary encoded, mandatory, no NULLs",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_string_byte_array_reader(
-                    dictionary_string_no_null_data.clone(),
-                    mandatory_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
+    group.bench_function("dictionary encoded, mandatory, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader = create_string_byte_array_reader(
+                dictionary_string_no_null_data.clone(),
+                mandatory_string_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
     let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator(
         schema.clone(),
         optional_string_column_desc.clone(),
         0.0,
     );
-    group.bench_function(
-        "read StringArray, dictionary encoded, optional, no NULLs",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_string_byte_array_reader(
-                    dictionary_string_no_null_data.clone(),
-                    optional_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
+    group.bench_function("dictionary encoded, optional, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader = create_string_byte_array_reader(
+                dictionary_string_no_null_data.clone(),
+                optional_string_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
     // string, dictionary encoded, half NULLs
     let dictionary_string_half_null_data = build_dictionary_encoded_string_page_iterator(
@@ -582,103 +663,89 @@ fn add_benches(c: &mut Criterion) {
         optional_string_column_desc.clone(),
         0.5,
     );
-    group.bench_function(
-        "read StringArray, dictionary encoded, optional, half NULLs",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_string_byte_array_reader(
-                    dictionary_string_half_null_data.clone(),
-                    optional_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
+    group.bench_function("dictionary encoded, optional, half NULLs", |b| {
+        b.iter(|| {
+            let array_reader = create_string_byte_array_reader(
+                dictionary_string_half_null_data.clone(),
+                optional_string_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
-    group.bench_function(
-        "read StringDictionary, dictionary encoded, mandatory, no NULLs - old",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_complex_object_byte_array_dictionary_reader(
-                    dictionary_string_no_null_data.clone(),
-                    mandatory_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
+    group.finish();
 
-    group.bench_function(
-        "read StringDictionary, dictionary encoded, mandatory, no NULLs - new",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_string_byte_array_dictionary_reader(
-                    dictionary_string_no_null_data.clone(),
-                    mandatory_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
+    // string dictionary benchmarks
+    //==============================
 
-    group.bench_function(
-        "read StringDictionary, dictionary encoded, optional, no NULLs - old",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_complex_object_byte_array_dictionary_reader(
-                    dictionary_string_no_null_data.clone(),
-                    optional_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
+    let mut group = c.benchmark_group("arrow_array_reader/StringDictionary");
 
-    group.bench_function(
-        "read StringDictionary, dictionary encoded, optional, no NULLs - new",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_string_byte_array_dictionary_reader(
-                    dictionary_string_no_null_data.clone(),
-                    optional_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
+    group.bench_function("dictionary encoded, mandatory, no NULLs - old", |b| {
+        b.iter(|| {
+            let array_reader = create_complex_object_byte_array_dictionary_reader(
+                dictionary_string_no_null_data.clone(),
+                mandatory_string_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
-    group.bench_function(
-        "read StringDictionary, dictionary encoded, optional, half NULLs - old",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_complex_object_byte_array_dictionary_reader(
-                    dictionary_string_half_null_data.clone(),
-                    optional_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
+    group.bench_function("dictionary encoded, mandatory, no NULLs - new", |b| {
+        b.iter(|| {
+            let array_reader = create_string_byte_array_dictionary_reader(
+                dictionary_string_no_null_data.clone(),
+                mandatory_string_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
-    group.bench_function(
-        "read StringDictionary, dictionary encoded, optional, half NULLs - new",
-        |b| {
-            b.iter(|| {
-                let array_reader = create_string_byte_array_dictionary_reader(
-                    dictionary_string_half_null_data.clone(),
-                    optional_string_column_desc.clone(),
-                );
-                count = bench_array_reader(array_reader);
-            });
-            assert_eq!(count, EXPECTED_VALUE_COUNT);
-        },
-    );
+    group.bench_function("dictionary encoded, optional, no NULLs - old", |b| {
+        b.iter(|| {
+            let array_reader = create_complex_object_byte_array_dictionary_reader(
+                dictionary_string_no_null_data.clone(),
+                optional_string_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    group.bench_function("dictionary encoded, optional, no NULLs - new", |b| {
+        b.iter(|| {
+            let array_reader = create_string_byte_array_dictionary_reader(
+                dictionary_string_no_null_data.clone(),
+                optional_string_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    group.bench_function("dictionary encoded, optional, half NULLs - old", |b| {
+        b.iter(|| {
+            let array_reader = create_complex_object_byte_array_dictionary_reader(
+                dictionary_string_half_null_data.clone(),
+                optional_string_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    group.bench_function("dictionary encoded, optional, half NULLs - new", |b| {
+        b.iter(|| {
+            let array_reader = create_string_byte_array_dictionary_reader(
+                dictionary_string_half_null_data.clone(),
+                optional_string_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
 
     group.finish();
 }
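The shortened benchmark names work because criterion prefixes every benchmark with its group name; with the per-type groups above, a run reports IDs such as "arrow_array_reader/Int32Array/plain encoded, mandatory, no NULLs". A minimal sketch of the pattern (closure body elided):

    use criterion::{criterion_group, criterion_main, Criterion};

    fn add_benches(c: &mut Criterion) {
        // Criterion reports each benchmark as "<group name>/<function name>",
        // so per-type groups replace the long per-benchmark name prefixes.
        let mut group = c.benchmark_group("arrow_array_reader/Int32Array");
        group.bench_function("plain encoded, mandatory, no NULLs", |b| {
            b.iter(|| {
                // build the array reader and read all batches here
            })
        });
        group.finish();
    }

    criterion_group!(benches, add_benches);
    criterion_main!(benches);

Note that finish() consumes the group, releasing the mutable borrow on the Criterion instance before the next group is created.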
diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs
index f044dd244d25..bd0ae34b1ac7 100644
--- a/parquet/src/encodings/decoding.rs
+++ b/parquet/src/encodings/decoding.rs
@@ -17,6 +17,8 @@
 
 //! Contains all supported decoders for Parquet.
 
+use num::traits::WrappingAdd;
+use num::FromPrimitive;
 use std::{cmp, marker::PhantomData, mem};
 
 use super::rle::RleDecoder;
@@ -27,8 +29,8 @@ use crate::data_type::*;
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use crate::util::{
-    bit_util::{self, BitReader, FromBytes},
-    memory::{ByteBuffer, ByteBufferPtr},
+    bit_util::{self, BitReader},
+    memory::ByteBufferPtr,
 };
 
 pub(crate) mod private {
@@ -431,189 +433,249 @@ pub struct DeltaBitPackDecoder<T: DataType> {
     initialized: bool,
 
     // Header info
-    num_values: usize,
-    num_mini_blocks: i64,
+
+    /// The number of values in each block
+    block_size: usize,
+    /// The number of values that remain to be read in the current page
+    values_left: usize,
+    /// The number of mini-blocks in each block
+    mini_blocks_per_block: usize,
+    /// The number of values in each mini block
     values_per_mini_block: usize,
-    values_current_mini_block: usize,
-    first_value: i64,
-    first_value_read: bool,
 
     // Per block info
-    min_delta: i64,
-    mini_block_idx: usize,
-    delta_bit_width: u8,
-    delta_bit_widths: ByteBuffer,
-    deltas_in_mini_block: Vec<T::T>, // eagerly loaded deltas for a mini block
-    use_batch: bool,
-
-    current_value: i64,
-    _phantom: PhantomData<T>,
+    /// The minimum delta in the block
+    min_delta: T::T,
+    /// The byte offset of the end of the current block
+    block_end_offset: usize,
+    /// The index on the current mini block
+    mini_block_idx: usize,
+    /// The bit widths of each mini block in the current block
+    mini_block_bit_widths: Vec<u8>,
+    /// The number of values remaining in the current mini block
+    mini_block_remaining: usize,
+
+    /// The first value from the block header if not consumed
+    first_value: Option<T::T>,
+    /// The last value to compute offsets from
+    last_value: T::T,
 }
 
-impl<T: DataType> DeltaBitPackDecoder<T> {
+impl<T: DataType> DeltaBitPackDecoder<T>
+where
+    T::T: Default + FromPrimitive + WrappingAdd + Copy,
+{
     /// Creates new delta bit packed decoder.
     pub fn new() -> Self {
         Self {
             bit_reader: BitReader::from(vec![]),
             initialized: false,
-            num_values: 0,
-            num_mini_blocks: 0,
+            block_size: 0,
+            values_left: 0,
+            mini_blocks_per_block: 0,
             values_per_mini_block: 0,
-            values_current_mini_block: 0,
-            first_value: 0,
-            first_value_read: false,
-            min_delta: 0,
+            min_delta: Default::default(),
             mini_block_idx: 0,
-            delta_bit_width: 0,
-            delta_bit_widths: ByteBuffer::new(),
-            deltas_in_mini_block: vec![],
-            use_batch: mem::size_of::<T::T>() == 4,
-            current_value: 0,
-            _phantom: PhantomData,
+            mini_block_bit_widths: vec![],
+            mini_block_remaining: 0,
+            block_end_offset: 0,
+            first_value: None,
+            last_value: Default::default(),
         }
     }
 
-    /// Returns underlying bit reader offset.
+    /// Returns the current offset
     pub fn get_offset(&self) -> usize {
         assert!(self.initialized, "Bit reader is not initialized");
-        self.bit_reader.get_byte_offset()
+        match self.values_left {
+            // If we've exhausted this page report the end of the current block
+            // as we may not have consumed the trailing padding
+            //
+            // The max is necessary to handle pages which don't contain more than
+            // one value and therefore have no blocks, but still contain a page header
+            0 => self.bit_reader.get_byte_offset().max(self.block_end_offset),
+            _ => self.bit_reader.get_byte_offset(),
+        }
     }
 
-    /// Initializes new mini block.
+    /// Initializes the next block and the first mini block within it
     #[inline]
-    fn init_block(&mut self) -> Result<()> {
-        self.min_delta = self
+    fn next_block(&mut self) -> Result<()> {
+        let min_delta = self
             .bit_reader
             .get_zigzag_vlq_int()
             .ok_or_else(|| eof_err!("Not enough data to decode 'min_delta'"))?;
 
-        self.delta_bit_widths.clear();
-        for _ in 0..self.num_mini_blocks {
-            let w = self
-                .bit_reader
-                .get_aligned::<u8>(1)
-                .ok_or_else(|| eof_err!("Not enough data to decode 'width'"))?;
-            self.delta_bit_widths.push(w);
+        self.min_delta = T::T::from_i64(min_delta)
+            .ok_or_else(|| general_err!("'min_delta' too large"))?;
+
+        self.mini_block_bit_widths.clear();
+        self.bit_reader.get_aligned_bytes(
+            &mut self.mini_block_bit_widths,
+            self.mini_blocks_per_block as usize,
+        );
+
+        let mut offset = self.bit_reader.get_byte_offset();
+        let mut remaining = self.values_left;
+
+        // Compute the end offset of the current block
+        for b in &mut self.mini_block_bit_widths {
+            if remaining == 0 {
+                // Specification requires handling arbitrary bit widths
+                // for trailing mini blocks
+                *b = 0;
+            }
+            remaining = remaining.saturating_sub(self.values_per_mini_block);
+            offset += *b as usize * self.values_per_mini_block / 8;
         }
+        self.block_end_offset = offset;
 
+        if self.mini_block_bit_widths.len() != self.mini_blocks_per_block {
+            return Err(eof_err!("insufficient mini block bit widths"));
+        }
+
+        self.mini_block_remaining = self.values_per_mini_block;
         self.mini_block_idx = 0;
-        self.delta_bit_width = self.delta_bit_widths.data()[0];
-        self.values_current_mini_block = self.values_per_mini_block;
+
         Ok(())
     }
 
-    /// Loads delta into mini block.
+    /// Initializes the next mini block
     #[inline]
-    fn load_deltas_in_mini_block(&mut self) -> Result<()>
-    where
-        T::T: FromBytes,
-    {
-        if self.use_batch {
-            self.deltas_in_mini_block
-                .resize(self.values_current_mini_block, T::T::default());
-            let loaded = self.bit_reader.get_batch::<T::T>(
-                &mut self.deltas_in_mini_block[..],
-                self.delta_bit_width as usize,
-            );
-            assert!(loaded == self.values_current_mini_block);
+    fn next_mini_block(&mut self) -> Result<()> {
+        if self.mini_block_idx + 1 < self.mini_block_bit_widths.len() {
+            self.mini_block_idx += 1;
+            self.mini_block_remaining = self.values_per_mini_block;
+            Ok(())
         } else {
-            self.deltas_in_mini_block.clear();
-            for _ in 0..self.values_current_mini_block {
-                // TODO: load one batch at a time similar to int32
-                let delta = self
-                    .bit_reader
-                    .get_value::<T::T>(self.delta_bit_width as usize)
-                    .ok_or_else(|| eof_err!("Not enough data to decode 'delta'"))?;
-                self.deltas_in_mini_block.push(delta);
-            }
+            self.next_block()
         }
-
-        Ok(())
     }
 }
 
-impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T> {
+impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T>
+where
+    T::T: Default + FromPrimitive + WrappingAdd + Copy,
+{
     // # of total values is derived from encoding
     #[inline]
     fn set_data(&mut self, data: ByteBufferPtr, _index: usize) -> Result<()> {
         self.bit_reader = BitReader::new(data);
         self.initialized = true;
 
-        let block_size = self
+        // Read header information
+        self.block_size = self
             .bit_reader
             .get_vlq_int()
-            .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?;
-        self.num_mini_blocks = self
+            .ok_or_else(|| eof_err!("Not enough data to decode 'block_size'"))?
+            .try_into()
+            .map_err(|_| general_err!("invalid 'block_size'"))?;
+
+        self.mini_blocks_per_block = self
             .bit_reader
             .get_vlq_int()
-            .ok_or_else(|| eof_err!("Not enough data to decode 'num_mini_blocks'"))?;
-        self.num_values = self
+            .ok_or_else(|| eof_err!("Not enough data to decode 'mini_blocks_per_block'"))?
+            .try_into()
+            .map_err(|_| general_err!("invalid 'mini_blocks_per_block'"))?;
+
+        self.values_left = self
             .bit_reader
             .get_vlq_int()
-            .ok_or_else(|| eof_err!("Not enough data to decode 'num_values'"))?
-            as usize;
-        self.first_value = self
+            .ok_or_else(|| eof_err!("Not enough data to decode 'values_left'"))?
+            .try_into()
+            .map_err(|_| general_err!("invalid 'values_left'"))?;
+
+        let first_value = self
             .bit_reader
             .get_zigzag_vlq_int()
             .ok_or_else(|| eof_err!("Not enough data to decode 'first_value'"))?;
 
+        self.first_value = Some(
+            T::T::from_i64(first_value)
+                .ok_or_else(|| general_err!("first value too large"))?,
+        );
+
+        if self.block_size % 128 != 0 {
+            return Err(general_err!(
+                "'block_size' must be a multiple of 128, got {}",
+                self.block_size
+            ));
+        }
+
+        if self.block_size % self.mini_blocks_per_block != 0 {
+            return Err(general_err!(
+                "'block_size' must be a multiple of 'mini_blocks_per_block' got {} and {}",
+                self.block_size, self.mini_blocks_per_block
+            ));
+        }
+
         // Reset decoding state
-        self.first_value_read = false;
         self.mini_block_idx = 0;
-        self.delta_bit_widths.clear();
-        self.values_current_mini_block = 0;
+        self.values_per_mini_block = self.block_size / self.mini_blocks_per_block;
+        self.mini_block_remaining = 0;
+        self.mini_block_bit_widths.clear();
 
-        self.values_per_mini_block = (block_size / self.num_mini_blocks) as usize;
-        assert!(self.values_per_mini_block % 8 == 0);
+        if self.values_per_mini_block % 32 != 0 {
+            return Err(general_err!(
+                "'values_per_mini_block' must be a multiple of 32 got {}",
+                self.values_per_mini_block
+            ));
+        }
 
         Ok(())
     }
 
     fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
         assert!(self.initialized, "Bit reader is not initialized");
+        if buffer.is_empty() {
+            return Ok(0);
+        }
 
-        let num_values = cmp::min(buffer.len(), self.num_values);
-        for i in 0..num_values {
-            if !self.first_value_read {
-                self.set_decoded_value(buffer, i, self.first_value);
-                self.current_value = self.first_value;
-                self.first_value_read = true;
-                continue;
+        let mut read = 0;
+        let to_read = buffer.len().min(self.values_left);
+
+        if let Some(value) = self.first_value.take() {
+            self.last_value = value;
+            buffer[0] = value;
+            read += 1;
+        }
+
+        while read != to_read {
+            if self.mini_block_remaining == 0 {
+                self.next_mini_block()?;
             }
 
-            if self.values_current_mini_block == 0 {
-                self.mini_block_idx += 1;
-                if self.mini_block_idx < self.delta_bit_widths.size() {
-                    self.delta_bit_width =
-                        self.delta_bit_widths.data()[self.mini_block_idx];
-                    self.values_current_mini_block = self.values_per_mini_block;
-                } else {
-                    self.init_block()?;
-                }
-                self.load_deltas_in_mini_block()?;
+            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
+            let batch_to_read = self.mini_block_remaining.min(to_read - read);
+
+            let batch_read = self
+                .bit_reader
+                .get_batch(&mut buffer[read..read + batch_to_read], bit_width);
+
+            // At this point we have read the deltas to `buffer` we now need to offset
+            // these to get back to the original values that were encoded
+            for v in &mut buffer[read..read + batch_read] {
+                // It is OK for deltas to contain "overflowed" values after encoding,
+                // e.g. i64::MAX - i64::MIN, so we use `wrapping_add` to "overflow" again and
+                // restore original value.
+                *v = v
+                    .wrapping_add(&self.min_delta)
+                    .wrapping_add(&self.last_value);
+
+                self.last_value = *v;
+            }
 
-            // we decrement values in current mini block, so we need to invert index for
-            // delta
-            let delta = self.get_delta(
-                self.deltas_in_mini_block.len() - self.values_current_mini_block,
-            );
-            // It is OK for deltas to contain "overflowed" values after encoding,
-            // e.g. i64::MAX - i64::MIN, so we use `wrapping_add` to "overflow" again and
-            // restore original value.
-            self.current_value = self.current_value.wrapping_add(self.min_delta);
-            self.current_value = self.current_value.wrapping_add(delta as i64);
-            self.set_decoded_value(buffer, i, self.current_value);
-            self.values_current_mini_block -= 1;
+            read += batch_read;
+            self.mini_block_remaining -= batch_read;
         }
 
-        self.num_values -= num_values;
-        Ok(num_values)
+        self.values_left -= to_read;
+        Ok(to_read)
     }
 
     fn values_left(&self) -> usize {
-        self.num_values
+        self.values_left
     }
 
     fn encoding(&self) -> Encoding {
@@ -621,42 +683,6 @@ impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T> {
     }
 }
 
-/// Helper trait to define specific conversions when decoding values
-trait DeltaBitPackDecoderConversion<T: DataType> {
-    /// Sets decoded value based on type `T`.
-    fn get_delta(&self, index: usize) -> i64;
-
-    fn set_decoded_value(&self, buffer: &mut [T::T], index: usize, value: i64);
-}
-
-impl<T: DataType> DeltaBitPackDecoderConversion<T> for DeltaBitPackDecoder<T> {
-    #[inline]
-    fn get_delta(&self, index: usize) -> i64 {
-        ensure_phys_ty!(
-            Type::INT32 | Type::INT64,
-            "DeltaBitPackDecoder only supports Int32Type and Int64Type"
-        );
-        self.deltas_in_mini_block[index].as_i64().unwrap()
-    }
-
-    #[inline]
-    fn set_decoded_value(&self, buffer: &mut [T::T], index: usize, value: i64) {
-        match T::get_physical_type() {
-            Type::INT32 => {
-                let val = buffer[index].as_mut_any().downcast_mut::<i32>().unwrap();
-
-                *val = value as i32;
-            }
-            Type::INT64 => {
-                let val = buffer[index].as_mut_any().downcast_mut::<i64>().unwrap();
-
-                *val = value;
-            }
-            _ => panic!("DeltaBitPackDecoder only supports Int32Type and Int64Type"),
-        };
-    }
-}
-
 // ----------------------------------------------------------------------
 // DELTA_LENGTH_BYTE_ARRAY Decoding
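The rewritten `get` above bit-unpacks raw deltas straight into the caller's buffer, then rebuilds the original values in place. A standalone sketch of that reconstruction step, using plain i64 in place of the generic T::T and assuming the deltas are already unpacked:

    // Each stored value is a delta relative to its predecessor, biased by the
    // block's min_delta; wrapping arithmetic undoes any overflow introduced
    // when the deltas were encoded.
    fn reconstruct(deltas: &mut [i64], min_delta: i64, mut last_value: i64) {
        for v in deltas.iter_mut() {
            *v = v.wrapping_add(min_delta).wrapping_add(last_value);
            last_value = *v;
        }
    }

    fn main() {
        // first_value = 10, min_delta = -2, unpacked deltas = [3, 0, 5]
        let mut buf = [3, 0, 5];
        reconstruct(&mut buf, -2, 10);
        assert_eq!(buf, [11, 9, 12]); // 10+(3-2), 11+(0-2), 9+(5-2)
    }

Rebuilding values forward through the buffer like this is what lets the patch drop the per-value downcasting helper removed above.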
diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs
index 162cfd8b5813..67eafd33bdac 100644
--- a/parquet/src/util/bit_util.rs
+++ b/parquet/src/util/bit_util.rs
@@ -528,8 +528,14 @@ impl BitReader {
         Some(from_ne_slice(v.as_bytes()))
     }
 
+    /// Read multiple values from their packed representation
+    ///
+    /// # Panics
+    ///
+    /// This function panics if
+    /// - `num_bits` is larger than the bit-capacity of `T`
+    ///
     pub fn get_batch<T: FromBytes>(&mut self, batch: &mut [T], num_bits: usize) -> usize {
-        assert!(num_bits <= 32);
         assert!(num_bits <= size_of::<T>() * 8);
 
         let mut values_to_read = batch.len();
@@ -541,6 +547,17 @@ impl BitReader {
 
         let mut i = 0;
 
+        if num_bits > 32 {
+            // No fast path - read values individually
+            while i < values_to_read {
+                batch[i] = self
+                    .get_value(num_bits)
+                    .expect("expected to have more data");
+                i += 1;
+            }
+            return values_to_read;
+        }
+
         // First align bit offset to byte offset
         if self.bit_offset != 0 {
             while i < values_to_read && self.bit_offset != 0 {
@@ -602,6 +619,26 @@ impl BitReader {
         values_to_read
     }
 
+    /// Reads up to `num_bytes` to `buf` returning the number of bytes read
+    pub(crate) fn get_aligned_bytes(
+        &mut self,
+        buf: &mut Vec<u8>,
+        num_bytes: usize,
+    ) -> usize {
+        // Align to byte offset
+        self.byte_offset += ceil(self.bit_offset as i64, 8) as usize;
+        self.bit_offset = 0;
+
+        let src = &self.buffer.data()[self.byte_offset..];
+        let to_read = num_bytes.min(src.len());
+        buf.extend_from_slice(&src[..to_read]);
+
+        self.byte_offset += to_read;
+        self.reload_buffer_values();
+
+        to_read
+    }
+
     /// Reads a `num_bytes`-sized value from this buffer and return it.
     /// `T` needs to be a little-endian native type. The value is assumed to be byte
     /// aligned so the bit reader will be advanced to the start of the next byte before
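For reference, get_batch consumes values bit-packed LSB-first at a fixed width; the widened assertion above extends the same scheme past 32 bits via the per-value fallback. A minimal illustration of the layout (not the library's unpacking code):

    // Packing [1, 2, 3] at a bit width of 2 produces the single byte
    // 0b00_11_10_01: the first value occupies the lowest-order bits.
    fn unpack_2bit(byte: u8, count: usize) -> Vec<u8> {
        (0..count).map(|i| (byte >> (2 * i)) & 0b11).collect()
    }

    fn main() {
        assert_eq!(unpack_2bit(0b0011_1001, 3), vec![1, 2, 3]);
    }

The fallback path should be rare in practice: widths above 32 bits can only arise for 64-bit values whose deltas genuinely span more than 32 bits.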