From e7e219ac228c9db65f3f6758bd491f8f1840b4bf Mon Sep 17 00:00:00 2001 From: Yijun Zhao Date: Mon, 15 Apr 2024 01:26:45 +0800 Subject: [PATCH] Revert "use different buffer for different encoding" This reverts commit 71f51aa10079cdadfe7a0f5026563eedd7f76bd9. --- parquet-testing | 2 +- .../src/arrow/array_reader/byte_view_array.rs | 11 +-- parquet/src/arrow/array_reader/mod.rs | 2 - parquet/src/arrow/arrow_reader/mod.rs | 2 +- parquet/src/arrow/buffer/view_buffer.rs | 75 +++---------------- testing | 2 +- 6 files changed, 20 insertions(+), 74 deletions(-) diff --git a/parquet-testing b/parquet-testing index 4cb3cff24c96..1ba34478f535 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 4cb3cff24c965fb329cdae763eabce47395a68a0 +Subproject commit 1ba34478f535c89382263c42c675a9af4f57f2dd diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 156cb0c5e47e..1c27a3b58b26 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -327,7 +327,6 @@ impl ByteViewArrayDecoderPlain { let mut read = 0; let buf = self.buf.as_ref(); - output.add_buffer(self.buf.clone()); while self.offset < self.buf.len() && read != to_read { if self.offset + 4 > buf.len() { return Err(ParquetError::EOF("eof decoding byte view array".into())); @@ -341,7 +340,7 @@ impl ByteViewArrayDecoderPlain { return Err(ParquetError::EOF("eof decoding byte view array".into())); } - output.try_push_with_offset(start_offset, end_offset)?; + output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?; self.offset = end_offset; read += 1; @@ -417,10 +416,12 @@ impl ByteViewArrayDecoderDeltaLength { } let mut start_offset = self.data_offset; - output.add_buffer(self.data.clone()); for length in src_lengths { let end_offset = start_offset + *length as usize; - output.try_push_with_offset(start_offset, end_offset)?; + output.try_push( + &self.data.as_ref()[start_offset..end_offset], + self.validate_utf8, + )?; start_offset = end_offset; } @@ -498,7 +499,7 @@ impl ByteViewArrayDecoderDictionary { } self.decoder.read(len, |keys| { - output.extend_from_dictionary(keys, &dict.views, dict.plain_buffer.as_ref().unwrap()) + output.extend_from_dictionary(keys, &dict.views, &dict.buffer) }) } diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index d0510c790263..fef1e8722f86 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -47,8 +47,6 @@ mod test_util; pub use builder::build_array_reader; pub use byte_array::make_byte_array_reader; -#[allow(unused_imports)] // Only used for benchmarks -pub use byte_array::make_byte_view_array_reader; pub use byte_array_dictionary::make_byte_array_dictionary_reader; #[allow(unused_imports)] // Only used for benchmarks pub use byte_view_array::make_byte_view_array_reader; diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 6b95324bee39..e5b0e7d05739 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -2308,7 +2308,7 @@ mod tests { ), ( invalid_utf8_later_char::(), - "Parquet argument error: Parquet error: encountered non UTF-8 data: invalid utf-8 sequence of 1 bytes from index 6", + "Parquet argument error: Parquet error: encountered non UTF-8 data: invalid utf-8 sequence of 1 bytes from index 3", ), ]; for (array, expected_error) in cases { diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index 784a67c2e1cc..743ee6cf288a 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -22,22 +22,13 @@ use arrow_array::{make_array, ArrayRef}; use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice}; use arrow_data::{ArrayDataBuilder, ByteView}; use arrow_schema::DataType as ArrowType; -use bytes::Bytes; /// A buffer of variable-sized byte arrays that can be converted into /// a corresponding [`ArrayRef`] #[derive(Debug, Default)] pub struct ViewBuffer { pub views: Vec, - /// If encoding in (`PLAIN`, `DELTA_LENGTH_BYTE_ARRAY`), we use `plain_buffer` - /// to hold the page data without copy. - pub plain_buffer: Option, - /// If encoding is `DELTA_BYTE_ARRAY`, we use `delta_buffer` to build data buffer - /// since this encoding's page data not hold full data. - /// - /// If encoding in (`PLAIN_DICTIONARY`, `RLE_DICTIONARY`), we need these two buffers - /// cause these encoding first build dict then use dict to read data. - pub delta_buffer: Vec, + pub buffer: Vec, } impl ViewBuffer { @@ -50,36 +41,6 @@ impl ViewBuffer { self.len() == 0 } - /// add entire page buf to [`ViewBuffer`], avoid copy data. - pub fn add_buffer(&mut self, buf: Bytes) { - if self.plain_buffer.is_none() { - self.plain_buffer = Some(buf); - } - } - - /// Push data to [`ViewBuffer`], since we already hold full data through [`Self::add_buffer`], - /// we only need to slice the data to build the view. - pub fn try_push_with_offset(&mut self, start_offset: usize, end_offset: usize) -> Result<()> { - let data = &self.plain_buffer.as_ref().unwrap()[start_offset..end_offset]; - let length: u32 = (end_offset - start_offset) as u32; - if length <= 12 { - let mut view_buffer = [0; 16]; - view_buffer[0..4].copy_from_slice(&length.to_le_bytes()); - view_buffer[4..4 + length as usize].copy_from_slice(data); - self.views.push(u128::from_le_bytes(view_buffer)); - return Ok(()); - } - - let view = ByteView { - length, - prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()), - buffer_index: 0, - offset: start_offset as u32, - }; - self.views.push(view.into()); - Ok(()) - } - /// If `validate_utf8` this verifies that the first character of `data` is /// the start of a UTF-8 codepoint /// @@ -107,8 +68,8 @@ impl ViewBuffer { return Ok(()); } - let offset = self.delta_buffer.len() as u32; - self.delta_buffer.extend_from_slice(data); + let offset = self.buffer.len() as u32; + self.buffer.extend_from_slice(data); let view = ByteView { length, @@ -168,34 +129,20 @@ impl ViewBuffer { return Ok(()); } let first_buffer_offset = ((*first_buffer.unwrap()) >> 96) as u32 as usize; - if self.plain_buffer.is_none() { - match std::str::from_utf8(&self.delta_buffer[first_buffer_offset..]) { - Ok(_) => Ok(()), - Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), - } - } else { - match std::str::from_utf8(&self.plain_buffer.as_ref().unwrap()[first_buffer_offset..]) { - Ok(_) => Ok(()), - Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), - } + match std::str::from_utf8(&self.buffer[first_buffer_offset..]) { + Ok(_) => Ok(()), + Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), } } /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` pub fn into_array(self, null_buffer: Option, data_type: ArrowType) -> ArrayRef { let len = self.len(); - let array_data_builder = { - let builder = ArrayDataBuilder::new(data_type) - .len(len) - .add_buffer(Buffer::from_vec(self.views)) - .null_bit_buffer(null_buffer); - - if self.plain_buffer.is_none() { - builder.add_buffer(Buffer::from_vec(self.delta_buffer)) - } else { - builder.add_buffer(self.plain_buffer.unwrap().into()) - } - }; + let array_data_builder = ArrayDataBuilder::new(data_type) + .len(len) + .add_buffer(Buffer::from_vec(self.views)) + .add_buffer(Buffer::from_vec(self.buffer)) + .null_bit_buffer(null_buffer); let data = match cfg!(debug_assertions) { true => array_data_builder.build().unwrap(), diff --git a/testing b/testing index e270341fb5f3..735ae7128d57 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit e270341fb5f3ff785410e6286cc42898e9d6a99c +Subproject commit 735ae7128d571398dd798d7ff004adebeb342883