diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index 265f951970ac..94ed8bbc3680 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -31,6 +31,7 @@ rust-version = "1.57" [dependencies] parquet-format = "4.0.0" +bytes = "1.1" byteorder = "1" thrift = "0.13" snap = { version = "1.0", optional = true } diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index 1d8441cbd7f3..d2250f8efbfc 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -1381,7 +1381,6 @@ mod tests { #[test] fn test_complex_array_reader_dict_enc_string() { use crate::encodings::encoding::{DictEncoder, Encoder}; - use crate::util::memory::MemTracker; // Construct column schema let message_type = " message test_schema { @@ -1412,9 +1411,8 @@ mod tests { let mut all_values = Vec::with_capacity(num_pages * values_per_page); for i in 0..num_pages { - let mem_tracker = Arc::new(MemTracker::new()); let mut dict_encoder = - DictEncoder::::new(column_desc.clone(), mem_tracker); + DictEncoder::::new(column_desc.clone()); // add data page let mut values = Vec::with_capacity(values_per_page); diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs index afee4659c3dd..0c044eb2df63 100644 --- a/parquet/src/arrow/array_reader/test_util.rs +++ b/parquet/src/arrow/array_reader/test_util.rs @@ -29,7 +29,7 @@ use crate::errors::Result; use crate::schema::types::{ ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, }; -use crate::util::memory::{ByteBufferPtr, MemTracker}; +use crate::util::memory::ByteBufferPtr; /// Returns a descriptor for a UTF-8 column pub fn utf8_column() -> ColumnDescPtr { @@ -49,9 +49,7 @@ pub fn utf8_column() -> ColumnDescPtr { /// Encode `data` with the provided `encoding` pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPtr { let descriptor = utf8_column(); - let mem_tracker = Arc::new(MemTracker::new()); - let mut encoder = - get_encoder::(descriptor, encoding, mem_tracker).unwrap(); + let mut encoder = get_encoder::(descriptor, encoding).unwrap(); encoder.put(data).unwrap(); encoder.flush_buffer().unwrap() @@ -59,8 +57,7 @@ pub fn encode_byte_array(encoding: Encoding, data: &[ByteArray]) -> ByteBufferPt /// Returns the encoded dictionary and value data pub fn encode_dictionary(data: &[ByteArray]) -> (ByteBufferPtr, ByteBufferPtr) { - let mut dict_encoder = - DictEncoder::::new(utf8_column(), Arc::new(MemTracker::new())); + let mut dict_encoder = DictEncoder::::new(utf8_column()); dict_encoder.put(data).unwrap(); let encoded_rle = dict_encoder.flush_buffer().unwrap(); diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index 1fc722f2910f..3a45ecf3ff8f 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -338,13 +338,13 @@ where let mut offset = 0; if max_rep_level > 0 { - let level_data = parse_v1_level( + let (bytes_read, level_data) = parse_v1_level( max_rep_level, num_values, rep_level_encoding, buf.start_from(offset), )?; - offset = level_data.end(); + offset += bytes_read; let decoder = R::new(max_rep_level, rep_level_encoding, level_data); @@ -353,13 +353,13 @@ where } if max_def_level > 0 { - let level_data = parse_v1_level( + let (bytes_read, level_data) = parse_v1_level( max_def_level, num_values, def_level_encoding, buf.start_from(offset), )?; - offset = level_data.end(); + offset += bytes_read; let decoder = D::new(max_def_level, def_level_encoding, level_data); @@ -460,20 +460,20 @@ fn parse_v1_level( num_buffered_values: u32, encoding: Encoding, buf: ByteBufferPtr, -) -> Result { +) -> Result<(usize, ByteBufferPtr)> { match encoding { Encoding::RLE => { let i32_size = std::mem::size_of::(); let data_size = read_num_bytes!(i32, i32_size, buf.as_ref()) as usize; - Ok(buf.range(i32_size, data_size)) + Ok((i32_size + data_size, buf.range(i32_size, data_size))) } Encoding::BIT_PACKED => { let bit_width = crate::util::bit_util::log2(max_level as u64 + 1) as u8; let num_bytes = ceil( (num_buffered_values as usize * bit_width as usize) as i64, 8, - ); - Ok(buf.range(0, num_bytes as usize)) + ) as usize; + Ok((num_bytes, buf.range(0, num_bytes))) } _ => Err(general_err!("invalid level encoding: {}", encoding)), } diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs index 13ea851573c1..a7d0ba8fc810 100644 --- a/parquet/src/column/writer.rs +++ b/parquet/src/column/writer.rs @@ -16,7 +16,7 @@ // under the License. //! Contains column writer API. -use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData, sync::Arc}; +use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData}; use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type}; use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter}; @@ -36,7 +36,7 @@ use crate::file::{ }; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::FromBytes; -use crate::util::memory::{ByteBufferPtr, MemTracker}; +use crate::util::memory::ByteBufferPtr; /// Column writer for a Parquet type. pub enum ColumnWriter { @@ -213,7 +213,7 @@ impl ColumnWriterImpl { let dict_encoder = if props.dictionary_enabled(descr.path()) && has_dictionary_support(T::get_physical_type(), &props) { - Some(DictEncoder::new(descr.clone(), Arc::new(MemTracker::new()))) + Some(DictEncoder::new(descr.clone())) } else { None }; @@ -227,7 +227,6 @@ impl ColumnWriterImpl { props .encoding(descr.path()) .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), &props)), - Arc::new(MemTracker::new()), ) .unwrap(); @@ -1135,6 +1134,7 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool { #[cfg(test)] mod tests { use rand::distributions::uniform::SampleUniform; + use std::sync::Arc; use crate::column::{ page::PageReader, diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs index ae1e01365376..28645a262546 100644 --- a/parquet/src/data_type.rs +++ b/parquet/src/data_type.rs @@ -31,7 +31,7 @@ use crate::column::writer::{ColumnWriter, ColumnWriterImpl}; use crate::errors::{ParquetError, Result}; use crate::util::{ bit_util::{from_ne_slice, FromBytes}, - memory::{ByteBuffer, ByteBufferPtr}, + memory::ByteBufferPtr, }; /// Rust representation for logical type INT96, value is backed by an array of `u32`. @@ -217,14 +217,6 @@ impl From for ByteArray { } } -impl From for ByteArray { - fn from(mut buf: ByteBuffer) -> ByteArray { - Self { - data: Some(buf.consume()), - } - } -} - impl PartialEq for ByteArray { fn eq(&self, other: &ByteArray) -> bool { match (&self.data, &other.data) { @@ -1322,8 +1314,7 @@ mod tests { ByteArray::from(ByteBufferPtr::new(vec![1u8, 2u8, 3u8, 4u8, 5u8])).data(), &[1u8, 2u8, 3u8, 4u8, 5u8] ); - let mut buf = ByteBuffer::new(); - buf.set_data(vec![6u8, 7u8, 8u8, 9u8, 10u8]); + let buf = vec![6u8, 7u8, 8u8, 9u8, 10u8]; assert_eq!(ByteArray::from(buf).data(), &[6u8, 7u8, 8u8, 9u8, 10u8]); } diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs index 24e0c962e7f2..7c95d553234b 100644 --- a/parquet/src/encodings/decoding.rs +++ b/parquet/src/encodings/decoding.rs @@ -936,11 +936,7 @@ mod tests { use crate::schema::types::{ ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType, }; - use crate::util::{ - bit_util::set_array_bit, - memory::{BufferPtr, MemTracker}, - test_common::RandGen, - }; + use crate::util::{bit_util::set_array_bit, test_common::RandGen}; #[test] fn test_get_decoders() { @@ -1389,7 +1385,7 @@ mod tests { let length = data.len(); - let ptr = BufferPtr::new(data); + let ptr = ByteBufferPtr::new(data); let mut reader = BitReader::new(ptr.clone()); assert_eq!(reader.get_vlq_int().unwrap(), 256); assert_eq!(reader.get_vlq_int().unwrap(), 4); @@ -1472,8 +1468,7 @@ mod tests { // Encode data let mut encoder = - get_encoder::(col_descr.clone(), encoding, Arc::new(MemTracker::new())) - .expect("get encoder"); + get_encoder::(col_descr.clone(), encoding).expect("get encoder"); for v in &data[..] { encoder.put(&v[..]).expect("ok to encode"); diff --git a/parquet/src/encodings/encoding.rs b/parquet/src/encodings/encoding.rs index f4f304625643..4b9cf2e9bad5 100644 --- a/parquet/src/encodings/encoding.rs +++ b/parquet/src/encodings/encoding.rs @@ -28,7 +28,7 @@ use crate::schema::types::ColumnDescPtr; use crate::util::{ bit_util::{self, log2, num_required_bits, BitWriter}, hash_util, - memory::{Buffer, ByteBuffer, ByteBufferPtr, MemTrackerPtr}, + memory::ByteBufferPtr, }; // ---------------------------------------------------------------------- @@ -76,10 +76,9 @@ pub trait Encoder { pub fn get_encoder( desc: ColumnDescPtr, encoding: Encoding, - mem_tracker: MemTrackerPtr, ) -> Result>> { let encoder: Box> = match encoding { - Encoding::PLAIN => Box::new(PlainEncoder::new(desc, mem_tracker, vec![])), + Encoding::PLAIN => Box::new(PlainEncoder::new(desc, vec![])), Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { return Err(general_err!( "Cannot initialize this encoding through this function" @@ -109,7 +108,7 @@ pub fn get_encoder( /// - BYTE_ARRAY - 4 byte length stored as little endian, followed by bytes. /// - FIXED_LEN_BYTE_ARRAY - just the bytes are stored. pub struct PlainEncoder { - buffer: ByteBuffer, + buffer: Vec, bit_writer: BitWriter, desc: ColumnDescPtr, _phantom: PhantomData, @@ -117,11 +116,9 @@ pub struct PlainEncoder { impl PlainEncoder { /// Creates new plain encoder. - pub fn new(desc: ColumnDescPtr, mem_tracker: MemTrackerPtr, vec: Vec) -> Self { - let mut byte_buffer = ByteBuffer::new().with_mem_tracker(mem_tracker); - byte_buffer.set_data(vec); + pub fn new(desc: ColumnDescPtr, buffer: Vec) -> Self { Self { - buffer: byte_buffer, + buffer, bit_writer: BitWriter::new(256), desc, _phantom: PhantomData, @@ -139,16 +136,15 @@ impl Encoder for PlainEncoder { } fn estimated_data_encoded_size(&self) -> usize { - self.buffer.size() + self.bit_writer.bytes_written() + self.buffer.len() + self.bit_writer.bytes_written() } #[inline] fn flush_buffer(&mut self) -> Result { - self.buffer.write_all(self.bit_writer.flush_buffer())?; - self.buffer.flush()?; + self.buffer + .extend_from_slice(self.bit_writer.flush_buffer()); self.bit_writer.clear(); - - Ok(self.buffer.consume()) + Ok(std::mem::take(&mut self.buffer).into()) } #[inline] @@ -189,35 +185,31 @@ pub struct DictEncoder { // Stores indices which map (many-to-one) to the values in the `uniques` array. // Here we are using fix-sized array with linear probing. // A slot with `HASH_SLOT_EMPTY` indicates the slot is not currently occupied. - hash_slots: Buffer, + hash_slots: Vec, // Indices that have not yet be written out by `write_indices()`. - buffered_indices: Buffer, + buffered_indices: Vec, // The unique observed values. - uniques: Buffer, + uniques: Vec, // Size in bytes needed to encode this dictionary. uniques_size_in_bytes: usize, - - // Tracking memory usage for the various data structures in this struct. - mem_tracker: MemTrackerPtr, } impl DictEncoder { /// Creates new dictionary encoder. - pub fn new(desc: ColumnDescPtr, mem_tracker: MemTrackerPtr) -> Self { - let mut slots = Buffer::new().with_mem_tracker(mem_tracker.clone()); + pub fn new(desc: ColumnDescPtr) -> Self { + let mut slots = vec![]; slots.resize(INITIAL_HASH_TABLE_SIZE, -1); Self { desc, hash_table_size: INITIAL_HASH_TABLE_SIZE, mod_bitmask: (INITIAL_HASH_TABLE_SIZE - 1) as u32, hash_slots: slots, - buffered_indices: Buffer::new().with_mem_tracker(mem_tracker.clone()), - uniques: Buffer::new().with_mem_tracker(mem_tracker.clone()), + buffered_indices: vec![], + uniques: vec![], uniques_size_in_bytes: 0, - mem_tracker, } } @@ -230,7 +222,7 @@ impl DictEncoder { /// Returns number of unique values (keys) in the dictionary. pub fn num_entries(&self) -> usize { - self.uniques.size() + self.uniques.len() } /// Returns size of unique values (keys) in the dictionary, in bytes. @@ -242,9 +234,8 @@ impl DictEncoder { /// the result. #[inline] pub fn write_dict(&self) -> Result { - let mut plain_encoder = - PlainEncoder::::new(self.desc.clone(), self.mem_tracker.clone(), vec![]); - plain_encoder.put(self.uniques.data())?; + let mut plain_encoder = PlainEncoder::::new(self.desc.clone(), vec![]); + plain_encoder.put(&self.uniques)?; plain_encoder.flush_buffer() } @@ -255,12 +246,11 @@ impl DictEncoder { let buffer_len = self.estimated_data_encoded_size(); let mut buffer: Vec = vec![0; buffer_len as usize]; buffer[0] = self.bit_width() as u8; - self.mem_tracker.alloc(buffer.capacity() as i64); // Write bit width in the first byte buffer.write_all((self.bit_width() as u8).as_bytes())?; let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer, 1); - for index in self.buffered_indices.data() { + for index in &self.buffered_indices { if !encoder.put(*index as u64)? { return Err(general_err!("Encoder doesn't have enough space")); } @@ -293,7 +283,7 @@ impl DictEncoder { #[inline(never)] fn insert_fresh_slot(&mut self, slot: usize, value: T::T) -> i32 { - let index = self.uniques.size() as i32; + let index = self.uniques.len() as i32; self.hash_slots[slot] = index; let (base_size, num_elements) = value.dict_encoding_size(); @@ -307,7 +297,7 @@ impl DictEncoder { self.uniques_size_in_bytes += unique_size; self.uniques.push(value); - if self.uniques.size() > (self.hash_table_size as f32 * MAX_HASH_LOAD) as usize { + if self.uniques.len() > (self.hash_table_size as f32 * MAX_HASH_LOAD) as usize { self.double_table_size(); } @@ -316,7 +306,7 @@ impl DictEncoder { #[inline] fn bit_width(&self) -> u8 { - let num_entries = self.uniques.size(); + let num_entries = self.uniques.len(); if num_entries == 0 { 0 } else if num_entries == 1 { @@ -328,7 +318,7 @@ impl DictEncoder { fn double_table_size(&mut self) { let new_size = self.hash_table_size * 2; - let mut new_hash_slots = Buffer::new().with_mem_tracker(self.mem_tracker.clone()); + let mut new_hash_slots = vec![]; new_hash_slots.resize(new_size, HASH_SLOT_EMPTY); for i in 0..self.hash_table_size { let index = self.hash_slots[i]; @@ -376,7 +366,7 @@ impl Encoder for DictEncoder { fn estimated_data_encoded_size(&self) -> usize { let bit_width = self.bit_width(); 1 + RleEncoder::min_buffer_size(bit_width) - + RleEncoder::max_buffer_size(bit_width, self.buffered_indices.size()) + + RleEncoder::max_buffer_size(bit_width, self.buffered_indices.len()) } #[inline] @@ -677,10 +667,9 @@ impl Encoder for DeltaBitPackEncoder { // Write page header with total values self.write_page_header(); - let mut buffer = ByteBuffer::new(); - buffer.write_all(self.page_header_writer.flush_buffer())?; - buffer.write_all(self.bit_writer.flush_buffer())?; - buffer.flush()?; + let mut buffer = Vec::new(); + buffer.extend_from_slice(self.page_header_writer.flush_buffer()); + buffer.extend_from_slice(self.bit_writer.flush_buffer()); // Reset state self.page_header_writer.clear(); @@ -690,7 +679,7 @@ impl Encoder for DeltaBitPackEncoder { self.current_value = 0; self.values_in_block = 0; - Ok(buffer.consume()) + Ok(buffer.into()) } } @@ -933,10 +922,7 @@ mod tests { use crate::schema::types::{ ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType, }; - use crate::util::{ - memory::MemTracker, - test_common::{random_bytes, RandGen}, - }; + use crate::util::test_common::{random_bytes, RandGen}; const TEST_SET_SIZE: usize = 1024; @@ -1286,8 +1272,7 @@ mod tests { err: Option, ) { let descr = create_test_col_desc_ptr(-1, T::get_physical_type()); - let mem_tracker = Arc::new(MemTracker::new()); - let encoder = get_encoder::(descr, encoding, mem_tracker); + let encoder = get_encoder::(descr, encoding); match err { Some(parquet_error) => { assert!(encoder.is_err()); @@ -1319,8 +1304,7 @@ mod tests { enc: Encoding, ) -> Box> { let desc = create_test_col_desc_ptr(type_len, T::get_physical_type()); - let mem_tracker = Arc::new(MemTracker::new()); - get_encoder(desc, enc, mem_tracker).unwrap() + get_encoder(desc, enc).unwrap() } fn create_test_decoder( @@ -1333,8 +1317,7 @@ mod tests { fn create_test_dict_encoder(type_len: i32) -> DictEncoder { let desc = create_test_col_desc_ptr(type_len, T::get_physical_type()); - let mem_tracker = Arc::new(MemTracker::new()); - DictEncoder::::new(desc, mem_tracker) + DictEncoder::::new(desc) } fn create_test_dict_decoder() -> DictDecoder { diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs index deabbd44034a..c8682e06d391 100644 --- a/parquet/src/encodings/levels.rs +++ b/parquet/src/encodings/levels.rs @@ -207,7 +207,7 @@ impl LevelDecoder { let num_bytes = ceil((num_buffered_values * bit_width as usize) as i64, 8); let data_size = cmp::min(num_bytes as usize, data.len()); - decoder.reset(data.range(data.start(), data_size)); + decoder.reset(data.range(0, data_size)); data_size } _ => panic!(), diff --git a/parquet/src/util/memory.rs b/parquet/src/util/memory.rs index 923c45db14d0..0b0c707ff34f 100644 --- a/parquet/src/util/memory.rs +++ b/parquet/src/util/memory.rs @@ -17,403 +17,95 @@ //! Utility methods and structs for working with memory. +use bytes::Bytes; use std::{ fmt::{Debug, Display, Formatter, Result as FmtResult}, - io::{Result as IoResult, Write}, - mem, - ops::{Index, IndexMut}, - sync::{ - atomic::{AtomicI64, Ordering}, - Arc, Weak, - }, + ops::Index, }; -// ---------------------------------------------------------------------- -// Memory Tracker classes - -/// Reference counted pointer for [`MemTracker`]. -pub type MemTrackerPtr = Arc; -/// Non-owning reference for [`MemTracker`]. -pub type WeakMemTrackerPtr = Weak; - -/// Struct to track memory usage information. -#[derive(Debug)] -pub struct MemTracker { - // In the tuple, the first element is the current memory allocated (in bytes), - // and the second element is the maximum memory allocated so far (in bytes). - current_memory_usage: AtomicI64, - max_memory_usage: AtomicI64, -} - -impl MemTracker { - /// Creates new memory tracker. - #[inline] - pub fn new() -> MemTracker { - MemTracker { - current_memory_usage: Default::default(), - max_memory_usage: Default::default(), - } - } - - /// Returns the current memory consumption, in bytes. - pub fn memory_usage(&self) -> i64 { - self.current_memory_usage.load(Ordering::Acquire) - } - - /// Returns the maximum memory consumption so far, in bytes. - pub fn max_memory_usage(&self) -> i64 { - self.max_memory_usage.load(Ordering::Acquire) - } - - /// Adds `num_bytes` to the memory consumption tracked by this memory tracker. - #[inline] - pub fn alloc(&self, num_bytes: i64) { - let new_current = self - .current_memory_usage - .fetch_add(num_bytes, Ordering::Acquire) - + num_bytes; - self.max_memory_usage - .fetch_max(new_current, Ordering::Acquire); - } -} - -// ---------------------------------------------------------------------- -// Buffer classes - -/// Type alias for [`Buffer`]. -pub type ByteBuffer = Buffer; -/// Type alias for [`BufferPtr`]. -pub type ByteBufferPtr = BufferPtr; - -/// A resize-able buffer class with generic member, with optional memory tracker. -/// -/// Note that a buffer has two attributes: -/// `capacity` and `size`: the former is the total number of space reserved for -/// the buffer, while the latter is the actual number of elements. -/// Invariant: `capacity` >= `size`. -/// The total allocated bytes for a buffer equals to `capacity * sizeof()`. -pub struct Buffer { - data: Vec, - mem_tracker: Option, - type_length: usize, -} - -impl Buffer { - /// Creates new empty buffer. - pub fn new() -> Self { - Buffer { - data: vec![], - mem_tracker: None, - type_length: std::mem::size_of::(), - } - } - - /// Adds [`MemTracker`] for this buffer. - #[inline] - pub fn with_mem_tracker(mut self, mc: MemTrackerPtr) -> Self { - mc.alloc((self.data.capacity() * self.type_length) as i64); - self.mem_tracker = Some(mc); - self - } - - /// Returns slice of data in this buffer. - #[inline] - pub fn data(&self) -> &[T] { - self.data.as_slice() - } - - /// Sets data for this buffer. - #[inline] - pub fn set_data(&mut self, new_data: Vec) { - if let Some(ref mc) = self.mem_tracker { - let capacity_diff = new_data.capacity() as i64 - self.data.capacity() as i64; - mc.alloc(capacity_diff * self.type_length as i64); - } - self.data = new_data; - } - - /// Resizes underlying data in place to a new length `new_size`. - /// - /// If `new_size` is less than current length, data is truncated, otherwise, it is - /// extended to `new_size` with provided default value `init_value`. - /// - /// Memory tracker is also updated, if available. - #[inline] - pub fn resize(&mut self, new_size: usize, init_value: T) { - let old_capacity = self.data.capacity(); - self.data.resize(new_size, init_value); - if let Some(ref mc) = self.mem_tracker { - let capacity_diff = self.data.capacity() as i64 - old_capacity as i64; - mc.alloc(capacity_diff * self.type_length as i64); - } - } - - /// Clears underlying data. - #[inline] - pub fn clear(&mut self) { - self.data.clear() - } - - /// Reserves capacity `additional_capacity` for underlying data vector. - /// - /// Memory tracker is also updated, if available. - #[inline] - pub fn reserve(&mut self, additional_capacity: usize) { - let old_capacity = self.data.capacity(); - self.data.reserve(additional_capacity); - if self.data.capacity() > old_capacity { - if let Some(ref mc) = self.mem_tracker { - let capacity_diff = self.data.capacity() as i64 - old_capacity as i64; - mc.alloc(capacity_diff * self.type_length as i64); - } - } - } - - /// Returns [`BufferPtr`] with buffer data. - /// Buffer data is reset. - #[inline] - pub fn consume(&mut self) -> BufferPtr { - let old_data = mem::take(&mut self.data); - let mut result = BufferPtr::new(old_data); - if let Some(ref mc) = self.mem_tracker { - result = result.with_mem_tracker(mc.clone()); - } - result - } - - /// Adds `value` to the buffer. - #[inline] - pub fn push(&mut self, value: T) { - self.data.push(value) - } - - /// Returns current capacity for the buffer. - #[inline] - pub fn capacity(&self) -> usize { - self.data.capacity() - } - - /// Returns current size for the buffer. - #[inline] - pub fn size(&self) -> usize { - self.data.len() - } - - /// Returns `true` if memory tracker is added to buffer, `false` otherwise. - #[inline] - pub fn is_mem_tracked(&self) -> bool { - self.mem_tracker.is_some() - } - - /// Returns memory tracker associated with this buffer. - /// This may panic, if memory tracker is not set, use method above to check if - /// memory tracker is available. - #[inline] - pub fn mem_tracker(&self) -> &MemTrackerPtr { - self.mem_tracker.as_ref().unwrap() - } -} - -impl Index for Buffer { - type Output = T; - - fn index(&self, index: usize) -> &T { - &self.data[index] - } -} - -impl IndexMut for Buffer { - fn index_mut(&mut self, index: usize) -> &mut T { - &mut self.data[index] - } -} - -// TODO: implement this for other types -impl Write for Buffer { - #[inline] - fn write(&mut self, buf: &[u8]) -> IoResult { - let old_capacity = self.data.capacity(); - let bytes_written = self.data.write(buf)?; - if let Some(ref mc) = self.mem_tracker { - if self.data.capacity() - old_capacity > 0 { - mc.alloc((self.data.capacity() - old_capacity) as i64) - } - } - Ok(bytes_written) - } - - fn flush(&mut self) -> IoResult<()> { - // No-op - self.data.flush() - } -} - -impl AsRef<[u8]> for Buffer { - fn as_ref(&self) -> &[u8] { - self.data.as_slice() - } -} - -impl Drop for Buffer { - #[inline] - fn drop(&mut self) { - if let Some(ref mc) = self.mem_tracker { - mc.alloc(-((self.data.capacity() * self.type_length) as i64)); - } - } -} - // ---------------------------------------------------------------------- // Immutable Buffer (BufferPtr) classes /// An representation of a slice on a reference-counting and read-only byte array. /// Sub-slices can be further created from this. The byte array will be released /// when all slices are dropped. +/// +/// TODO: Remove and replace with [`bytes::Bytes`] #[allow(clippy::rc_buffer)] #[derive(Clone, Debug)] -pub struct BufferPtr { - data: Arc>, - start: usize, - len: usize, - // TODO: will this create too many references? rethink about this. - mem_tracker: Option, +pub struct ByteBufferPtr { + data: Bytes, } -impl BufferPtr { +impl ByteBufferPtr { /// Creates new buffer from a vector. - pub fn new(v: Vec) -> Self { - let len = v.len(); - Self { - data: Arc::new(v), - start: 0, - len, - mem_tracker: None, - } + pub fn new(v: Vec) -> Self { + Self { data: v.into() } } /// Returns slice of data in this buffer. #[inline] - pub fn data(&self) -> &[T] { - &self.data[self.start..self.start + self.len] - } - - /// Updates this buffer with new `start` position and length `len`. - /// - /// Range should be within current start position and length. - #[inline] - pub fn with_range(mut self, start: usize, len: usize) -> Self { - self.set_range(start, len); - self - } - - /// Updates this buffer with new `start` position and length `len`. - /// - /// Range should be within current start position and length. - #[inline] - pub fn set_range(&mut self, start: usize, len: usize) { - assert!(self.start <= start && start + len <= self.start + self.len); - self.start = start; - self.len = len; - } - - /// Adds memory tracker to this buffer. - pub fn with_mem_tracker(mut self, mc: MemTrackerPtr) -> Self { - self.mem_tracker = Some(mc); - self - } - - /// Returns start position of this buffer. - #[inline] - pub fn start(&self) -> usize { - self.start - } - - /// Returns the end position of this buffer - #[inline] - pub fn end(&self) -> usize { - self.start + self.len + pub fn data(&self) -> &[u8] { + &self.data } /// Returns length of this buffer #[inline] pub fn len(&self) -> usize { - self.len + self.data.len() } /// Returns whether this buffer is empty #[inline] pub fn is_empty(&self) -> bool { - self.len == 0 - } - - /// Returns `true` if this buffer has memory tracker, `false` otherwise. - pub fn is_mem_tracked(&self) -> bool { - self.mem_tracker.is_some() + self.data.is_empty() } /// Returns a shallow copy of the buffer. /// Reference counted pointer to the data is copied. - pub fn all(&self) -> BufferPtr { - BufferPtr { - data: self.data.clone(), - start: self.start, - len: self.len, - mem_tracker: self.mem_tracker.as_ref().cloned(), - } + pub fn all(&self) -> Self { + self.clone() } /// Returns a shallow copy of the buffer that starts with `start` position. - pub fn start_from(&self, start: usize) -> BufferPtr { - assert!(start <= self.len); - BufferPtr { - data: self.data.clone(), - start: self.start + start, - len: self.len - start, - mem_tracker: self.mem_tracker.as_ref().cloned(), + pub fn start_from(&self, start: usize) -> Self { + Self { + data: self.data.slice(start..), } } /// Returns a shallow copy that is a range slice within this buffer. - pub fn range(&self, start: usize, len: usize) -> BufferPtr { - assert!(start + len <= self.len); - BufferPtr { - data: self.data.clone(), - start: self.start + start, - len, - mem_tracker: self.mem_tracker.as_ref().cloned(), + pub fn range(&self, start: usize, len: usize) -> Self { + Self { + data: self.data.slice(start..start + len), } } } -impl Index for BufferPtr { - type Output = T; +impl Index for ByteBufferPtr { + type Output = u8; - fn index(&self, index: usize) -> &T { - assert!(index < self.len); - &self.data[self.start + index] + fn index(&self, index: usize) -> &u8 { + &self.data[index] } } -impl Display for BufferPtr { +impl Display for ByteBufferPtr { fn fmt(&self, f: &mut Formatter) -> FmtResult { write!(f, "{:?}", self.data) } } -impl Drop for BufferPtr { - fn drop(&mut self) { - if let Some(ref mc) = self.mem_tracker { - if Arc::strong_count(&self.data) == 1 && Arc::weak_count(&self.data) == 0 { - mc.alloc(-(self.data.capacity() as i64)); - } - } +impl AsRef<[u8]> for ByteBufferPtr { + #[inline] + fn as_ref(&self) -> &[u8] { + &self.data } } -impl AsRef<[u8]> for BufferPtr { - #[inline] - fn as_ref(&self) -> &[u8] { - &self.data[self.start..self.start + self.len] +impl From> for ByteBufferPtr { + fn from(data: Vec) -> Self { + Self { data: data.into() } } } @@ -421,128 +113,23 @@ impl AsRef<[u8]> for BufferPtr { mod tests { use super::*; - #[test] - fn test_byte_buffer_mem_tracker() { - let mem_tracker = Arc::new(MemTracker::new()); - - let mut buffer = ByteBuffer::new().with_mem_tracker(mem_tracker.clone()); - buffer.set_data(vec![0; 10]); - assert_eq!(mem_tracker.memory_usage(), buffer.capacity() as i64); - buffer.set_data(vec![0; 20]); - let capacity = buffer.capacity() as i64; - assert_eq!(mem_tracker.memory_usage(), capacity); - - let max_capacity = { - let mut buffer2 = ByteBuffer::new().with_mem_tracker(mem_tracker.clone()); - buffer2.reserve(30); - assert_eq!( - mem_tracker.memory_usage(), - buffer2.capacity() as i64 + capacity - ); - buffer2.set_data(vec![0; 100]); - assert_eq!( - mem_tracker.memory_usage(), - buffer2.capacity() as i64 + capacity - ); - buffer2.capacity() as i64 + capacity - }; - - assert_eq!(mem_tracker.memory_usage(), capacity); - assert_eq!(mem_tracker.max_memory_usage(), max_capacity); - - buffer.reserve(40); - assert_eq!(mem_tracker.memory_usage(), buffer.capacity() as i64); - - buffer.consume(); - assert_eq!(mem_tracker.memory_usage(), buffer.capacity() as i64); - } - - #[test] - fn test_byte_ptr_mem_tracker() { - let mem_tracker = Arc::new(MemTracker::new()); - - let mut buffer = ByteBuffer::new().with_mem_tracker(mem_tracker.clone()); - buffer.set_data(vec![0; 60]); - - { - let buffer_capacity = buffer.capacity() as i64; - let buf_ptr = buffer.consume(); - assert_eq!(mem_tracker.memory_usage(), buffer_capacity); - { - let buf_ptr1 = buf_ptr.all(); - { - let _ = buf_ptr.start_from(20); - assert_eq!(mem_tracker.memory_usage(), buffer_capacity); - } - assert_eq!(mem_tracker.memory_usage(), buffer_capacity); - let _ = buf_ptr1.range(30, 20); - assert_eq!(mem_tracker.memory_usage(), buffer_capacity); - } - assert_eq!(mem_tracker.memory_usage(), buffer_capacity); - } - assert_eq!(mem_tracker.memory_usage(), buffer.capacity() as i64); - } - - #[test] - fn test_byte_buffer() { - let mut buffer = ByteBuffer::new(); - assert_eq!(buffer.size(), 0); - assert_eq!(buffer.capacity(), 0); - - let mut buffer2 = ByteBuffer::new(); - buffer2.reserve(40); - assert_eq!(buffer2.size(), 0); - assert_eq!(buffer2.capacity(), 40); - - buffer.set_data((0..5).collect()); - assert_eq!(buffer.size(), 5); - assert_eq!(buffer[4], 4); - - buffer.set_data((0..20).collect()); - assert_eq!(buffer.size(), 20); - assert_eq!(buffer[10], 10); - - let expected: Vec = (0..20).collect(); - { - let data = buffer.data(); - assert_eq!(data, expected.as_slice()); - } - - buffer.reserve(40); - assert!(buffer.capacity() >= 40); - - let byte_ptr = buffer.consume(); - assert_eq!(buffer.size(), 0); - assert_eq!(byte_ptr.as_ref(), expected.as_slice()); - - let values: Vec = (0..30).collect(); - let _ = buffer.write(values.as_slice()); - let _ = buffer.flush(); - - assert_eq!(buffer.data(), values.as_slice()); - } - #[test] fn test_byte_ptr() { let values = (0..50).collect(); let ptr = ByteBufferPtr::new(values); assert_eq!(ptr.len(), 50); - assert_eq!(ptr.start(), 0); assert_eq!(ptr[40], 40); let ptr2 = ptr.all(); assert_eq!(ptr2.len(), 50); - assert_eq!(ptr2.start(), 0); assert_eq!(ptr2[40], 40); let ptr3 = ptr.start_from(20); assert_eq!(ptr3.len(), 30); - assert_eq!(ptr3.start(), 20); assert_eq!(ptr3[0], 20); let ptr4 = ptr3.range(10, 10); assert_eq!(ptr4.len(), 10); - assert_eq!(ptr4.start(), 30); assert_eq!(ptr4[0], 30); let expected: Vec = (30..40).collect(); diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs index 1c0fd6283cde..ffa559f3fecb 100644 --- a/parquet/src/util/test_common/page_util.rs +++ b/parquet/src/util/test_common/page_util.rs @@ -25,13 +25,10 @@ use crate::encodings::levels::LevelEncoder; use crate::errors::Result; use crate::schema::types::{ColumnDescPtr, SchemaDescPtr}; use crate::util::memory::ByteBufferPtr; -use crate::util::memory::MemTracker; -use crate::util::memory::MemTrackerPtr; use crate::util::test_common::random_numbers_range; use rand::distributions::uniform::SampleUniform; use std::collections::VecDeque; use std::mem; -use std::sync::Arc; pub trait DataPageBuilder { fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]); @@ -50,7 +47,6 @@ pub trait DataPageBuilder { pub struct DataPageBuilderImpl { desc: ColumnDescPtr, encoding: Option, - mem_tracker: MemTrackerPtr, num_values: u32, buffer: Vec, rep_levels_byte_len: u32, @@ -66,7 +62,6 @@ impl DataPageBuilderImpl { DataPageBuilderImpl { desc, encoding: None, - mem_tracker: Arc::new(MemTracker::new()), num_values, buffer: vec![], rep_levels_byte_len: 0, @@ -122,7 +117,7 @@ impl DataPageBuilder for DataPageBuilderImpl { ); self.encoding = Some(encoding); let mut encoder: Box> = - get_encoder::(self.desc.clone(), encoding, self.mem_tracker.clone()) + get_encoder::(self.desc.clone(), encoding) .expect("get_encoder() should be OK"); encoder.put(values).expect("put() should be OK"); let encoded_values = encoder @@ -252,8 +247,7 @@ pub fn make_pages( let max_def_level = desc.max_def_level(); let max_rep_level = desc.max_rep_level(); - let mem_tracker = Arc::new(MemTracker::new()); - let mut dict_encoder = DictEncoder::::new(desc.clone(), mem_tracker); + let mut dict_encoder = DictEncoder::::new(desc.clone()); for i in 0..num_pages { let mut num_values_cur_page = 0;