From 41b8f66a0c9ee3037106fff37c283c6cb228c75f Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 19 Jul 2024 13:45:56 -0700 Subject: [PATCH 1/2] deprecate read_page_locations --- parquet/src/arrow/arrow_reader/mod.rs | 4 +- parquet/src/arrow/arrow_reader/statistics.rs | 14 +++-- parquet/src/arrow/arrow_writer/mod.rs | 14 ++--- parquet/src/arrow/async_reader/metadata.rs | 8 +-- parquet/src/arrow/async_reader/mod.rs | 24 +++++---- parquet/src/bin/parquet-index.rs | 12 +++-- parquet/src/file/metadata/memory.rs | 7 +++ parquet/src/file/metadata/mod.rs | 54 ++++++++----------- parquet/src/file/page_index/index_reader.rs | 1 + parquet/src/file/serialized_reader.rs | 56 ++++++++------------ parquet/src/file/writer.rs | 18 +++---- parquet/tests/arrow_writer_layout.rs | 10 ++-- 12 files changed, 109 insertions(+), 113 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index e9edf7cb751d..22a4e5a90aa3 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -394,7 +394,7 @@ impl ArrowReaderMetadata { let offset_index = metadata .row_groups() .iter() - .map(|rg| index_reader::read_pages_locations(reader, rg.columns())) + .map(|rg| index_reader::read_offset_indexes(reader, rg.columns())) .collect::>>()?; metadata.set_offset_index(Some(offset_index)) @@ -689,7 +689,7 @@ impl Iterator for ReaderPageIterator { // To avoid `i[rg_idx][self.oolumn_idx`] panic, we need to filter out empty `i[rg_idx]`. let page_locations = offset_index .filter(|i| !i[rg_idx].is_empty()) - .map(|i| i[rg_idx][self.column_idx].clone()); + .map(|i| i[rg_idx][self.column_idx].page_locations.clone()); let total_rows = rg.num_rows() as usize; let reader = self.reader.clone(); diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index d536792b827b..6e2a6fafca25 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -1349,7 +1349,9 @@ impl<'a> StatisticsConverter<'a> { let iter = row_group_indices.into_iter().map(|rg_index| { let column_page_index_per_row_group_per_column = &column_page_index[*rg_index][parquet_index]; - let num_data_pages = &column_offset_index[*rg_index][parquet_index].len(); + let num_data_pages = &column_offset_index[*rg_index][parquet_index] + .page_locations() + .len(); (*num_data_pages, column_page_index_per_row_group_per_column) }); @@ -1378,7 +1380,9 @@ impl<'a> StatisticsConverter<'a> { let iter = row_group_indices.into_iter().map(|rg_index| { let column_page_index_per_row_group_per_column = &column_page_index[*rg_index][parquet_index]; - let num_data_pages = &column_offset_index[*rg_index][parquet_index].len(); + let num_data_pages = &column_offset_index[*rg_index][parquet_index] + .page_locations() + .len(); (*num_data_pages, column_page_index_per_row_group_per_column) }); @@ -1408,7 +1412,9 @@ impl<'a> StatisticsConverter<'a> { let iter = row_group_indices.into_iter().map(|rg_index| { let column_page_index_per_row_group_per_column = &column_page_index[*rg_index][parquet_index]; - let num_data_pages = &column_offset_index[*rg_index][parquet_index].len(); + let num_data_pages = &column_offset_index[*rg_index][parquet_index] + .page_locations() + .len(); (*num_data_pages, column_page_index_per_row_group_per_column) }); @@ -1450,7 +1456,7 @@ impl<'a> StatisticsConverter<'a> { let mut row_count_total = Vec::new(); for rg_idx in row_group_indices { - let page_locations = &column_offset_index[*rg_idx][parquet_index]; + let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations(); let row_count_per_page = page_locations .windows(2) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 070d740094f5..8f7b514ccf71 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1096,7 +1096,7 @@ mod tests { use crate::data_type::AsBytes; use crate::file::metadata::ParquetMetaData; use crate::file::page_index::index::Index; - use crate::file::page_index::index_reader::read_pages_locations; + use crate::file::page_index::index_reader::read_offset_indexes; use crate::file::properties::{ BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion, }; @@ -1669,16 +1669,16 @@ mod tests { "Expected a dictionary page" ); - let page_locations = read_pages_locations(&file, column).unwrap(); + let offset_indexes = read_offset_indexes(&file, column).unwrap(); - let offset_index = page_locations[0].clone(); + let page_locations = offset_indexes[0].page_locations.clone(); // We should fallback to PLAIN encoding after the first row and our max page size is 1 bytes // so we expect one dictionary encoded page and then a page per row thereafter. assert_eq!( - offset_index.len(), + page_locations.len(), 10, - "Expected 9 pages but got {offset_index:#?}" + "Expected 9 pages but got {page_locations:#?}" ); } @@ -3020,8 +3020,8 @@ mod tests { assert_eq!(index.len(), 1); assert_eq!(index[0].len(), 2); // 2 columns - assert_eq!(index[0][0].len(), 1); // 1 page - assert_eq!(index[0][1].len(), 1); // 1 page + assert_eq!(index[0][0].page_locations().len(), 1); // 1 page + assert_eq!(index[0][1].page_locations().len(), 1); // 1 page } #[test] diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index 4a3489a2084d..9224ea3f68a8 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -20,9 +20,7 @@ use crate::errors::{ParquetError, Result}; use crate::file::footer::{decode_footer, decode_metadata}; use crate::file::metadata::ParquetMetaData; use crate::file::page_index::index::Index; -use crate::file::page_index::index_reader::{ - acc_range, decode_column_index, decode_page_locations, -}; +use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index}; use bytes::Bytes; use futures::future::BoxFuture; use futures::FutureExt; @@ -179,9 +177,7 @@ impl MetadataLoader { x.columns() .iter() .map(|c| match c.offset_index_range() { - Some(r) => { - decode_page_locations(&data[r.start - offset..r.end - offset]) - } + Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]), None => Err(general_err!("missing offset index")), }) .collect::>>() diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 5a790fa6aff0..5695dbc10fe1 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -106,9 +106,10 @@ use crate::column::page::{PageIterator, PageReader}; use crate::errors::{ParquetError, Result}; use crate::file::footer::{decode_footer, decode_metadata}; use crate::file::metadata::{ParquetMetaData, RowGroupMetaData}; +use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; use crate::file::FOOTER_SIZE; -use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, PageLocation}; +use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash}; mod metadata; pub use metadata::*; @@ -489,7 +490,7 @@ where // TODO: calling build_array multiple times is wasteful let meta = self.metadata.row_group(row_group_idx); - let page_locations = self + let offset_index = self .metadata .offset_index() .map(|x| x[row_group_idx].as_slice()); @@ -499,7 +500,7 @@ where // schema: meta.schema_descr_ptr(), row_count: meta.num_rows() as usize, column_chunks: vec![None; meta.columns().len()], - page_locations, + offset_index, }; if let Some(filter) = self.filter.as_mut() { @@ -703,7 +704,7 @@ where /// An in-memory collection of column chunks struct InMemoryRowGroup<'a> { metadata: &'a RowGroupMetaData, - page_locations: Option<&'a [Vec]>, + offset_index: Option<&'a [OffsetIndexMetaData]>, column_chunks: Vec>>, row_count: usize, } @@ -716,7 +717,7 @@ impl<'a> InMemoryRowGroup<'a> { projection: &ProjectionMask, selection: Option<&RowSelection>, ) -> Result<()> { - if let Some((selection, page_locations)) = selection.zip(self.page_locations) { + if let Some((selection, offset_index)) = selection.zip(self.offset_index) { // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the // `RowSelection` let mut page_start_offsets: Vec> = vec![]; @@ -734,14 +735,14 @@ impl<'a> InMemoryRowGroup<'a> { // then we need to also fetch a dictionary page. let mut ranges = vec![]; let (start, _len) = chunk_meta.byte_range(); - match page_locations[idx].first() { + match offset_index[idx].page_locations.first() { Some(first) if first.offset as u64 != start => { ranges.push(start as usize..first.offset as usize); } _ => (), } - ranges.extend(selection.scan_ranges(&page_locations[idx])); + ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations)); page_start_offsets.push(ranges.iter().map(|range| range.start).collect()); ranges @@ -812,7 +813,9 @@ impl<'a> RowGroups for InMemoryRowGroup<'a> { "Invalid column index {i}, column was not fetched" ))), Some(data) => { - let page_locations = self.page_locations.map(|index| index[i].clone()); + let page_locations = self + .offset_index + .map(|index| index[i].page_locations.clone()); let page_reader: Box = Box::new(SerializedPageReader::new( data.clone(), self.metadata.column(i), @@ -1529,7 +1532,7 @@ mod tests { let metadata = parse_metadata(&data).unwrap(); let offset_index = - index_reader::read_pages_locations(&data, metadata.row_group(0).columns()) + index_reader::read_offset_indexes(&data, metadata.row_group(0).columns()) .expect("reading offset index"); let row_group_meta = metadata.row_group(0).clone(); @@ -1538,7 +1541,6 @@ mod tests { vec![row_group_meta], None, Some(vec![offset_index.clone()]), - None, ); let metadata = Arc::new(metadata); @@ -1575,7 +1577,7 @@ mod tests { }; let mut skip = true; - let mut pages = offset_index[0].iter().peekable(); + let mut pages = offset_index[0].page_locations.iter().peekable(); // Setup `RowSelection` so that we can skip every other page, selecting the last page let mut selectors = vec![]; diff --git a/parquet/src/bin/parquet-index.rs b/parquet/src/bin/parquet-index.rs index 86e08b6dafa3..1a9b74dd78fb 100644 --- a/parquet/src/bin/parquet-index.rs +++ b/parquet/src/bin/parquet-index.rs @@ -37,6 +37,7 @@ use clap::Parser; use parquet::errors::{ParquetError, Result}; use parquet::file::page_index::index::{Index, PageIndex}; +use parquet::file::page_index::offset_index::OffsetIndexMetaData; use parquet::file::reader::{FileReader, SerializedFileReader}; use parquet::file::serialized_reader::ReadOptionsBuilder; use parquet::format::PageLocation; @@ -93,7 +94,8 @@ impl Args { )) })?; - let row_counts = compute_row_counts(offset_index, row_group.num_rows()); + let row_counts = + compute_row_counts(offset_index.page_locations.as_slice(), row_group.num_rows()); match &column_indices[column_idx] { Index::NONE => println!("NO INDEX"), Index::BOOLEAN(v) => print_index(&v.indexes, offset_index, &row_counts)?, @@ -131,20 +133,20 @@ fn compute_row_counts(offset_index: &[PageLocation], rows: i64) -> Vec { /// Prints index information for a single column chunk fn print_index( column_index: &[PageIndex], - offset_index: &[PageLocation], + offset_index: &OffsetIndexMetaData, row_counts: &[i64], ) -> Result<()> { - if column_index.len() != offset_index.len() { + if column_index.len() != offset_index.page_locations.len() { return Err(ParquetError::General(format!( "Index length mismatch, got {} and {}", column_index.len(), - offset_index.len() + offset_index.page_locations.len() ))); } for (idx, ((c, o), row_count)) in column_index .iter() - .zip(offset_index) + .zip(offset_index.page_locations()) .zip(row_counts) .enumerate() { diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs index a1b40a8de95a..a7d3d4ab8f93 100644 --- a/parquet/src/file/metadata/memory.rs +++ b/parquet/src/file/metadata/memory.rs @@ -23,6 +23,7 @@ use crate::data_type::private::ParquetValueType; use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, KeyValue, RowGroupMetaData}; use crate::file::page_encoding_stats::PageEncodingStats; use crate::file::page_index::index::{Index, NativeIndex, PageIndex}; +use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::statistics::{Statistics, ValueStatistics}; use crate::format::{BoundaryOrder, PageLocation, SortingColumn}; use std::sync::Arc; @@ -144,6 +145,12 @@ impl HeapSize for Statistics { } } +impl HeapSize for OffsetIndexMetaData { + fn heap_size(&self) -> usize { + self.page_locations.heap_size() + self.unencoded_byte_array_data_bytes.heap_size() + } +} + impl HeapSize for Index { fn heap_size(&self) -> usize { match self { diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index d86b1ce572fb..9a3ca605857f 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -44,6 +44,7 @@ use crate::errors::{ParquetError, Result}; pub(crate) use crate::file::metadata::memory::HeapSize; use crate::file::page_encoding_stats::{self, PageEncodingStats}; use crate::file::page_index::index::Index; +use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::statistics::{self, Statistics}; use crate::schema::types::{ ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor, @@ -56,20 +57,19 @@ use crate::schema::types::{ /// [`Index`] corresponding to column `column_number` of row group /// `row_group_number`. /// -/// For example `column_index[2][3]` holds the [`Index`] for the forth +/// For example `column_index[2][3]` holds the [`Index`] for the fourth /// column in the third row group of the parquet file. pub type ParquetColumnIndex = Vec>; -/// [`PageLocation`] for each data page of each row group of each column. +/// [`OffsetIndexMetaData`] for each row group of each column. /// -/// `offset_index[row_group_number][column_number][page_number]` holds -/// the [`PageLocation`] corresponding to page `page_number` of column +/// `offset_index[row_group_number][column_number]` holds +/// the [`OffsetIndexMetaData`] corresponding to column /// `column_number`of row group `row_group_number`. /// -/// For example `offset_index[2][3][4]` holds the [`PageLocation`] for -/// the fifth page of the forth column in the third row group of the -/// parquet file. -pub type ParquetOffsetIndex = Vec>>; +/// For example `offset_index[2][3]` holds the [`OffsetIndexMetaData`] for +/// the fourth column in the third row group of the parquet file. +pub type ParquetOffsetIndex = Vec>; /// Parsed metadata for a single Parquet file /// @@ -94,10 +94,8 @@ pub struct ParquetMetaData { row_groups: Vec, /// Page level index for each page in each column chunk column_index: Option, - /// Offset index for all each page in each column chunk + /// Offset index for each page in each column chunk offset_index: Option, - /// `unencoded_byte_array_data_bytes` from the offset index - unencoded_byte_array_data_bytes: Option>>>>, } impl ParquetMetaData { @@ -109,7 +107,6 @@ impl ParquetMetaData { row_groups, column_index: None, offset_index: None, - unencoded_byte_array_data_bytes: None, } } @@ -120,14 +117,12 @@ impl ParquetMetaData { row_groups: Vec, column_index: Option, offset_index: Option, - unencoded_byte_array_data_bytes: Option>>>>, ) -> Self { ParquetMetaData { file_metadata, row_groups, column_index, offset_index, - unencoded_byte_array_data_bytes, } } @@ -184,19 +179,6 @@ impl ParquetMetaData { self.offset_index.as_ref() } - /// Returns `unencoded_byte_array_data_bytes` from the offset indexes in this file, if loaded - /// - /// This value represents the output size of the total bytes in this file, which can be useful for - /// allocating an appropriately sized output buffer. - /// - /// Returns `None` if the parquet file does not have a `OffsetIndex` or - /// [ArrowReaderOptions::with_page_index] was set to false. - /// - /// [ArrowReaderOptions::with_page_index]: https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index - pub fn unencoded_byte_array_data_bytes(&self) -> Option<&Vec>>>> { - self.unencoded_byte_array_data_bytes.as_ref() - } - /// Estimate of the bytes allocated to store `ParquetMetadata` /// /// # Notes: @@ -217,7 +199,6 @@ impl ParquetMetaData { + self.row_groups.heap_size() + self.column_index.heap_size() + self.offset_index.heap_size() - + self.unencoded_byte_array_data_bytes.heap_size() } /// Override the column index @@ -1364,7 +1345,7 @@ mod tests { column_orders, ); let parquet_meta = ParquetMetaData::new(file_metadata.clone(), row_group_meta.clone()); - let base_expected_size = 1376; + let base_expected_size = 1352; assert_eq!(parquet_meta.memory_size(), base_expected_size); let mut column_index = ColumnIndexBuilder::new(); @@ -1373,18 +1354,25 @@ mod tests { let native_index = NativeIndex::::try_new(column_index).unwrap(); // Now, add in OffsetIndex + let mut offset_index = OffsetIndexBuilder::new(); + offset_index.append_row_count(1); + offset_index.append_offset_and_size(2, 3); + offset_index.append_unencoded_byte_array_data_bytes(Some(10)); + offset_index.append_row_count(1); + offset_index.append_offset_and_size(2, 3); + offset_index.append_unencoded_byte_array_data_bytes(Some(10)); + let offset_index = offset_index.build_to_thrift(); + let parquet_meta = ParquetMetaData::new_with_page_index( file_metadata, row_group_meta, Some(vec![vec![Index::BOOLEAN(native_index)]]), Some(vec![vec![ - vec![PageLocation::new(1, 2, 3)], - vec![PageLocation::new(1, 2, 3)], + OffsetIndexMetaData::try_new(offset_index).unwrap() ]]), - Some(vec![vec![Some(vec![10, 20, 30])]]), ); - let bigger_expected_size = 2464; + let bigger_expected_size = 2400; // more set fields means more memory usage assert!(bigger_expected_size > base_expected_size); assert_eq!(parquet_meta.memory_size(), bigger_expected_size); diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs index 7959cb95c052..395e9afe122c 100644 --- a/parquet/src/file/page_index/index_reader.rs +++ b/parquet/src/file/page_index/index_reader.rs @@ -85,6 +85,7 @@ pub fn read_columns_indexes( /// See [Page Index Documentation] for more details. /// /// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md +#[deprecated(since = "53.0.0", note = "Use read_offset_indexes")] pub fn read_pages_locations( reader: &R, chunks: &[ColumnChunkMetaData], diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 65b6ebf2ec98..70aea6fd5ad3 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -28,6 +28,7 @@ use crate::column::page::{Page, PageMetadata, PageReader}; use crate::compression::{create_codec, Codec}; use crate::errors::{ParquetError, Result}; use crate::file::page_index::index_reader; +use crate::file::page_index::offset_index::OffsetIndexMetaData; use crate::file::{ footer, metadata::*, @@ -211,22 +212,12 @@ impl SerializedFileReader { if options.enable_page_index { let mut columns_indexes = vec![]; let mut offset_indexes = vec![]; - let mut unenc_byte_sizes = vec![]; for rg in &mut filtered_row_groups { let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?; let offset_index = index_reader::read_offset_indexes(&chunk_reader, rg.columns())?; - - // split offset_index into two vectors to not break API - let mut page_locations = vec![]; - let mut unenc_bytes = vec![]; - offset_index.into_iter().for_each(|index| { - page_locations.push(index.page_locations); - unenc_bytes.push(index.unencoded_byte_array_data_bytes); - }); columns_indexes.push(column_index); - offset_indexes.push(page_locations); - unenc_byte_sizes.push(unenc_bytes); + offset_indexes.push(offset_index); } Ok(Self { @@ -236,7 +227,6 @@ impl SerializedFileReader { filtered_row_groups, Some(columns_indexes), Some(offset_indexes), - Some(unenc_byte_sizes), )), props: Arc::new(options.props), }) @@ -296,7 +286,7 @@ impl FileReader for SerializedFileReader { pub struct SerializedRowGroupReader<'a, R: ChunkReader> { chunk_reader: Arc, metadata: &'a RowGroupMetaData, - page_locations: Option<&'a [Vec]>, + offset_index: Option<&'a [OffsetIndexMetaData]>, props: ReaderPropertiesPtr, bloom_filters: Vec>, } @@ -306,7 +296,7 @@ impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> { pub fn new( chunk_reader: Arc, metadata: &'a RowGroupMetaData, - page_locations: Option<&'a [Vec]>, + offset_index: Option<&'a [OffsetIndexMetaData]>, props: ReaderPropertiesPtr, ) -> Result { let bloom_filters = if props.read_bloom_filter() { @@ -321,7 +311,7 @@ impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> { Ok(Self { chunk_reader, metadata, - page_locations, + offset_index, props, bloom_filters, }) @@ -341,7 +331,7 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<' fn get_column_page_reader(&self, i: usize) -> Result> { let col = self.metadata.column(i); - let page_locations = self.page_locations.map(|x| x[i].clone()); + let page_locations = self.offset_index.map(|x| x[i].page_locations.clone()); let props = Arc::clone(&self.props); Ok(Box::new(SerializedPageReader::new_with_properties( @@ -787,7 +777,7 @@ mod tests { use crate::data_type::private::ParquetValueType; use crate::data_type::{AsBytes, FixedLenByteArrayType}; use crate::file::page_index::index::{Index, NativeIndex}; - use crate::file::page_index::index_reader::{read_columns_indexes, read_pages_locations}; + use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes}; use crate::file::writer::SerializedFileWriter; use crate::record::RowAccessor; use crate::schema::parser::parse_message_type; @@ -1325,7 +1315,7 @@ mod tests { // only one row group assert_eq!(offset_indexes.len(), 1); let offset_index = &offset_indexes[0]; - let page_offset = &offset_index[0][0]; + let page_offset = &offset_index[0].page_locations()[0]; assert_eq!(4, page_offset.offset); assert_eq!(152, page_offset.compressed_page_size); @@ -1348,8 +1338,8 @@ mod tests { b.reverse(); assert_eq!(a, b); - let a = read_pages_locations(&test_file, columns).unwrap(); - let mut b = read_pages_locations(&test_file, &reversed).unwrap(); + let a = read_offset_indexes(&test_file, columns).unwrap(); + let mut b = read_offset_indexes(&test_file, &reversed).unwrap(); b.reverse(); assert_eq!(a, b); } @@ -1386,7 +1376,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 0), BoundaryOrder::UNORDERED, ); - assert_eq!(row_group_offset_indexes[0].len(), 325); + assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325); } else { unreachable!() }; @@ -1394,7 +1384,7 @@ mod tests { assert!(&column_index[0][1].is_sorted()); if let Index::BOOLEAN(index) = &column_index[0][1] { assert_eq!(index.indexes.len(), 82); - assert_eq!(row_group_offset_indexes[1].len(), 82); + assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82); } else { unreachable!() }; @@ -1407,7 +1397,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 2), BoundaryOrder::ASCENDING, ); - assert_eq!(row_group_offset_indexes[2].len(), 325); + assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325); } else { unreachable!() }; @@ -1420,7 +1410,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 3), BoundaryOrder::ASCENDING, ); - assert_eq!(row_group_offset_indexes[3].len(), 325); + assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325); } else { unreachable!() }; @@ -1433,7 +1423,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 4), BoundaryOrder::ASCENDING, ); - assert_eq!(row_group_offset_indexes[4].len(), 325); + assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325); } else { unreachable!() }; @@ -1446,7 +1436,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 5), BoundaryOrder::UNORDERED, ); - assert_eq!(row_group_offset_indexes[5].len(), 528); + assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528); } else { unreachable!() }; @@ -1459,7 +1449,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 6), BoundaryOrder::ASCENDING, ); - assert_eq!(row_group_offset_indexes[6].len(), 325); + assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325); } else { unreachable!() }; @@ -1472,7 +1462,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 7), BoundaryOrder::UNORDERED, ); - assert_eq!(row_group_offset_indexes[7].len(), 528); + assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528); } else { unreachable!() }; @@ -1485,7 +1475,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 8), BoundaryOrder::UNORDERED, ); - assert_eq!(row_group_offset_indexes[8].len(), 974); + assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974); } else { unreachable!() }; @@ -1498,7 +1488,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 9), BoundaryOrder::ASCENDING, ); - assert_eq!(row_group_offset_indexes[9].len(), 352); + assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352); } else { unreachable!() }; @@ -1506,7 +1496,7 @@ mod tests { //Notice: min_max values for each page for this col not exits. assert!(!&column_index[0][10].is_sorted()); if let Index::NONE = &column_index[0][10] { - assert_eq!(row_group_offset_indexes[10].len(), 974); + assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974); } else { unreachable!() }; @@ -1519,7 +1509,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 11), BoundaryOrder::ASCENDING, ); - assert_eq!(row_group_offset_indexes[11].len(), 325); + assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325); } else { unreachable!() }; @@ -1532,7 +1522,7 @@ mod tests { get_row_group_min_max_bytes(row_group_metadata, 12), BoundaryOrder::UNORDERED, ); - assert_eq!(row_group_offset_indexes[12].len(), 325); + assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325); } else { unreachable!() }; diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index b109a2da8eb0..f7dde59f4f42 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -1920,15 +1920,15 @@ mod tests { column.unencoded_byte_array_data_bytes().unwrap() ); - assert!(reader - .metadata() - .unencoded_byte_array_data_bytes() - .is_some()); - let unenc_sizes = reader.metadata().unencoded_byte_array_data_bytes().unwrap(); - assert_eq!(unenc_sizes.len(), 1); - assert_eq!(unenc_sizes[0].len(), 1); - assert!(unenc_sizes[0][0].is_some()); - let page_sizes = unenc_sizes[0][0].as_ref().unwrap(); + assert!(reader.metadata().offset_index().is_some()); + let offset_index = reader.metadata().offset_index().unwrap(); + assert_eq!(offset_index.len(), 1); + assert_eq!(offset_index[0].len(), 1); + assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some()); + let page_sizes = offset_index[0][0] + .unencoded_byte_array_data_bytes + .as_ref() + .unwrap(); assert_eq!(page_sizes.len(), 1); assert_eq!(page_sizes[0], unenc_size); } diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs index cd124031cfdc..3e0f6ce3a8b3 100644 --- a/parquet/tests/arrow_writer_layout.rs +++ b/parquet/tests/arrow_writer_layout.rs @@ -89,12 +89,15 @@ fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) { for (column_index, column_layout) in offset_index.iter().zip(&row_group_layout.columns) { assert_eq!( - column_index.len(), + column_index.page_locations.len(), column_layout.pages.len(), "index page count mismatch" ); - for (idx, (page, page_layout)) in - column_index.iter().zip(&column_layout.pages).enumerate() + for (idx, (page, page_layout)) in column_index + .page_locations + .iter() + .zip(&column_layout.pages) + .enumerate() { assert_eq!( page.compressed_page_size as usize, @@ -102,6 +105,7 @@ fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) { "index page {idx} size mismatch" ); let next_first_row_index = column_index + .page_locations .get(idx + 1) .map(|x| x.first_row_index) .unwrap_or_else(|| row_group.num_rows()); From 1507c467681b180f701a95e085a3b83f453942a1 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Mon, 22 Jul 2024 16:16:49 -0700 Subject: [PATCH 2/2] add to_thrift() to OffsetIndexMetaData --- parquet/src/file/page_index/offset_index.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/parquet/src/file/page_index/offset_index.rs b/parquet/src/file/page_index/offset_index.rs index 9138620e3fcd..2ae3464141ca 100644 --- a/parquet/src/file/page_index/offset_index.rs +++ b/parquet/src/file/page_index/offset_index.rs @@ -47,4 +47,13 @@ impl OffsetIndexMetaData { pub fn unencoded_byte_array_data_bytes(&self) -> Option<&Vec> { self.unencoded_byte_array_data_bytes.as_ref() } + + // TODO: remove annotation after merge + #[allow(dead_code)] + pub(crate) fn to_thrift(&self) -> OffsetIndex { + OffsetIndex::new( + self.page_locations.clone(), + self.unencoded_byte_array_data_bytes.clone(), + ) + } }