Skip to content

Commit

Permalink
Vectorized DeltaBitPackDecoder (#1281)
Browse files (browse the repository at this point in the history)
  • Loading branch information
tustvold committed Feb 8, 2022
1 parent 936ed5e commit fa6233a
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 160 deletions.
1 change: 1 addition & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ flate2 = { version = "1.0", optional = true }
lz4 = { version = "1.23", optional = true }
zstd = { version = "0.10", optional = true }
chrono = { version = "0.4", default-features = false }
num = "0.4"
num-bigint = "0.4"
arrow = { path = "../arrow", version = "9.0.0", optional = true, default-features = false, features = ["ipc"] }
base64 = { version = "0.13", optional = true }
Expand Down
78 changes: 67 additions & 11 deletions parquet/benches/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@ pub fn seedable_rng() -> StdRng {
StdRng::seed_from_u64(42)
}

fn build_plain_encoded_int32_page_iterator(
fn build_encoded_int32_page_iterator(
schema: SchemaDescPtr,
column_desc: ColumnDescPtr,
null_density: f32,
encoding: Encoding,
) -> impl PageIterator + Clone {
let max_def_level = column_desc.max_def_level();
let max_rep_level = column_desc.max_rep_level();
let rep_levels = vec![0; VALUES_PER_PAGE];
let mut rng = seedable_rng();
let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
let mut int32_value = 0;
for _i in 0..NUM_ROW_GROUPS {
let mut column_chunk_pages = Vec::new();
for _j in 0..PAGES_PER_GROUP {
Expand All @@ -78,16 +78,15 @@ fn build_plain_encoded_int32_page_iterator(
max_def_level
};
if def_level == max_def_level {
int32_value += 1;
values.push(int32_value);
values.push(rng.gen_range(0..1000));
}
def_levels.push(def_level);
}
let mut page_builder =
DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
page_builder.add_rep_levels(max_rep_level, &rep_levels);
page_builder.add_def_levels(max_def_level, &def_levels);
page_builder.add_values::<Int32Type>(Encoding::PLAIN, &values);
page_builder.add_values::<Int32Type>(encoding, &values);
column_chunk_pages.push(page_builder.consume());
}
pages.push(column_chunk_pages);
Expand Down Expand Up @@ -332,9 +331,7 @@ fn create_complex_object_byte_array_dictionary_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
use parquet::arrow::array_reader::{
make_byte_array_dictionary_reader, ComplexObjectArrayReader,
};
use parquet::arrow::array_reader::ComplexObjectArrayReader;
use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
let arrow_type =
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
Expand Down Expand Up @@ -367,10 +364,11 @@ fn add_benches(c: &mut Criterion) {
// =============================

// int32, plain encoded, no NULLs
let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(
let plain_int32_no_null_data = build_encoded_int32_page_iterator(
schema.clone(),
mandatory_int32_column_desc.clone(),
0.0,
Encoding::PLAIN,
);
group.bench_function("read Int32Array, plain encoded, mandatory, no NULLs", |b| {
b.iter(|| {
Expand All @@ -383,10 +381,11 @@ fn add_benches(c: &mut Criterion) {
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(
let plain_int32_no_null_data = build_encoded_int32_page_iterator(
schema.clone(),
optional_int32_column_desc.clone(),
0.0,
Encoding::PLAIN,
);
group.bench_function("read Int32Array, plain encoded, optional, no NULLs", |b| {
b.iter(|| {
Expand All @@ -400,10 +399,11 @@ fn add_benches(c: &mut Criterion) {
});

// int32, plain encoded, half NULLs
let plain_int32_half_null_data = build_plain_encoded_int32_page_iterator(
let plain_int32_half_null_data = build_encoded_int32_page_iterator(
schema.clone(),
optional_int32_column_desc.clone(),
0.5,
Encoding::PLAIN,
);
group.bench_function(
"read Int32Array, plain encoded, optional, half NULLs",
Expand All @@ -419,6 +419,62 @@ fn add_benches(c: &mut Criterion) {
},
);

// int32, binary packed, no NULLs
let plain_int32_no_null_data = build_encoded_int32_page_iterator(
schema.clone(),
mandatory_int32_column_desc.clone(),
0.0,
Encoding::DELTA_BINARY_PACKED,
);
group.bench_function("read Int32Array, binary packed, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_int32_primitive_array_reader(
plain_int32_no_null_data.clone(),
mandatory_int32_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

let plain_int32_no_null_data = build_encoded_int32_page_iterator(
schema.clone(),
optional_int32_column_desc.clone(),
0.0,
Encoding::DELTA_BINARY_PACKED,
);
group.bench_function("read Int32Array, binary packed, optional, no NULLs", |b| {
b.iter(|| {
let array_reader = create_int32_primitive_array_reader(
plain_int32_no_null_data.clone(),
optional_int32_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// int32, binary packed, half NULLs
let plain_int32_half_null_data = build_encoded_int32_page_iterator(
schema.clone(),
optional_int32_column_desc.clone(),
0.5,
Encoding::DELTA_BINARY_PACKED,
);
group.bench_function(
"read Int32Array, binary packed, optional, half NULLs",
|b| {
b.iter(|| {
let array_reader = create_int32_primitive_array_reader(
plain_int32_half_null_data.clone(),
optional_int32_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

// int32, dictionary encoded, no NULLs
let dictionary_int32_no_null_data = build_dictionary_encoded_int32_page_iterator(
schema.clone(),
Expand Down
Loading

0 comments on commit fa6233a

Please sign in to comment.