From 6cd101c10fafbf9cd702b0a2bb0738e9c7db6089 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Tue, 14 Dec 2021 19:41:18 +0000 Subject: [PATCH] Extract method to drive PageIterator -> RecordReader (#1031) Co-authored-by: Andrew Lamb --- parquet/src/arrow/array_reader.rs | 88 +++++++++++++------------------ 1 file changed, 37 insertions(+), 51 deletions(-) diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index ae001ed73391..c3abda05a5bd 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -100,6 +100,36 @@ pub trait ArrayReader { fn get_rep_levels(&self) -> Option<&[i16]>; } +/// Uses `record_reader` to read up to `batch_size` records from `pages` +/// +/// Returns the number of records read, which can be less than batch_size if +/// pages is exhausted. +fn read_records( + record_reader: &mut RecordReader, + pages: &mut dyn PageIterator, + batch_size: usize, +) -> Result { + let mut records_read = 0usize; + while records_read < batch_size { + let records_to_read = batch_size - records_read; + + let records_read_once = record_reader.read_records(records_to_read)?; + records_read += records_read_once; + + // Record reader exhausted + if records_read_once < records_to_read { + if let Some(page_reader) = pages.next() { + // Read from new page reader (i.e. column chunk) + record_reader.set_page_reader(page_reader?)?; + } else { + // Page reader also exhausted + break; + } + } + } + Ok(records_read) +} + /// A NullArrayReader reads Parquet columns stored as null int32s with an Arrow /// NullArray type. pub struct NullArrayReader { @@ -114,14 +144,8 @@ pub struct NullArrayReader { impl NullArrayReader { /// Construct null array reader. - pub fn new( - mut pages: Box, - column_desc: ColumnDescPtr, - ) -> Result { - let mut record_reader = RecordReader::::new(column_desc.clone()); - if let Some(page_reader) = pages.next() { - record_reader.set_page_reader(page_reader?)?; - } + pub fn new(pages: Box, column_desc: ColumnDescPtr) -> Result { + let record_reader = RecordReader::::new(column_desc.clone()); Ok(Self { data_type: ArrowType::Null, @@ -148,25 +172,8 @@ impl ArrayReader for NullArrayReader { /// Reads at most `batch_size` records into array. fn next_batch(&mut self, batch_size: usize) -> Result { - let mut records_read = 0usize; - while records_read < batch_size { - let records_to_read = batch_size - records_read; - - // NB can be 0 if at end of page - let records_read_once = self.record_reader.read_records(records_to_read)?; - records_read += records_read_once; - - // Record reader exhausted - if records_read_once < records_to_read { - if let Some(page_reader) = self.pages.next() { - // Read from new page reader - self.record_reader.set_page_reader(page_reader?)?; - } else { - // Page reader also exhausted - break; - } - } - } + let records_read = + read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?; // convert to arrays let array = arrow::array::NullArray::new(records_read); @@ -206,7 +213,7 @@ pub struct PrimitiveArrayReader { impl PrimitiveArrayReader { /// Construct primitive array reader. pub fn new( - mut pages: Box, + pages: Box, column_desc: ColumnDescPtr, arrow_type: Option, ) -> Result { @@ -218,10 +225,7 @@ impl PrimitiveArrayReader { .clone(), }; - let mut record_reader = RecordReader::::new(column_desc.clone()); - if let Some(page_reader) = pages.next() { - record_reader.set_page_reader(page_reader?)?; - } + let record_reader = RecordReader::::new(column_desc.clone()); Ok(Self { data_type, @@ -248,25 +252,7 @@ impl ArrayReader for PrimitiveArrayReader { /// Reads at most `batch_size` records into array. fn next_batch(&mut self, batch_size: usize) -> Result { - let mut records_read = 0usize; - while records_read < batch_size { - let records_to_read = batch_size - records_read; - - // NB can be 0 if at end of page - let records_read_once = self.record_reader.read_records(records_to_read)?; - records_read += records_read_once; - - // Record reader exhausted - if records_read_once < records_to_read { - if let Some(page_reader) = self.pages.next() { - // Read from new page reader - self.record_reader.set_page_reader(page_reader?)?; - } else { - // Page reader also exhausted - break; - } - } - } + read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?; let target_type = self.get_data_type().clone(); let arrow_data_type = match T::get_physical_type() {