Skip to content

Commit

Permalink
reuse buffer in view array
Browse files Browse the repository at this point in the history
  • Loading branch information
XiangpengHao committed Jan 2, 2025
1 parent 4a0bdde commit 88e1a6a
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 7 deletions.
15 changes: 15 additions & 0 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,21 @@ impl Buffer {
}
}

/// Auxiliary method to create a new Buffer
///
/// This is convenient for converting a [`bytes::Bytes`] to a [`Buffer`] without copying.
///
/// ```
/// # use arrow_buffer::Buffer;
/// let bytes = bytes::Bytes::from_static(b"foo");
/// let buffer = Buffer::from_external_bytes(bytes);
/// ```
#[inline]
pub fn from_external_bytes(bytes: bytes::Bytes) -> Self {
let inner_bytes = Bytes::from(bytes);
Self::from_bytes(inner_bytes)
}

/// Returns the offset, in bytes, of `Self::ptr` to `Self::data`
///
/// self.ptr and self.data can be different after slicing or advancing the buffer.
Expand Down
45 changes: 38 additions & 7 deletions parquet/src/arrow/array_reader/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl ByteViewArrayDecoder {

/// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`]
pub struct ByteViewArrayDecoderPlain {
buf: Bytes,
buf: Buffer,
offset: usize,

validate_utf8: bool,
Expand All @@ -308,18 +308,29 @@ impl ByteViewArrayDecoderPlain {
validate_utf8: bool,
) -> Self {
Self {
buf,
buf: Buffer::from_external_bytes(buf),
offset: 0,
max_remaining_values: num_values.unwrap_or(num_levels),
validate_utf8,
}
}

pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
// Here we convert `bytes::Bytes` into `arrow_buffer::Bytes`, which is zero copy
// Then we convert `arrow_buffer::Bytes` into `arrow_buffer:Buffer`, which is also zero copy
let buf = arrow_buffer::Buffer::from_bytes(self.buf.clone().into());
let block_id = output.append_block(buf);
// avoid creating a new buffer if the last buffer is the same as the current buffer
// This is especially useful when row-level filtering is applied, where we call lots of small `read` over the same buffer.
let need_to_create_new_buffer = {
if let Some(last_buffer) = output.buffers.last() {
!last_buffer.ptr_eq(&self.buf)
} else {
true
}
};

let block_id = if need_to_create_new_buffer {
output.append_block(self.buf.clone())
} else {
output.buffers.len() as u32 - 1
};

let to_read = len.min(self.max_remaining_values);

Expand Down Expand Up @@ -692,12 +703,13 @@ mod tests {

use crate::{
arrow::{
array_reader::test_util::{byte_array_all_encodings, utf8_column},
array_reader::test_util::{byte_array_all_encodings, encode_byte_array, utf8_column},
buffer::view_buffer::ViewBuffer,
record_reader::buffer::ValuesBuffer,
},
basic::Encoding,
column::reader::decoder::ColumnValueDecoder,
data_type::ByteArray,
};

use super::*;
Expand Down Expand Up @@ -748,4 +760,23 @@ mod tests {
);
}
}

#[test]
fn test_byte_view_array_plain_decoder_reuse_buffer() {
let byte_array = vec!["hello", "world", "large payload over 12 bytes", "b"];
let byte_array: Vec<ByteArray> = byte_array.into_iter().map(|x| x.into()).collect();
let pages = encode_byte_array(Encoding::PLAIN, &byte_array);

let column_desc = utf8_column();
let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);

let mut view_buffer = ViewBuffer::default();
decoder.set_data(Encoding::PLAIN, pages, 4, None).unwrap();
decoder.read(&mut view_buffer, 1).unwrap();
decoder.read(&mut view_buffer, 1).unwrap();
assert_eq!(view_buffer.buffers.len(), 1);

decoder.read(&mut view_buffer, 1).unwrap();
assert_eq!(view_buffer.buffers.len(), 1);
}
}

0 comments on commit 88e1a6a

Please sign in to comment.