Cleanup ChunkReader (#4118) (#4156)
* Remove length from ChunkReader (#4118)

* Remove ChunkReader::T Send bound

* Remove FileSource

* Tweak docs
tustvold authored May 2, 2023
1 parent 08dc16c commit eb5ac69
Showing 7 changed files with 88 additions and 334 deletions.
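The net effect on the public API: `ChunkReader::get_read` loses its `length` argument and now reads from `start` to the end of the source, and the associated `Self::T` no longer needs to be `Send`. A minimal sketch of implementing the new trait shape for an in-memory buffer (not part of this commit; `InMemory` is a hypothetical type that essentially restates the `Bytes` impl added below):

```rust
use bytes::{Buf, Bytes};
use parquet::errors::Result;
use parquet::file::reader::{ChunkReader, Length};

struct InMemory(Bytes);

impl Length for InMemory {
    fn len(&self) -> u64 {
        self.0.len() as u64
    }
}

impl ChunkReader for InMemory {
    // `T` no longer carries a `Send` bound
    type T = bytes::buf::Reader<Bytes>;

    // New signature: no `length`; read from `start` to the end of the source
    fn get_read(&self, start: u64) -> Result<Self::T> {
        Ok(self.0.slice(start as usize..).reader())
    }

    // Length-bounded access now lives solely on `get_bytes`
    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
        let start = start as usize;
        Ok(self.0.slice(start..start + length))
    }
}
```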
39 changes: 22 additions & 17 deletions parquet/src/arrow/async_reader/mod.rs
@@ -744,6 +744,25 @@ enum ColumnChunkData {
     Dense { offset: usize, data: Bytes },
 }
 
+impl ColumnChunkData {
+    fn get(&self, start: u64) -> Result<Bytes> {
+        match &self {
+            ColumnChunkData::Sparse { data, .. } => data
+                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
+                .map(|idx| data[idx].1.clone())
+                .map_err(|_| {
+                    ParquetError::General(format!(
+                        "Invalid offset in sparse column chunk data: {start}"
+                    ))
+                }),
+            ColumnChunkData::Dense { offset, data } => {
+                let start = start as usize - *offset;
+                Ok(data.slice(start..))
+            }
+        }
+    }
+}
+
 impl Length for ColumnChunkData {
     fn len(&self) -> u64 {
         match &self {
@@ -756,26 +775,12 @@ impl Length for ColumnChunkData {
 impl ChunkReader for ColumnChunkData {
     type T = bytes::buf::Reader<Bytes>;
 
-    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
-        Ok(self.get_bytes(start, length)?.reader())
+    fn get_read(&self, start: u64) -> Result<Self::T> {
+        Ok(self.get(start)?.reader())
     }
 
     fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
-        match &self {
-            ColumnChunkData::Sparse { data, .. } => data
-                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
-                .map(|idx| data[idx].1.slice(0..length))
-                .map_err(|_| {
-                    ParquetError::General(format!(
-                        "Invalid offset in sparse column chunk data: {start}"
-                    ))
-                }),
-            ColumnChunkData::Dense { offset, data } => {
-                let start = start as usize - *offset;
-                let end = start + length;
-                Ok(data.slice(start..end))
-            }
-        }
+        Ok(self.get(start)?.slice(..length))
     }
 }
 
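One design note on the new `ColumnChunkData::get`: returning the whole chunk as `Bytes` is cheap because `Bytes` is a reference-counted view, so `clone` and `slice` copy no data. A standalone illustration (not from the diff):

```rust
use bytes::Bytes;

fn main() {
    let data = Bytes::from_static(b"page header and page data");
    let whole = data.clone(); // no copy: bumps a reference count
    let header = whole.slice(..11); // no copy: a view into the same allocation
    assert_eq!(&header[..], b"page header");
}
```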
3 changes: 1 addition & 2 deletions parquet/src/bin/parquet-layout.rs
@@ -175,8 +175,7 @@ fn read_page_header<C: ChunkReader>(
         }
     }
 
-    let len = reader.len().checked_sub(offset).unwrap() as usize;
-    let input = reader.get_read(offset, len)?;
+    let input = reader.get_read(offset)?;
     let mut tracked = TrackedRead(input, 0);
     let mut prot = TCompactInputProtocol::new(&mut tracked);
     let header = PageHeader::read_from_in_protocol(&mut prot)?;
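The explicit length is no longer needed here because the Thrift decoder consumes exactly one page header and stops; `TrackedRead` just counts the bytes pulled through it so the tool can report the header's size. A sketch of that counting wrapper, reconstructed from context (the real definition lives elsewhere in parquet-layout.rs and may differ in detail):

```rust
use std::io::Read;

// Wraps a reader and tallies how many bytes have been read through it
struct TrackedRead<R>(R, usize);

impl<R: Read> Read for TrackedRead<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let n = self.0.read(buf)?;
        self.1 += n;
        Ok(n)
    }
}
```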
2 changes: 1 addition & 1 deletion parquet/src/file/footer.rs
@@ -46,7 +46,7 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaData>
 
     let mut footer = [0_u8; 8];
     chunk_reader
-        .get_read(file_size - 8, 8)?
+        .get_read(file_size - 8)?
         .read_exact(&mut footer)?;
 
     let metadata_len = decode_footer(&footer)?;
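For context (not part of the diff): the 8 bytes read here are the Parquet footer, a 4-byte little-endian metadata length followed by the `PAR1` magic, which is what `decode_footer` validates. A hedged sketch of that logic, assuming the standard footer layout:

```rust
// Sketch of footer decoding; the real decode_footer lives in footer.rs
fn decode_footer_sketch(footer: &[u8; 8]) -> Result<usize, String> {
    if &footer[4..] != b"PAR1" {
        return Err("Invalid Parquet file. Corrupt footer".to_owned());
    }
    let metadata_len = u32::from_le_bytes(footer[..4].try_into().unwrap());
    Ok(metadata_len as usize)
}
```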
68 changes: 59 additions & 9 deletions parquet/src/file/reader.rs
@@ -19,7 +19,9 @@
 //! readers to read individual column chunks, or access record
 //! iterator.
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
+use std::fs::File;
+use std::io::{BufReader, Seek, SeekFrom};
 use std::{boxed::Box, io::Read, sync::Arc};
 
 use crate::bloom_filter::Sbbf;
@@ -44,19 +46,47 @@ pub trait Length {
 }
 
 /// The ChunkReader trait generates readers of chunks of a source.
-/// For a file system reader, each chunk might contain a clone of File bounded on a given range.
-/// For an object store reader, each read can be mapped to a range request.
+///
+/// For more information see [`File::try_clone`]
 pub trait ChunkReader: Length + Send + Sync {
-    type T: Read + Send;
-    /// Get a serially readable slice of the current reader
-    /// This should fail if the slice exceeds the current bounds
-    fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
+    type T: Read;
+
+    /// Get a [`Read`] starting at the provided file offset
+    ///
+    /// Subsequent or concurrent calls to [`Self::get_read`] or [`Self::get_bytes`] may
+    /// side-effect on previously returned [`Self::T`]. Care should be taken to avoid this
+    ///
+    /// See [`File::try_clone`] for more information
+    fn get_read(&self, start: u64) -> Result<Self::T>;
 
     /// Get a range as bytes
     /// This should fail if the exact number of bytes cannot be read
+    ///
+    /// Concurrent calls to [`Self::get_bytes`] may result in interleaved output
+    ///
+    /// See [`File::try_clone`] for more information
+    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes>;
+}
+
+impl Length for File {
+    fn len(&self) -> u64 {
+        self.metadata().map(|m| m.len()).unwrap_or(0u64)
+    }
+}
+
+impl ChunkReader for File {
+    type T = BufReader<File>;
+
+    fn get_read(&self, start: u64) -> Result<Self::T> {
+        let mut reader = self.try_clone()?;
+        reader.seek(SeekFrom::Start(start))?;
+        Ok(BufReader::new(self.try_clone()?))
+    }
+
     fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
         let mut buffer = Vec::with_capacity(length);
-        let read = self.get_read(start, length)?.read_to_end(&mut buffer)?;
+        let mut reader = self.try_clone()?;
+        reader.seek(SeekFrom::Start(start))?;
+        let read = reader.take(length as _).read_to_end(&mut buffer)?;
 
         if read != length {
             return Err(eof_err!(
@@ -69,6 +99,26 @@ pub trait ChunkReader: Length + Send + Sync {
     }
 }
 
+impl Length for Bytes {
+    fn len(&self) -> u64 {
+        self.len() as u64
+    }
+}
+
+impl ChunkReader for Bytes {
+    type T = bytes::buf::Reader<Bytes>;
+
+    fn get_read(&self, start: u64) -> Result<Self::T> {
+        let start = start as usize;
+        Ok(self.slice(start..).reader())
+    }
+
+    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
+        let start = start as usize;
+        Ok(self.slice(start..start + length))
+    }
+}
+
 // ----------------------------------------------------------------------
 // APIs for file & row group readers
 
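The concurrency caveat in the new `get_read` docs is concrete for `File`: handles produced by `File::try_clone` share a single cursor, so a seek through one handle moves the others. A standalone sketch of the effect (`data.parquet` is a hypothetical file):

```rust
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};

fn main() -> std::io::Result<()> {
    let mut a = File::open("data.parquet")?;
    let mut b = a.try_clone()?;

    a.seek(SeekFrom::Start(100))?;

    // `b` reads from offset 100 too: both handles share one cursor,
    // which is why interleaved get_read/get_bytes calls can interfere
    let mut byte = [0u8; 1];
    b.read_exact(&mut byte)?;
    assert_eq!(a.stream_position()?, 101);
    Ok(())
}
```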
63 changes: 5 additions & 58 deletions parquet/src/file/serialized_reader.rs
@@ -40,60 +40,8 @@ use crate::format::{PageHeader, PageLocation, PageType};
 use crate::record::reader::RowIter;
 use crate::record::Row;
 use crate::schema::types::Type as SchemaType;
-use crate::util::{io::TryClone, memory::ByteBufferPtr};
-use bytes::{Buf, Bytes};
+use crate::util::memory::ByteBufferPtr;
 use thrift::protocol::{TCompactInputProtocol, TSerializable};
-// export `SliceableCursor` and `FileSource` publicly so clients can
-// re-use the logic in their own ParquetFileWriter wrappers
-pub use crate::util::io::FileSource;
-
-// ----------------------------------------------------------------------
-// Implementations of traits facilitating the creation of a new reader
-
-impl Length for File {
-    fn len(&self) -> u64 {
-        self.metadata().map(|m| m.len()).unwrap_or(0u64)
-    }
-}
-
-impl TryClone for File {
-    fn try_clone(&self) -> std::io::Result<Self> {
-        self.try_clone()
-    }
-}
-
-impl ChunkReader for File {
-    type T = FileSource<File>;
-
-    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
-        Ok(FileSource::new(self, start, length))
-    }
-}
-
-impl Length for Bytes {
-    fn len(&self) -> u64 {
-        self.len() as u64
-    }
-}
-
-impl TryClone for Bytes {
-    fn try_clone(&self) -> std::io::Result<Self> {
-        Ok(self.clone())
-    }
-}
-
-impl ChunkReader for Bytes {
-    type T = bytes::buf::Reader<Bytes>;
-
-    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
-        Ok(self.get_bytes(start, length)?.reader())
-    }
-
-    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
-        let start = start as usize;
-        Ok(self.slice(start..start + length))
-    }
-}
-
 impl TryFrom<File> for SerializedFileReader<File> {
     type Error = ParquetError;
@@ -662,7 +610,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                     return Ok(None);
                 }
 
-                let mut read = self.reader.get_read(*offset as u64, *remaining)?;
+                let mut read = self.reader.get_read(*offset as u64)?;
                 let header = if let Some(header) = next_page_header.take() {
                     *header
                 } else {
@@ -752,8 +700,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                         continue;
                     }
                 } else {
-                    let mut read =
-                        self.reader.get_read(*offset as u64, *remaining_bytes)?;
+                    let mut read = self.reader.get_read(*offset as u64)?;
                     let (header_len, header) = read_page_header_len(&mut read)?;
                     *offset += header_len;
                     *remaining_bytes -= header_len;
@@ -807,8 +754,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                     *offset += buffered_header.compressed_page_size as usize;
                     *remaining_bytes -= buffered_header.compressed_page_size as usize;
                 } else {
-                    let mut read =
-                        self.reader.get_read(*offset as u64, *remaining_bytes)?;
+                    let mut read = self.reader.get_read(*offset as u64)?;
                     let (header_len, header) = read_page_header_len(&mut read)?;
                     let data_page_size = header.compressed_page_size as usize;
                     *offset += header_len + data_page_size;
@@ -827,6 +773,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
 
 #[cfg(test)]
 mod tests {
+    use bytes::Bytes;
     use std::sync::Arc;
 
     use crate::format::BoundaryOrder;
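With `ChunkReader` now implemented for `Bytes` in reader.rs, any in-memory buffer can back a reader directly, which is what the updated tests exercise. A usage sketch (`num_rows` is a hypothetical helper, assuming the post-commit API):

```rust
use bytes::Bytes;
use parquet::errors::Result;
use parquet::file::reader::{FileReader, SerializedFileReader};

// Open an in-memory Parquet file and report its row count
fn num_rows(buffer: Bytes) -> Result<i64> {
    let reader = SerializedFileReader::new(buffer)?;
    Ok(reader.metadata().file_metadata().num_rows())
}
```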