Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Truncate Min/Max values in the Column Index #4389

Merged
merged 15 commits into from
Jun 11, 2023
Merged
266 changes: 258 additions & 8 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,8 +656,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
if null_page && self.column_index_builder.valid() {
self.column_index_builder.append(
null_page,
&[0; 1],
&[0; 1],
vec![0; 1],
vec![0; 1],
self.page_metrics.num_page_nulls as i64,
);
} else if self.column_index_builder.valid() {
Expand All @@ -670,8 +670,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Some(stat) => {
self.column_index_builder.append(
null_page,
stat.min_bytes(),
stat.max_bytes(),
self.truncate_min_value(stat.min_bytes()),
self.truncate_max_value(stat.max_bytes()),
self.page_metrics.num_page_nulls as i64,
);
}
Expand All @@ -683,6 +683,35 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
.append_row_count(self.page_metrics.num_buffered_rows as i64);
}

fn truncate_min_value(&self, data: &[u8]) -> Vec<u8> {
let max_effective_len =
self.props.minmax_value_truncate_len().unwrap_or(data.len());

match std::str::from_utf8(data) {
Ok(str_data) => truncate_utf8(str_data, max_effective_len),
Err(_) => truncate_binary(data, max_effective_len),
}
}

fn truncate_max_value(&self, data: &[u8]) -> Vec<u8> {
// Even if the user disables value truncation, we want to make sure to increase the max value so the user doesn't miss it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should only increment if we are truncating, I think it would be a bit surprising for users to write b"hello" and get back b"hellp". Whereas writing "really long string" and getting back an obviously truncated string is perhaps more understandable "really long su"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, it should be handled correctly now

let max_effective_len =
self.props.minmax_value_truncate_len().unwrap_or(data.len());

match std::str::from_utf8(data) {
Ok(str_data) => {
let mut v = truncate_utf8(str_data, max_effective_len);
increment_utf8(&mut v);
v
}
Err(_) => {
let mut v = truncate_binary(data, max_effective_len);
increment(&mut v);
v
}
}
}

/// Adds data page.
/// Data page is either buffered in case of dictionary encoding or written directly.
fn add_data_page(&mut self) -> Result<()> {
Expand Down Expand Up @@ -1152,6 +1181,56 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
(a[1..]) > (b[1..])
}

/// Truncate a UTF8 slice to the longest prefix that is still a valid UTF8 string, while being less than `max_len` bytes.
fn truncate_utf8(data: &str, max_len: usize) -> Vec<u8> {
let mut max_possible_len = usize::min(data.len(), max_len);
AdamGS marked this conversation as resolved.
Show resolved Hide resolved

if data.is_char_boundary(max_possible_len) {
return data.as_bytes()[0..max_possible_len].to_vec();
}

// UTF8 characters can only be up to 4 bytes long, so this loop has will only run up to 3 times before returning.
loop {
max_possible_len -= 1;
if data.is_char_boundary(max_possible_len) {
return data.as_bytes()[0..max_possible_len].to_vec();
}
}
AdamGS marked this conversation as resolved.
Show resolved Hide resolved
}

/// Truncate a binary slice to make sure its length is less than `max_len`
fn truncate_binary(data: &[u8], max_len: usize) -> Vec<u8> {
data[0..usize::min(data.len(), max_len)].to_vec()
}

/// Try and increment the bytes from right to left.
fn increment(data: &mut [u8]) {
for byte in data.iter_mut().rev() {
if *byte == u8::MAX {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the sequence is 0xFF 0xFF 0xFF 0xFF. I guess we cannot truncate it if that. (Parquet-mr handles this well)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@AdamGS AdamGS Jun 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adopted their (parquet-mr) general structure with functions returning None when they can't truncate/increment.

continue;
} else {
*byte += 1;
break;
}
}
}

/// Try and increment the the string's bytes from right to left, returning when the result is a valid UTF8 string.
AdamGS marked this conversation as resolved.
Show resolved Hide resolved
fn increment_utf8(data: &mut Vec<u8>) {
for idx in (0..data.len()).rev() {
let byte = &mut data[idx];
AdamGS marked this conversation as resolved.
Show resolved Hide resolved
if *byte == u8::MAX {
continue;
} else {
*byte += 1;
}

if std::str::from_utf8(data).is_ok() {
break;
}
}
}

#[cfg(test)]
mod tests {
use crate::format::BoundaryOrder;
Expand Down Expand Up @@ -2198,10 +2277,11 @@ mod tests {
// first page is [1,2,3,4]
// second page is [-5,2,4,8]
assert_eq!(stats.min_bytes(), column_index.min_values[1].as_slice());
assert_eq!(
stats.max_bytes(),
column_index.max_values.get(1).unwrap().as_slice()
);

// We expect the max value to be incremented
let mut stats_max_bytes = stats.max_bytes().to_vec();
increment(&mut stats_max_bytes);
assert_eq!(&stats_max_bytes, column_index.max_values.get(1).unwrap());
} else {
panic!("expecting Statistics::Int32");
}
Expand All @@ -2220,12 +2300,182 @@ mod tests {
);
}

/// Verify min/max value truncation in the column index works as expected
#[test]
fn test_column_offset_index_metadata_truncating() {
// write data
// and check the offset index and column index
let page_writer = get_test_page_writer();
let props = Default::default();
let mut writer =
get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

let mut data = vec![FixedLenByteArray::default(); 3];
// This is the expected min value
data[0].set_data(ByteBufferPtr::new(vec![0_u8; 200]));
// This is the expected max value
data[1].set_data(ByteBufferPtr::new(
String::from("Zorro the masked hero").into_bytes(),
));
data[2].set_data(ByteBufferPtr::new(Vec::from_iter(0_u8..129)));

writer.write_batch(&data, None, None).unwrap();

writer.flush_data_pages().unwrap();

let r = writer.close().unwrap();
let column_index = r.column_index.unwrap();
let offset_index = r.offset_index.unwrap();

assert_eq!(3, r.rows_written);

// column index
assert_eq!(1, column_index.null_pages.len());
assert_eq!(1, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
assert!(!column_index.null_pages[0]);
assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

if let Some(stats) = r.metadata.statistics() {
assert!(stats.has_min_max_set());
assert_eq!(stats.null_count(), 0);
assert_eq!(stats.distinct_count(), None);
if let Statistics::FixedLenByteArray(stats) = stats {
let column_index_min_value = column_index.min_values.get(0).unwrap();
let column_index_max_value = column_index.max_values.get(0).unwrap();

// Column index stats are truncated, while the column chunk's aren't.
assert_ne!(stats.min_bytes(), column_index_min_value.as_slice());
// We expect the max value to be incremented
let mut stats_max_bytes = stats.max_bytes().to_vec();
increment(&mut stats_max_bytes);
assert_eq!(stats_max_bytes, column_index_max_value.as_slice());

assert_eq!(column_index_min_value.len(), 128);
assert_eq!(column_index_min_value.as_slice(), &[0_u8; 128]);
assert_eq!(column_index_max_value.len(), 21);
} else {
panic!("expecting Statistics::FixedLenByteArray");
}
} else {
panic!("metadata missing statistics");
}
}

#[test]
fn test_column_offset_index_metadata_truncating_spec_example() {
// write data
// and check the offset index and column index
let page_writer = get_test_page_writer();
let builder = WriterProperties::builder().set_value_truncate_length(Some(1));
let props = Arc::new(builder.build());
let mut writer =
get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

let mut data = vec![FixedLenByteArray::default(); 1];
// This is the expected min value
data[0].set_data(ByteBufferPtr::new(
String::from("Blart Versenwald III").into_bytes(),
));

writer.write_batch(&data, None, None).unwrap();

writer.flush_data_pages().unwrap();

let r = writer.close().unwrap();
let column_index = r.column_index.unwrap();
let offset_index = r.offset_index.unwrap();

assert_eq!(1, r.rows_written);

// column index
assert_eq!(1, column_index.null_pages.len());
assert_eq!(1, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
assert!(!column_index.null_pages[0]);
assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

if let Some(stats) = r.metadata.statistics() {
assert!(stats.has_min_max_set());
assert_eq!(stats.null_count(), 0);
assert_eq!(stats.distinct_count(), None);
if let Statistics::FixedLenByteArray(_stats) = stats {
let column_index_min_value = column_index.min_values.get(0).unwrap();
let column_index_max_value = column_index.max_values.get(0).unwrap();

assert_ne!(column_index_min_value, column_index_max_value);

assert_eq!(column_index_min_value.len(), 1);
assert_eq!(column_index_max_value.len(), 1);

// Column index stats are truncated, while the column chunk's aren't.
assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
// In this case, they are equal because the max value is shorter than the default threshold
AdamGS marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!("C".as_bytes(), column_index_max_value.as_slice());
} else {
panic!("expecting Statistics::FixedLenByteArray");
}
} else {
panic!("metadata missing statistics");
}
}

#[test]
fn test_send() {
fn test<T: Send>() {}
test::<ColumnWriterImpl<Int32Type>>();
}

#[test]
fn test_increment() {
let mut v = vec![0, 0, 0];
increment(&mut v);
assert_eq!(&v, &[0, 0, 1]);

// Handle overflow
let mut v = vec![0, 255, 255];
increment(&mut v);
assert_eq!(&v, &[1, 255, 255]);

// No-op if all bytes are u8::MAX
let mut v = vec![255, 255, 255];
increment(&mut v);
assert_eq!(&v, &[255, 255, 255]);
wjones127 marked this conversation as resolved.
Show resolved Hide resolved
}

#[test]
fn test_increment_utf8() {
// Basic ASCII case
let mut v = "hello".as_bytes().to_vec();
increment_utf8(&mut v);
assert_eq!(&v, "hellp".as_bytes());

// Utf8 string
let s = "❤️🧡💛💚💙💜";
let mut v = s.as_bytes().to_vec();
increment_utf8(&mut v);
if let Ok(new) = String::from_utf8(v) {
AdamGS marked this conversation as resolved.
Show resolved Hide resolved
assert_ne!(&new, s);
assert_eq!(new, "❤️🧡💛💚💙💝");
} else {
panic!("Expected incremented UTF8 string to also be valid.")
}
}

#[test]
fn test_truncate_utf8() {
// No-op
let data = "❤️🧡💛💚💙💜";
let r = truncate_utf8(data, data.as_bytes().len());
assert_eq!(r.len(), data.as_bytes().len());
assert_eq!(&r, data.as_bytes());

// We slice it away from the UTF8 boundary
let r = truncate_utf8(data, 13);
assert_eq!(r.len(), 10);
assert_eq!(&r, "❤️🧡".as_bytes())
}

/// Performs write-read roundtrip with randomly generated values and levels.
/// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
/// for a column.
Expand Down
8 changes: 4 additions & 4 deletions parquet/src/file/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -868,13 +868,13 @@ impl ColumnIndexBuilder {
pub fn append(
&mut self,
null_page: bool,
min_value: &[u8],
max_value: &[u8],
min_value: Vec<u8>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This an API change, which is fine

max_value: Vec<u8>,
null_count: i64,
) {
self.null_pages.push(null_page);
self.min_values.push(min_value.to_vec());
self.max_values.push(max_value.to_vec());
self.min_values.push(min_value);
self.max_values.push(max_value);
self.null_counts.push(null_count);
}

Expand Down
16 changes: 16 additions & 0 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize = DEFAULT_PAGE_SIZE;
const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
const DEFAULT_COLUMN_INDEX_MINMAX_LEN: Option<usize> = Some(128);
AdamGS marked this conversation as resolved.
Show resolved Hide resolved
const DEFAULT_CREATED_BY: &str =
concat!("parquet-rs version ", env!("CARGO_PKG_VERSION"));
/// default value for the false positive probability used in a bloom filter.
Expand Down Expand Up @@ -128,6 +129,7 @@ pub struct WriterProperties {
default_column_properties: ColumnProperties,
column_properties: HashMap<ColumnPath, ColumnProperties>,
sorting_columns: Option<Vec<SortingColumn>>,
minmax_value_truncate_len: Option<usize>,
}

impl Default for WriterProperties {
Expand Down Expand Up @@ -226,6 +228,10 @@ impl WriterProperties {
self.sorting_columns.as_ref()
}

pub fn minmax_value_truncate_len(&self) -> Option<usize> {
self.minmax_value_truncate_len
}

/// Returns encoding for a data page, when dictionary encoding is enabled.
/// This is not configurable.
#[inline]
Expand Down Expand Up @@ -320,6 +326,7 @@ pub struct WriterPropertiesBuilder {
default_column_properties: ColumnProperties,
column_properties: HashMap<ColumnPath, ColumnProperties>,
sorting_columns: Option<Vec<SortingColumn>>,
minmax_value_truncate_len: Option<usize>,
}

impl WriterPropertiesBuilder {
Expand All @@ -337,6 +344,7 @@ impl WriterPropertiesBuilder {
default_column_properties: Default::default(),
column_properties: HashMap::new(),
sorting_columns: None,
minmax_value_truncate_len: DEFAULT_COLUMN_INDEX_MINMAX_LEN,
}
}

Expand All @@ -354,6 +362,7 @@ impl WriterPropertiesBuilder {
default_column_properties: self.default_column_properties,
column_properties: self.column_properties,
sorting_columns: self.sorting_columns,
minmax_value_truncate_len: self.minmax_value_truncate_len,
}
}

Expand Down Expand Up @@ -626,6 +635,13 @@ impl WriterPropertiesBuilder {
self.get_mut_props(col).set_bloom_filter_ndv(value);
self
}

/// Sets the max length of min/max value fields in the column index.
/// If set to `None` - there's no effective limit.
pub fn set_value_truncate_length(mut self, max_length: Option<usize>) -> Self {
AdamGS marked this conversation as resolved.
Show resolved Hide resolved
self.minmax_value_truncate_len = max_length;
self
}
}

/// Controls the level of statistics to be computed by the writer
Expand Down
Loading