Move record delimiting into ColumnReader (apache#4365)
tustvold committed Jun 6, 2023
1 parent e1badc0 commit d247891
Showing 13 changed files with 522 additions and 633 deletions.
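
The core of the change is visible in parquet/src/arrow/record_reader/buffer.rs below: the length-parameterized `BufferQueue::split_off(len)` is replaced by a whole-buffer `BufferQueue::consume()`, since record delimiting now happens inside `ColumnReader` and the queue can simply be drained wholesale. A minimal runnable sketch of the new shape — `VecQueue` is a hypothetical toy stand-in, not one of the crate's implementations:

    // Sketch only: VecQueue is a toy stand-in, not a type from the parquet crate.
    pub trait BufferQueue: Sized {
        type Output;

        /// Consumes the contents of this `BufferQueue` (the post-commit API).
        fn consume(&mut self) -> Self::Output;
    }

    struct VecQueue(Vec<u8>);

    impl BufferQueue for VecQueue {
        type Output = Vec<u8>;

        fn consume(&mut self) -> Self::Output {
            // The same pattern the real implementations use below: leave an
            // empty buffer behind and hand back the filled one, in O(1).
            std::mem::take(&mut self.0)
        }
    }

    fn main() {
        let mut q = VecQueue(vec![1, 2, 3]);
        assert_eq!(q.consume(), vec![1, 2, 3]);
        assert!(q.consume().is_empty()); // drained queues start over empty
    }
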
4 changes: 2 additions & 2 deletions parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -247,8 +247,8 @@ impl BufferQueue for FixedLenByteArrayBuffer {
     type Output = Buffer;
     type Slice = Self;
 
-    fn split_off(&mut self, len: usize) -> Self::Output {
-        self.buffer.split_off(len * self.byte_length)
+    fn consume(&mut self) -> Self::Output {
+        self.buffer.consume()
     }
 
     fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
59 changes: 53 additions & 6 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -2131,15 +2131,15 @@ mod tests {
 
     #[test]
     fn test_row_group_exact_multiple() {
-        use crate::arrow::record_reader::MIN_BATCH_SIZE;
+        const BATCH_SIZE: usize = 1024;
         test_row_group_batch(8, 8);
         test_row_group_batch(10, 8);
         test_row_group_batch(8, 10);
-        test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE);
-        test_row_group_batch(MIN_BATCH_SIZE + 1, MIN_BATCH_SIZE);
-        test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE + 1);
-        test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE - 1);
-        test_row_group_batch(MIN_BATCH_SIZE - 1, MIN_BATCH_SIZE);
+        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
+        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
+        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
+        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
+        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
     }
 
     /// Given a RecordBatch containing all the column data, return the expected batches given
@@ -2610,4 +2610,51 @@ mod tests {
         test_decimal_roundtrip::<Decimal128Type>();
         test_decimal_roundtrip::<Decimal256Type>();
     }
+
+    #[test]
+    fn test_list_selection() {
+        let schema = Arc::new(Schema::new(vec![Field::new_list(
+            "list",
+            Field::new("item", ArrowDataType::Utf8, true),
+            false,
+        )]));
+        let mut buf = Vec::with_capacity(1024);
+
+        let mut writer = ArrowWriter::try_new(
+            &mut buf,
+            schema.clone(),
+            Some(WriterProperties::builder().build()),
+        )
+        .unwrap();
+
+        for _ in 0..2 {
+            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
+            for i in 0..1024 {
+                list_a_builder.values().append_value(format!("{i}"));
+                list_a_builder.append(true);
+            }
+            let batch = RecordBatch::try_new(
+                schema.clone(),
+                vec![Arc::new(list_a_builder.finish())],
+            )
+            .unwrap();
+            writer.write(&batch).unwrap();
+        }
+        let _metadata = writer.close().unwrap();
+
+        let buf = Bytes::from(buf);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
+            .unwrap()
+            .with_row_selection(RowSelection::from(vec![
+                RowSelector::skip(100),
+                RowSelector::select(924),
+                RowSelector::skip(100),
+                RowSelector::select(924),
+            ]))
+            .build()
+            .unwrap();
+
+        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
+        assert_eq!(total_rows, 924 * 2);
+    }
 }
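
As a cross-check of the new test's arithmetic (not part of the commit): two batches of 1024 lists are written, and the four selectors tile those 2048 rows exactly.

    // Cross-check of test_list_selection's expected row count.
    fn main() {
        let written = 2 * 1024; // two record batches of 1024 lists each
        let skipped = 100 + 100; // RowSelector::skip(100), twice
        let selected = 924 + 924; // RowSelector::select(924), twice
        assert_eq!(skipped + selected, written); // selectors cover the file exactly
        assert_eq!(selected, 924 * 2); // matches assert_eq!(total_rows, 924 * 2)
    }
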
30 changes: 10 additions & 20 deletions parquet/src/arrow/buffer/dictionary_buffer.rs
@@ -227,14 +227,14 @@ impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> BufferQueue
     type Output = Self;
     type Slice = Self;
 
-    fn split_off(&mut self, len: usize) -> Self::Output {
+    fn consume(&mut self) -> Self::Output {
         match self {
             Self::Dict { keys, values } => Self::Dict {
-                keys: keys.take(len),
+                keys: std::mem::take(keys).into(),
                 values: values.clone(),
             },
             Self::Values { values } => Self::Values {
-                values: values.split_off(len),
+                values: values.consume(),
             },
         }
     }
@@ -275,20 +275,6 @@ mod tests {
         let valid_buffer = Buffer::from_iter(valid.iter().cloned());
         buffer.pad_nulls(0, values.len(), valid.len(), valid_buffer.as_slice());
 
-        // Split off some data
-
-        let split = buffer.split_off(4);
-        let null_buffer = Buffer::from_iter(valid.drain(0..4));
-        let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
-        assert_eq!(array.data_type(), &dict_type);
-
-        let strings = cast(&array, &ArrowType::Utf8).unwrap();
-        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
-        assert_eq!(
-            strings.iter().collect::<Vec<_>>(),
-            vec![None, None, Some("world"), Some("hello")]
-        );
-
         // Read some data not preserving the dictionary
 
         let values = buffer.spill_values().unwrap();
@@ -300,8 +286,8 @@ mod tests {
         let null_buffer = Buffer::from_iter(valid.iter().cloned());
         buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());
 
-        assert_eq!(buffer.len(), 9);
-        let split = buffer.split_off(9);
+        assert_eq!(buffer.len(), 13);
+        let split = buffer.consume();
 
         let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
         assert_eq!(array.data_type(), &dict_type);
@@ -311,6 +297,10 @@ mod tests {
         assert_eq!(
             strings.iter().collect::<Vec<_>>(),
             vec![
+                None,
+                None,
+                Some("world"),
+                Some("hello"),
                 None,
                 Some("a"),
                 Some(""),
@@ -332,7 +322,7 @@ mod tests {
             .unwrap()
             .extend_from_slice(&[0, 1, 0, 1]);
 
-        let array = buffer.split_off(4).into_array(None, &dict_type).unwrap();
+        let array = buffer.consume().into_array(None, &dict_type).unwrap();
         assert_eq!(array.data_type(), &dict_type);
 
         let strings = cast(&array, &ArrowType::Utf8).unwrap();
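
The `Dict` arm above swaps `keys.take(len)` for `std::mem::take(keys).into()` while still cloning `values`, so the dictionary stays shared across consumed batches. A simplified sketch of that pattern — `DictBuffer` here is a hypothetical toy, not the crate's key/value buffer types:

    use std::sync::Arc;

    // Hypothetical simplification of the Dict arm's consume pattern.
    struct DictBuffer {
        keys: Vec<i32>,
        values: Arc<Vec<String>>, // dictionary shared across consumed batches
    }

    impl DictBuffer {
        fn consume(&mut self) -> DictBuffer {
            DictBuffer {
                // Move the keys out wholesale, leaving an empty key buffer behind.
                keys: std::mem::take(&mut self.keys),
                // Cheap clone: subsequent reads keep appending keys
                // against the same dictionary values.
                values: Arc::clone(&self.values),
            }
        }
    }

    fn main() {
        let mut buf = DictBuffer {
            keys: vec![0, 1, 0],
            values: Arc::new(vec!["hello".into(), "world".into()]),
        };
        let out = buf.consume();
        assert_eq!(out.keys, vec![0, 1, 0]);
        assert!(buf.keys.is_empty()); // ready for the next batch
    }
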
29 changes: 6 additions & 23 deletions parquet/src/arrow/buffer/offset_buffer.rs
@@ -151,25 +151,8 @@ impl<I: OffsetSizeTrait + ScalarValue> BufferQueue for OffsetBuffer<I> {
     type Output = Self;
     type Slice = Self;
 
-    fn split_off(&mut self, len: usize) -> Self::Output {
-        assert!(self.offsets.len() > len, "{} > {}", self.offsets.len(), len);
-        let remaining_offsets = self.offsets.len() - len - 1;
-        let offsets = self.offsets.as_slice();
-
-        let end_offset = offsets[len];
-
-        let mut new_offsets = ScalarBuffer::new();
-        new_offsets.reserve(remaining_offsets + 1);
-        for v in &offsets[len..] {
-            new_offsets.push(*v - end_offset)
-        }
-
-        self.offsets.resize(len + 1);
-
-        Self {
-            offsets: std::mem::replace(&mut self.offsets, new_offsets),
-            values: self.values.take(end_offset.as_usize()),
-        }
+    fn consume(&mut self) -> Self::Output {
+        std::mem::take(self)
     }
 
     fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
@@ -267,26 +250,26 @@ mod tests {
     }
 
     #[test]
-    fn test_offset_buffer_split() {
+    fn test_offset_buffer() {
         let mut buffer = OffsetBuffer::<i32>::default();
         for v in ["hello", "world", "cupcakes", "a", "b", "c"] {
             buffer.try_push(v.as_bytes(), false).unwrap()
         }
-        let split = buffer.split_off(3);
+        let split = buffer.consume();
 
         let array = split.into_array(None, ArrowType::Utf8);
         let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
         assert_eq!(
             strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
-            vec!["hello", "world", "cupcakes"]
+            vec!["hello", "world", "cupcakes", "a", "b", "c"]
         );
 
         buffer.try_push("test".as_bytes(), false).unwrap();
         let array = buffer.into_array(None, ArrowType::Utf8);
         let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
         assert_eq!(
             strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
-            vec!["a", "b", "c", "test"]
+            vec!["test"]
         );
     }
 
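
The deleted `split_off` above had to rebase every retained offset by `end_offset`; draining the whole buffer needs no rebasing, so `consume` reduces to `std::mem::take(self)`, and the updated test shows the drained buffer is immediately reusable. A toy stand-in for `OffsetBuffer` (an assumed simplification, not the crate's type) illustrating why the default/empty state stays valid:

    // Toy stand-in for OffsetBuffer<i32>; the real type wraps ScalarBuffers.
    struct OffsetBuffer {
        offsets: Vec<i32>, // [0, end_0, end_1, ...]
        values: Vec<u8>,
    }

    impl Default for OffsetBuffer {
        fn default() -> Self {
            // A valid empty buffer still carries the leading zero offset.
            Self { offsets: vec![0], values: Vec::new() }
        }
    }

    impl OffsetBuffer {
        fn try_push(&mut self, data: &[u8]) {
            self.values.extend_from_slice(data);
            self.offsets.push(self.values.len() as i32);
        }

        /// Post-commit behaviour: drain everything, no offset rebasing required.
        fn consume(&mut self) -> Self {
            std::mem::take(self)
        }
    }

    fn main() {
        let mut buffer = OffsetBuffer::default();
        for v in ["hello", "world"] {
            buffer.try_push(v.as_bytes());
        }
        let split = buffer.consume();
        assert_eq!(split.offsets, vec![0, 5, 10]);

        buffer.try_push(b"test"); // drained buffer is immediately reusable,
        assert_eq!(buffer.offsets, vec![0, 4]); // mirroring test_offset_buffer
    }
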
38 changes: 4 additions & 34 deletions parquet/src/arrow/record_reader/buffer.rs
@@ -30,13 +30,8 @@ pub trait BufferQueue: Sized {
 
     type Slice: ?Sized;
 
-    /// Split out the first `len` items
-    ///
-    /// # Panics
-    ///
-    /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
-    ///
-    fn split_off(&mut self, len: usize) -> Self::Output;
+    /// Consumes the contents of this [`BufferQueue`]
+    fn consume(&mut self) -> Self::Output;
 
     /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
     /// to append data to the end of this [`BufferQueue`]
@@ -146,31 +141,6 @@ impl<T: ScalarValue> ScalarBuffer<T> {
         assert!(prefix.is_empty() && suffix.is_empty());
         buf
     }
-
-    pub fn take(&mut self, len: usize) -> Self {
-        assert!(len <= self.len);
-
-        let num_bytes = len * std::mem::size_of::<T>();
-        let remaining_bytes = self.buffer.len() - num_bytes;
-        // TODO: Optimize to reduce the copy
-        // create an empty buffer, as it will be resized below
-        let mut remaining = MutableBuffer::new(0);
-        remaining.resize(remaining_bytes, 0);
-
-        let new_records = remaining.as_slice_mut();
-
-        new_records[0..remaining_bytes]
-            .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
-
-        self.buffer.resize(num_bytes, 0);
-        self.len -= len;
-
-        Self {
-            buffer: std::mem::replace(&mut self.buffer, remaining),
-            len,
-            _phantom: Default::default(),
-        }
-    }
 }
 
 impl<T: ScalarValue + ArrowNativeType> ScalarBuffer<T> {
@@ -196,8 +166,8 @@ impl<T: ScalarValue> BufferQueue for ScalarBuffer<T> {
 
     type Slice = [T];
 
-    fn split_off(&mut self, len: usize) -> Self::Output {
-        self.take(len).into()
+    fn consume(&mut self) -> Self::Output {
+        std::mem::take(self).into()
     }
 
     fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {