Skip to content

Commit

Permalink
Respect Page Size Limits in ArrowWriter (#2853) (#2890)
Browse files Browse the repository at this point in the history
* Respect Page Size Limits in ArrowWriter (#2853)

* Update tests

* Add test required features

* Fix strings

* Review feedback
  • Loading branch information
tustvold authored Oct 24, 2022
1 parent e859f30 commit 7e5d4a1
Show file tree
Hide file tree
Showing 9 changed files with 514 additions and 52 deletions.
4 changes: 4 additions & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ experimental = []
# Enable async APIs
async = ["futures", "tokio"]

[[test]]
name = "arrow_writer_layout"
required-features = ["arrow"]

[[bin]]
name = "parquet-read"
required-features = ["cli"]
Expand Down
17 changes: 7 additions & 10 deletions parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,7 @@ impl DictEncoder {

fn estimated_data_page_size(&self) -> usize {
let bit_width = self.bit_width();
1 + RleEncoder::min_buffer_size(bit_width)
+ RleEncoder::max_buffer_size(bit_width, self.indices.len())
1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
}

fn estimated_dict_page_size(&self) -> usize {
Expand Down Expand Up @@ -427,7 +426,6 @@ impl DictEncoder {
struct ByteArrayEncoder {
fallback: FallbackEncoder,
dict_encoder: Option<DictEncoder>,
num_values: usize,
min_value: Option<ByteArray>,
max_value: Option<ByteArray>,
}
Expand Down Expand Up @@ -466,7 +464,6 @@ impl ColumnValueEncoder for ByteArrayEncoder {
Ok(Self {
fallback,
dict_encoder: dictionary,
num_values: 0,
min_value: None,
max_value: None,
})
Expand All @@ -487,7 +484,10 @@ impl ColumnValueEncoder for ByteArrayEncoder {
}

fn num_values(&self) -> usize {
self.num_values
match &self.dict_encoder {
Some(encoder) => encoder.indices.len(),
None => self.fallback.num_values,
}
}

fn has_dictionary(&self) -> bool {
Expand All @@ -508,7 +508,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
match self.dict_encoder.take() {
Some(encoder) => {
if self.num_values != 0 {
if !encoder.indices.is_empty() {
return Err(general_err!(
"Must flush data pages before flushing dictionary"
));
Expand Down Expand Up @@ -551,10 +551,7 @@ where

match &mut encoder.dict_encoder {
Some(dict_encoder) => dict_encoder.encode(values, indices),
None => {
encoder.num_values += indices.len();
encoder.fallback.encode(values, indices)
}
None => encoder.fallback.encode(values, indices),
}
}

Expand Down
1 change: 1 addition & 0 deletions parquet/src/column/writer/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
}

fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
self.num_values += indices.len();
let slice: Vec<_> = indices.iter().map(|idx| values[*idx].clone()).collect();
self.write_slice(&slice)
}
Expand Down
12 changes: 5 additions & 7 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1825,7 +1825,7 @@ mod tests {
let page_writer = Box::new(SerializedPageWriter::new(&mut writer));
let props = Arc::new(
WriterProperties::builder()
.set_data_pagesize_limit(15) // actually each page will have size 15-18 bytes
.set_data_pagesize_limit(10)
.set_write_batch_size(3) // write 3 values at a time
.build(),
);
Expand All @@ -1846,16 +1846,14 @@ mod tests {
);
let mut res = Vec::new();
while let Some(page) = page_reader.get_next_page().unwrap() {
res.push((page.page_type(), page.num_values()));
res.push((page.page_type(), page.num_values(), page.buffer().len()));
}
assert_eq!(
res,
vec![
(PageType::DICTIONARY_PAGE, 10),
(PageType::DATA_PAGE, 3),
(PageType::DATA_PAGE, 3),
(PageType::DATA_PAGE, 3),
(PageType::DATA_PAGE, 1)
(PageType::DICTIONARY_PAGE, 10, 40),
(PageType::DATA_PAGE, 9, 10),
(PageType::DATA_PAGE, 1, 3),
]
);
}
Expand Down
3 changes: 1 addition & 2 deletions parquet/src/encodings/encoding/dict_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ impl<T: DataType> Encoder<T> for DictEncoder<T> {

fn estimated_data_encoded_size(&self) -> usize {
let bit_width = self.bit_width();
1 + RleEncoder::min_buffer_size(bit_width)
+ RleEncoder::max_buffer_size(bit_width, self.indices.len())
RleEncoder::max_buffer_size(bit_width, self.indices.len())
}

fn flush_buffer(&mut self) -> Result<ByteBufferPtr> {
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/encodings/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@ mod tests {
// DICTIONARY
// NOTE: The final size is almost the same because the dictionary entries are
// preserved after encoded values have been written.
run_test::<Int32Type>(Encoding::RLE_DICTIONARY, -1, &[123, 1024], 11, 68, 66);
run_test::<Int32Type>(Encoding::RLE_DICTIONARY, -1, &[123, 1024], 0, 2, 0);

// DELTA_BINARY_PACKED
run_test::<Int32Type>(Encoding::DELTA_BINARY_PACKED, -1, &[123; 1024], 0, 35, 0);
Expand Down
9 changes: 2 additions & 7 deletions parquet/src/encodings/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,8 @@ pub fn max_buffer_size(
) -> usize {
let bit_width = num_required_bits(max_level as u64);
match encoding {
Encoding::RLE => {
RleEncoder::max_buffer_size(bit_width, num_buffered_values)
+ RleEncoder::min_buffer_size(bit_width)
}
Encoding::BIT_PACKED => {
ceil((num_buffered_values * bit_width as usize) as i64, 8) as usize
}
Encoding::RLE => RleEncoder::max_buffer_size(bit_width, num_buffered_values),
Encoding::BIT_PACKED => ceil(num_buffered_values * bit_width as usize, 8),
_ => panic!("Unsupported encoding type {}", encoding),
}
}
Expand Down
46 changes: 21 additions & 25 deletions parquet/src/encodings/rle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ use crate::util::{
/// repeated-value := value that is repeated, using a fixed-width of
/// round-up-to-next-byte(bit-width)
/// Maximum groups per bit-packed run. Current value is 64.
/// Maximum groups of 8 values per bit-packed run. Current value is 64.
const MAX_GROUPS_PER_BIT_PACKED_RUN: usize = 1 << 6;
const MAX_VALUES_PER_BIT_PACKED_RUN: usize = MAX_GROUPS_PER_BIT_PACKED_RUN * 8;

/// A RLE/Bit-Packing hybrid encoder.
// TODO: tracking memory usage
Expand Down Expand Up @@ -99,31 +98,28 @@ impl RleEncoder {
}
}

/// Returns the minimum buffer size needed to use the encoder for `bit_width`.
/// This is the maximum length of a single run for `bit_width`.
pub fn min_buffer_size(bit_width: u8) -> usize {
let max_bit_packed_run_size = 1 + bit_util::ceil(
(MAX_VALUES_PER_BIT_PACKED_RUN * bit_width as usize) as i64,
8,
);
let max_rle_run_size =
bit_util::MAX_VLQ_BYTE_LEN + bit_util::ceil(bit_width as i64, 8) as usize;
std::cmp::max(max_bit_packed_run_size as usize, max_rle_run_size)
}

/// Returns the maximum buffer size takes to encode `num_values` values with
/// Returns the maximum buffer size to encode `num_values` values with
/// `bit_width`.
pub fn max_buffer_size(bit_width: u8, num_values: usize) -> usize {
// First the maximum size for bit-packed run
let bytes_per_run = bit_width;
let num_runs = bit_util::ceil(num_values as i64, 8) as usize;
let bit_packed_max_size = num_runs + num_runs * bytes_per_run as usize;
// The maximum size occurs with the shortest possible runs of 8
let num_runs = bit_util::ceil(num_values, 8);

// The number of bytes in a run of 8
let bytes_per_run = bit_width as usize;

// The maximum size if stored as shortest possible bit packed runs of 8
let bit_packed_max_size = num_runs + num_runs * bytes_per_run;

// The length of `8` VLQ encoded
let rle_len_prefix = 1;

// The length of an RLE run of 8
let min_rle_run_size = rle_len_prefix + bit_util::ceil(bit_width as usize, 8);

// The maximum size if stored as shortest possible RLE runs of 8
let rle_max_size = num_runs * min_rle_run_size;

// Second the maximum size for RLE run
let min_rle_run_size = 1 + bit_util::ceil(bit_width as i64, 8) as usize;
let rle_max_size =
bit_util::ceil(num_values as i64, 8) as usize * min_rle_run_size;
std::cmp::max(bit_packed_max_size, rle_max_size) as usize
bit_packed_max_size.max(rle_max_size)
}

/// Encodes `value`, which must be representable with `bit_width` bits.
Expand Down Expand Up @@ -905,8 +901,8 @@ mod tests {
#[test]
fn test_rle_specific_roundtrip() {
let bit_width = 1;
let buffer_len = RleEncoder::min_buffer_size(bit_width);
let values: Vec<i16> = vec![0, 1, 1, 1, 1, 0, 0, 0, 0, 1];
let buffer_len = RleEncoder::max_buffer_size(bit_width, values.len());
let mut encoder = RleEncoder::new(bit_width, buffer_len);
for v in &values {
encoder.put(*v as u64)
Expand Down
Loading

0 comments on commit 7e5d4a1

Please sign in to comment.