Skip to content

Commit

Permalink
Support skip_def_levels (only max_def_levels=1) for ColumnLevelDecoder (
Browse files Browse the repository at this point in the history
#2111)

* Support skip_def_levels (only max_def_levels=1) for ColumnLevelDecoder

* Support skip_def_levels (only max_def_levels=1) for ColumnLevelDecoderImpl

* fix ut

* fix and refine support skip definition_level

* fix clippy

* fix comment
  • Loading branch information
Ted-Jiang authored Jul 24, 2022
1 parent 7746e7d commit fce6626
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 6 deletions.
108 changes: 105 additions & 3 deletions parquet/src/arrow/record_reader/definition_levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::ops::Range;
use arrow::array::BooleanBufferBuilder;
use arrow::bitmap::Bitmap;
use arrow::buffer::Buffer;
use arrow::util::bit_chunk_iterator::UnalignedBitChunk;

use crate::arrow::buffer::bit_util::count_set_bits;
use crate::arrow::record_reader::buffer::BufferQueue;
Expand Down Expand Up @@ -216,10 +217,15 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
fn skip_def_levels(
&mut self,
_num_levels: usize,
_max_def_level: i16,
num_levels: usize,
max_def_level: i16,
) -> Result<(usize, usize)> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
match &mut self.decoder {
MaybePacked::Fallback(decoder) => {
decoder.skip_def_levels(num_levels, max_def_level)
}
MaybePacked::Packed(decoder) => decoder.skip(num_levels),
}
}
}

Expand Down Expand Up @@ -346,6 +352,41 @@ impl PackedDecoder {
}
Ok(read)
}

/// Skips `level_num` definition levels
///
/// Returns the number of values skipped and the number of levels skipped
fn skip(&mut self, level_num: usize) -> Result<(usize, usize)> {
let mut skipped_value = 0;
let mut skipped_level = 0;
while skipped_level != level_num {
if self.rle_left != 0 {
let to_skip = self.rle_left.min(level_num - skipped_level);
self.rle_left -= to_skip;
skipped_level += to_skip;
if self.rle_value {
skipped_value += to_skip;
}
} else if self.packed_count != self.packed_offset {
let to_skip = (self.packed_count - self.packed_offset)
.min(level_num - skipped_level);
let offset = self.data_offset * 8 + self.packed_offset;
let bit_chunk =
UnalignedBitChunk::new(self.data.as_ref(), offset, to_skip);
skipped_value += bit_chunk.count_ones();
self.packed_offset += to_skip;
skipped_level += to_skip;
if self.packed_offset == self.packed_count {
self.data_offset += self.packed_count / 8;
}
} else if self.data_offset == self.data.len() {
break;
} else {
self.next_rle_block()?
}
}
Ok((skipped_value, skipped_level))
}
}

#[cfg(test)]
Expand Down Expand Up @@ -392,6 +433,67 @@ mod tests {
assert_eq!(decoded.as_slice(), expected.as_slice());
}

#[test]
fn test_packed_decoder_skip() {
let mut rng = thread_rng();
let len: usize = rng.gen_range(512..1024);

let mut expected = BooleanBufferBuilder::new(len);
let mut encoder = RleEncoder::new(1, 1024);

let mut total_value = 0;
for _ in 0..len {
let bool = rng.gen_bool(0.8);
assert!(encoder.put(bool as u64).unwrap());
expected.append(bool);
if bool {
total_value += 1;
}
}
assert_eq!(expected.len(), len);

let encoded = encoder.consume().unwrap();
let mut decoder = PackedDecoder::new();
decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));

let mut skip_value = 0;
let mut read_value = 0;
let mut skip_level = 0;
let mut read_level = 0;

loop {
let offset = skip_level + read_level;
let remaining_levels = len - offset;
if remaining_levels == 0 {
break;
}
let to_read_or_skip_level = rng.gen_range(1..=remaining_levels);
if rng.gen_bool(0.5) {
let (skip_val_num, skip_level_num) =
decoder.skip(to_read_or_skip_level).unwrap();
skip_value += skip_val_num;
skip_level += skip_level_num
} else {
let mut decoded = BooleanBufferBuilder::new(to_read_or_skip_level);
let read_level_num =
decoder.read(&mut decoded, to_read_or_skip_level).unwrap();
read_level += read_level_num;
for i in 0..read_level_num {
assert!(!decoded.is_empty());
//check each read bit
let read_bit = decoded.get_bit(i);
if read_bit {
read_value += 1;
}
let expect_bit = expected.get_bit(i + offset);
assert_eq!(read_bit, expect_bit);
}
}
}
assert_eq!(read_level + skip_level, len);
assert_eq!(read_value + skip_value, total_value);
}

#[test]
fn test_split_off() {
let t = Type::primitive_type_builder("col", PhysicalType::INT32)
Expand Down
37 changes: 34 additions & 3 deletions parquet/src/column/reader/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,41 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
fn skip_def_levels(
&mut self,
_num_levels: usize,
_max_def_level: i16,
num_levels: usize,
max_def_level: i16,
) -> Result<(usize, usize)> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
let mut level_skip = 0;
let mut value_skip = 0;
match self.decoder.as_mut().unwrap() {
LevelDecoderInner::Packed(reader, bit_width) => {
for _ in 0..num_levels {
// Values are delimited by max_def_level
if max_def_level
== reader
.get_value::<i16>(*bit_width as usize)
.expect("Not enough values in Packed ColumnLevelDecoderImpl.")
{
value_skip += 1;
}
level_skip += 1;
}
}
LevelDecoderInner::Rle(reader) => {
for _ in 0..num_levels {
if let Some(level) = reader
.get::<i16>()
.expect("Not enough values in Rle ColumnLevelDecoderImpl.")
{
// Values are delimited by max_def_level
if level == max_def_level {
value_skip += 1;
}
}
level_skip += 1;
}
}
}
Ok((value_skip, level_skip))
}
}

Expand Down

0 comments on commit fce6626

Please sign in to comment.