Skip to content

Commit

Permalink
refactor(rust): reuse message parsing in IPC (pola-rs#15265)
Browse files Browse the repository at this point in the history
  • Loading branch information
c-peters authored Mar 24, 2024
1 parent 2484dd2 commit 252702a
Showing 1 changed file with 6 additions and 34 deletions.
40 changes: 6 additions & 34 deletions crates/polars-arrow/src/io/ipc/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,32 +36,6 @@ pub struct FileMetadata {
pub size: u64,
}

fn read_dictionary_message<R: Read + Seek>(
reader: &mut R,
offset: u64,
data: &mut Vec<u8>,
) -> PolarsResult<()> {
let mut message_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::Start(offset))?;
reader.read_exact(&mut message_size)?;
if message_size == CONTINUATION_MARKER {
reader.read_exact(&mut message_size)?;
};
let message_length = i32::from_le_bytes(message_size);

let message_length: usize = message_length
.try_into()
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;

data.clear();
data.try_reserve(message_length)?;
reader
.by_ref()
.take(message_length as u64)
.read_to_end(data)?;

Ok(())
}
/// Read the row count by summing the length of the of the record batches
pub fn get_row_count<R: Read + Seek>(reader: &mut R) -> PolarsResult<i64> {
let mut message_scratch: Vec<u8> = Default::default();
Expand All @@ -72,7 +46,7 @@ pub fn get_row_count<R: Read + Seek>(reader: &mut R) -> PolarsResult<i64> {
blocks
.into_iter()
.map(|block| {
let message = get_message_from_block(reader, block, &mut message_scratch)?;
let message = get_message_from_block(reader, &block, &mut message_scratch)?;
let record_batch = get_record_batch(message)?;
record_batch.length().map_err(|e| e.into())
})
Expand Down Expand Up @@ -100,20 +74,18 @@ fn read_dictionary_block<R: Read + Seek>(
message_scratch: &mut Vec<u8>,
dictionary_scratch: &mut Vec<u8>,
) -> PolarsResult<()> {
let message = get_message_from_block(reader, block, message_scratch)?;
let batch = get_dictionary_batch(&message)?;

let offset: u64 = block
.offset
.try_into()
.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;

let length: u64 = block
.meta_data_length
.try_into()
.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
read_dictionary_message(reader, offset, message_scratch)?;

let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref())
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))?;

let batch = get_dictionary_batch(&message)?;

read_dictionary(
batch,
Expand Down Expand Up @@ -299,7 +271,7 @@ fn get_message_from_block_offset<'a, R: Read + Seek>(

fn get_message_from_block<'a, R: Read + Seek>(
reader: &mut R,
block: arrow_format::ipc::Block,
block: &arrow_format::ipc::Block,
message_scratch: &'a mut Vec<u8>,
) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {
let offset: u64 = block
Expand Down

0 comments on commit 252702a

Please sign in to comment.