Truncate Min/Max values in the Column Index #4389
Changes from 5 commits
@@ -656,8 +656,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         if null_page && self.column_index_builder.valid() {
             self.column_index_builder.append(
                 null_page,
-                &[0; 1],
-                &[0; 1],
+                vec![0; 1],
+                vec![0; 1],
                 self.page_metrics.num_page_nulls as i64,
             );
         } else if self.column_index_builder.valid() {
@@ -670,8 +670,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             Some(stat) => {
                 self.column_index_builder.append(
                     null_page,
-                    stat.min_bytes(),
-                    stat.max_bytes(),
+                    self.truncate_min_value(stat.min_bytes()),
+                    self.truncate_max_value(stat.max_bytes()),
                     self.page_metrics.num_page_nulls as i64,
                 );
             }
@@ -683,6 +683,35 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
            .append_row_count(self.page_metrics.num_buffered_rows as i64);
    }

    fn truncate_min_value(&self, data: &[u8]) -> Vec<u8> {
        let max_effective_len =
            self.props.minmax_value_truncate_len().unwrap_or(data.len());

        match std::str::from_utf8(data) {
            Ok(str_data) => truncate_utf8(str_data, max_effective_len),
            Err(_) => truncate_binary(data, max_effective_len),
        }
    }

    fn truncate_max_value(&self, data: &[u8]) -> Vec<u8> {
        // Even if the user disables value truncation, we want to make sure to increase the max value so the user doesn't miss it.
        let max_effective_len =
            self.props.minmax_value_truncate_len().unwrap_or(data.len());

        match std::str::from_utf8(data) {
            Ok(str_data) => {
                let mut v = truncate_utf8(str_data, max_effective_len);
                increment_utf8(&mut v);
                v
            }
            Err(_) => {
                let mut v = truncate_binary(data, max_effective_len);
                increment(&mut v);
                v
            }
        }
    }

    /// Adds data page.
    /// Data page is either buffered in case of dictionary encoding or written directly.
    fn add_data_page(&mut self) -> Result<()> {
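The asymmetry between `truncate_min_value` and `truncate_max_value` comes from how byte-wise ordering treats prefixes: a prefix always compares less than or equal to the full value, so a truncated min remains a valid lower bound, while a truncated max has to be nudged upwards to remain a valid upper bound. A small self-contained illustration of that ordering argument (the example value and lengths are made up, not taken from this PR):

```rust
fn main() {
    let value = b"parquet-rs".to_vec();

    // A truncated min is still a lower bound: the prefix sorts <= the original.
    let truncated_min = value[..4].to_vec();
    assert!(truncated_min.as_slice() <= value.as_slice());

    // A bare prefix used as a max would undershoot the real maximum...
    let mut truncated_max = value[..4].to_vec();
    assert!(truncated_max.as_slice() < value.as_slice());

    // ...so the last byte is incremented (b"parq" -> b"parr"),
    // restoring the upper-bound property.
    *truncated_max.last_mut().unwrap() += 1;
    assert!(truncated_max.as_slice() > value.as_slice());
    println!("ok");
}
```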
@@ -1152,6 +1181,56 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    (a[1..]) > (b[1..])
}

/// Truncate a UTF8 slice to the longest prefix that is still a valid UTF8 string, while being less than `max_len` bytes.
fn truncate_utf8(data: &str, max_len: usize) -> Vec<u8> {
    let mut max_possible_len = usize::min(data.len(), max_len);

    if data.is_char_boundary(max_possible_len) {
        return data.as_bytes()[0..max_possible_len].to_vec();
    }

    // UTF8 characters can only be up to 4 bytes long, so this loop will run at most 3 times before returning.
    loop {
        max_possible_len -= 1;
        if data.is_char_boundary(max_possible_len) {
            return data.as_bytes()[0..max_possible_len].to_vec();
        }
    }
}
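Because a UTF-8 code point is at most four bytes, the boundary walk above backs up at most three bytes before it finds a cut point. A standalone sketch of the same snap-to-boundary idea (hypothetical function name, not the crate's internal helper) and how it behaves on multi-byte input:

```rust
/// Walk backwards from `max_len` until the index lands on a char boundary,
/// then slice. Mirrors the idea above; at most three steps are ever needed.
fn truncate_utf8_sketch(data: &str, max_len: usize) -> Vec<u8> {
    let mut len = usize::min(data.len(), max_len);
    while !data.is_char_boundary(len) {
        len -= 1;
    }
    data.as_bytes()[..len].to_vec()
}

fn main() {
    // "é" is 2 bytes in UTF-8, so a 3-byte budget snaps back to 2 bytes.
    assert_eq!(truncate_utf8_sketch("éé", 3), "é".as_bytes());
    // A budget larger than the string is a no-op.
    assert_eq!(truncate_utf8_sketch("abc", 10), "abc".as_bytes());
    println!("ok");
}
```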
/// Truncate a binary slice to make sure its length is less than `max_len`
fn truncate_binary(data: &[u8], max_len: usize) -> Vec<u8> {
    data[0..usize::min(data.len(), max_len)].to_vec()
}

/// Try and increment the bytes from right to left.
fn increment(data: &mut [u8]) {
    for byte in data.iter_mut().rev() {
        if *byte == u8::MAX {
            continue;
        } else {
            *byte += 1;
            break;
        }
    }
}

Review comment (on `increment`): What if the sequence is […]?

Review comment: I think this is still outstanding, right? Is the solution to just return […]?

Reply: Adopted their (parquet-mr) general structure with functions returning None when they can't truncate/increment.
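The reply above points at the parquet-mr shape, where the helpers are fallible instead of silently returning the input when every byte is already at its maximum. A hedged sketch of that shape (hypothetical name and carry-to-zero semantics, not the function in this revision of the diff):

```rust
/// Increment a byte string as if it were a big-endian integer, returning
/// `None` when every byte is already u8::MAX and no larger value exists
/// at this width.
fn try_increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    for idx in (0..data.len()).rev() {
        if data[idx] != u8::MAX {
            data[idx] += 1;
            return Some(data);
        }
        // Overflow at this position: zero it and carry one byte to the left.
        data[idx] = 0;
    }
    // Every byte was u8::MAX.
    None
}

fn main() {
    assert_eq!(try_increment(vec![0x00, 0xFF]), Some(vec![0x01, 0x00]));
    assert_eq!(try_increment(vec![0xFF, 0xFF]), None);
}
```

A caller could, for example, fall back to the untruncated value whenever `None` comes back.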
/// Try and increment the string's bytes from right to left, returning when the result is a valid UTF8 string.
fn increment_utf8(data: &mut Vec<u8>) {
    for idx in (0..data.len()).rev() {
        let byte = &mut data[idx];

        if *byte == u8::MAX {
            continue;
        } else {
            *byte += 1;
        }

        if std::str::from_utf8(data).is_ok() {
            break;
        }
    }
}
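For intuition on why `increment_utf8` re-checks validity after every bump rather than incrementing blindly: changing a single byte of a multi-byte code point can leave the buffer as invalid UTF-8, which is what forces the loop to keep moving left. A small hypothetical demo of that failure mode (not code from this PR):

```rust
fn main() {
    // U+07FF is the largest two-byte code point, encoded as [0xDF, 0xBF].
    let mut bytes = "\u{07FF}".as_bytes().to_vec();
    assert_eq!(bytes, vec![0xDF, 0xBF]);

    // Bumping the last byte gives 0xC0, which is not a valid continuation
    // byte, so the buffer is no longer valid UTF-8.
    *bytes.last_mut().unwrap() += 1;
    assert!(std::str::from_utf8(&bytes).is_err());
    println!("ok");
}
```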
#[cfg(test)]
mod tests {
    use crate::format::BoundaryOrder;
@@ -2198,10 +2277,11 @@ mod tests {
        // first page is [1,2,3,4]
        // second page is [-5,2,4,8]
        assert_eq!(stats.min_bytes(), column_index.min_values[1].as_slice());
-       assert_eq!(
-           stats.max_bytes(),
-           column_index.max_values.get(1).unwrap().as_slice()
-       );
+
+       // We expect the max value to be incremented
+       let mut stats_max_bytes = stats.max_bytes().to_vec();
+       increment(&mut stats_max_bytes);
+       assert_eq!(&stats_max_bytes, column_index.max_values.get(1).unwrap());
    } else {
        panic!("expecting Statistics::Int32");
    }
@@ -2220,12 +2300,182 @@ mod tests {
        );
    }

    /// Verify min/max value truncation in the column index works as expected
    #[test]
    fn test_column_offset_index_metadata_truncating() {
        // write data
        // and check the offset index and column index
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer =
            get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 3];
        // This is the expected min value
        data[0].set_data(ByteBufferPtr::new(vec![0_u8; 200]));
        // This is the expected max value
        data[1].set_data(ByteBufferPtr::new(
            String::from("Zorro the masked hero").into_bytes(),
        ));
        data[2].set_data(ByteBufferPtr::new(Vec::from_iter(0_u8..129)));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        assert_eq!(3, r.rows_written);

        // column index
        assert_eq!(1, column_index.null_pages.len());
        assert_eq!(1, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
        assert!(!column_index.null_pages[0]);
        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

        if let Some(stats) = r.metadata.statistics() {
            assert!(stats.has_min_max_set());
            assert_eq!(stats.null_count(), 0);
            assert_eq!(stats.distinct_count(), None);
            if let Statistics::FixedLenByteArray(stats) = stats {
                let column_index_min_value = column_index.min_values.get(0).unwrap();
                let column_index_max_value = column_index.max_values.get(0).unwrap();

                // Column index stats are truncated, while the column chunk's aren't.
                assert_ne!(stats.min_bytes(), column_index_min_value.as_slice());
                // We expect the max value to be incremented
                let mut stats_max_bytes = stats.max_bytes().to_vec();
                increment(&mut stats_max_bytes);
                assert_eq!(stats_max_bytes, column_index_max_value.as_slice());

                assert_eq!(column_index_min_value.len(), 128);
                assert_eq!(column_index_min_value.as_slice(), &[0_u8; 128]);
                assert_eq!(column_index_max_value.len(), 21);
            } else {
                panic!("expecting Statistics::FixedLenByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }

    #[test]
    fn test_column_offset_index_metadata_truncating_spec_example() {
        // write data
        // and check the offset index and column index
        let page_writer = get_test_page_writer();
        let builder = WriterProperties::builder().set_value_truncate_length(Some(1));
        let props = Arc::new(builder.build());
        let mut writer =
            get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 1];
        // This is the expected min value
        data[0].set_data(ByteBufferPtr::new(
            String::from("Blart Versenwald III").into_bytes(),
        ));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        assert_eq!(1, r.rows_written);

        // column index
        assert_eq!(1, column_index.null_pages.len());
        assert_eq!(1, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
        assert!(!column_index.null_pages[0]);
        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

        if let Some(stats) = r.metadata.statistics() {
            assert!(stats.has_min_max_set());
            assert_eq!(stats.null_count(), 0);
            assert_eq!(stats.distinct_count(), None);
            if let Statistics::FixedLenByteArray(_stats) = stats {
                let column_index_min_value = column_index.min_values.get(0).unwrap();
                let column_index_max_value = column_index.max_values.get(0).unwrap();

                assert_ne!(column_index_min_value, column_index_max_value);

                assert_eq!(column_index_min_value.len(), 1);
                assert_eq!(column_index_max_value.len(), 1);

                // Column index stats are truncated, while the column chunk's aren't.
                assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
                // In this case, they are equal because the max value is shorter than the default threshold
                assert_eq!("C".as_bytes(), column_index_max_value.as_slice());
            } else {
                panic!("expecting Statistics::FixedLenByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }

    #[test]
    fn test_send() {
        fn test<T: Send>() {}
        test::<ColumnWriterImpl<Int32Type>>();
    }

    #[test]
    fn test_increment() {
        let mut v = vec![0, 0, 0];
        increment(&mut v);
        assert_eq!(&v, &[0, 0, 1]);

        // Handle overflow
        let mut v = vec![0, 255, 255];
        increment(&mut v);
        assert_eq!(&v, &[1, 255, 255]);

        // No-op if all bytes are u8::MAX
        let mut v = vec![255, 255, 255];
        increment(&mut v);
        assert_eq!(&v, &[255, 255, 255]);
    }

    #[test]
    fn test_increment_utf8() {
        // Basic ASCII case
        let mut v = "hello".as_bytes().to_vec();
        increment_utf8(&mut v);
        assert_eq!(&v, "hellp".as_bytes());

        // Utf8 string
        let s = "❤️🧡💛💚💙💜";
        let mut v = s.as_bytes().to_vec();
        increment_utf8(&mut v);
        if let Ok(new) = String::from_utf8(v) {
            assert_ne!(&new, s);
            assert_eq!(new, "❤️🧡💛💚💙💝");
        } else {
            panic!("Expected incremented UTF8 string to also be valid.")
        }
    }

    #[test]
    fn test_truncate_utf8() {
        // No-op
        let data = "❤️🧡💛💚💙💜";
        let r = truncate_utf8(data, data.as_bytes().len());
        assert_eq!(r.len(), data.as_bytes().len());
        assert_eq!(&r, data.as_bytes());

        // We slice it away from the UTF8 boundary
        let r = truncate_utf8(data, 13);
        assert_eq!(r.len(), 10);
        assert_eq!(&r, "❤️🧡".as_bytes())
    }

    /// Performs write-read roundtrip with randomly generated values and levels.
    /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
    /// for a column.
@@ -868,13 +868,13 @@ impl ColumnIndexBuilder {
    pub fn append(
        &mut self,
        null_page: bool,
-       min_value: &[u8],
-       max_value: &[u8],
+       min_value: Vec<u8>,
+       max_value: Vec<u8>,
        null_count: i64,
    ) {
        self.null_pages.push(null_page);
-       self.min_values.push(min_value.to_vec());
-       self.max_values.push(max_value.to_vec());
+       self.min_values.push(min_value);
+       self.max_values.push(max_value);
        self.null_counts.push(null_count);
    }

Review comment (on the new `min_value: Vec<u8>` parameter): This is an API change, which is fine.
Review comment: I think we should only increment if we are truncating. It would be a bit surprising for users to write `b"hello"` and get back `b"hellp"`, whereas writing `"really long string"` and getting back an obviously truncated string is perhaps more understandable: `"really long su"`.

Reply: Agreed, it should be handled correctly now.
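One way to realize that suggestion, sketched here with hypothetical helper names rather than the PR's final code: let the truncation step report whether anything was actually cut off, and only increment the max in that case, so short values round-trip unchanged while the reviewer's long example still comes back as an obvious truncation.

```rust
/// Truncate to at most `max_len` bytes and report whether bytes were dropped.
fn truncate_binary_checked(data: &[u8], max_len: usize) -> (Vec<u8>, bool) {
    let len = usize::min(data.len(), max_len);
    (data[..len].to_vec(), len < data.len())
}

/// Only nudge the value upwards when it really was truncated, so that the
/// shortened bytes remain an upper bound for the original value.
fn truncated_max(data: &[u8], max_len: usize) -> Vec<u8> {
    let (mut v, truncated) = truncate_binary_checked(data, max_len);
    if truncated {
        for byte in v.iter_mut().rev() {
            if *byte != u8::MAX {
                *byte += 1;
                break;
            }
        }
    }
    v
}

fn main() {
    // Short values pass through untouched: b"hello" stays b"hello".
    assert_eq!(truncated_max(b"hello", 16), b"hello".to_vec());
    // Long values are truncated and the last byte bumped to keep the bound.
    assert_eq!(
        truncated_max(b"really long string", 14),
        b"really long su".to_vec()
    );
}
```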