From d8fb46ef53ceb6a98b100208e889b85975d7db67 Mon Sep 17 00:00:00 2001 From: Yibo Yan Date: Sat, 4 Jan 2025 08:25:41 +0000 Subject: [PATCH 1/3] add: APIs that skip array data validation --- arrow-ipc/src/reader.rs | 211 ++++++++++++++++++++++++++++++--- arrow-ipc/src/reader/stream.rs | 17 +++ 2 files changed, 211 insertions(+), 17 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 9ff4da30ed8c..99abff34f47e 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -79,6 +79,7 @@ fn create_array( field: &Field, variadic_counts: &mut VecDeque, require_alignment: bool, + skip_validations: bool, ) -> Result { let data_type = field.data_type(); match data_type { @@ -91,6 +92,7 @@ fn create_array( reader.next_buffer()?, ], require_alignment, + skip_validations, ), BinaryView | Utf8View => { let count = variadic_counts @@ -107,6 +109,7 @@ fn create_array( data_type, &buffers, require_alignment, + skip_validations, ) } FixedSizeBinary(_) => create_primitive_array( @@ -114,29 +117,44 @@ fn create_array( data_type, &[reader.next_buffer()?, reader.next_buffer()?], require_alignment, + skip_validations, ), List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => { let list_node = reader.next_node(field)?; let list_buffers = [reader.next_buffer()?, reader.next_buffer()?]; - let values = create_array(reader, list_field, variadic_counts, require_alignment)?; + let values = create_array( + reader, + list_field, + variadic_counts, + require_alignment, + skip_validations, + )?; create_list_array( list_node, data_type, &list_buffers, values, require_alignment, + skip_validations, ) } FixedSizeList(ref list_field, _) => { let list_node = reader.next_node(field)?; let list_buffers = [reader.next_buffer()?]; - let values = create_array(reader, list_field, variadic_counts, require_alignment)?; + let values = create_array( + reader, + list_field, + variadic_counts, + require_alignment, + skip_validations, + )?; create_list_array( list_node, data_type, &list_buffers, values, require_alignment, + skip_validations, ) } Struct(struct_fields) => { @@ -148,7 +166,13 @@ fn create_array( // TODO investigate whether just knowing the number of buffers could // still work for struct_field in struct_fields { - let child = create_array(reader, struct_field, variadic_counts, require_alignment)?; + let child = create_array( + reader, + struct_field, + variadic_counts, + require_alignment, + skip_validations, + )?; struct_arrays.push(child); } let null_count = struct_node.null_count() as usize; @@ -172,9 +196,20 @@ fn create_array( } RunEndEncoded(run_ends_field, values_field) => { let run_node = reader.next_node(field)?; - let run_ends = - create_array(reader, run_ends_field, variadic_counts, require_alignment)?; - let values = create_array(reader, values_field, variadic_counts, require_alignment)?; + let run_ends = create_array( + reader, + run_ends_field, + variadic_counts, + require_alignment, + skip_validations, + )?; + let values = create_array( + reader, + values_field, + variadic_counts, + require_alignment, + skip_validations, + )?; let run_array_length = run_node.length() as usize; let builder = ArrayData::builder(data_type.clone()) @@ -183,7 +218,9 @@ fn create_array( .add_child_data(run_ends.into_data()) .add_child_data(values.into_data()); - let array_data = if require_alignment { + let array_data = if skip_validations { + unsafe { builder.build_unchecked() } + } else if require_alignment { builder.build()? } else { builder.build_aligned()? @@ -213,6 +250,7 @@ fn create_array( &index_buffers, value_array.clone(), require_alignment, + skip_validations, ) } Union(fields, mode) => { @@ -239,7 +277,13 @@ fn create_array( let mut children = Vec::with_capacity(fields.len()); for (_id, field) in fields.iter() { - let child = create_array(reader, field, variadic_counts, require_alignment)?; + let child = create_array( + reader, + field, + variadic_counts, + require_alignment, + skip_validations, + )?; children.push(child); } @@ -261,7 +305,9 @@ fn create_array( .len(length as usize) .offset(0); - let array_data = if require_alignment { + let array_data = if skip_validations { + unsafe { builder.build_unchecked() } + } else if require_alignment { builder.build()? } else { builder.build_aligned()? @@ -275,17 +321,36 @@ fn create_array( data_type, &[reader.next_buffer()?, reader.next_buffer()?], require_alignment, + skip_validations, ), } } /// Reads the correct number of buffers based on data type and null_count, and creates a /// primitive array ref +/// +/// # Arguments +/// +/// * `field_node` - A reference to the `FieldNode` which contains the length and null count of the array. +/// * `data_type` - The `DataType` of the array to be created. +/// * `buffers` - A slice of `Buffer` which contains the data for the array. +/// * `require_alignment` - A boolean indicating whether the buffers need to be aligned. +/// * `skip_validations` - A boolean indicating whether to skip validations. +/// +/// # Safety +/// +/// `skip_validations` allows the creation of an `ArrayData` without performing the +/// usual validations. This can lead to undefined behavior if the data is not +/// correctly formatted. Set `skip_validations` to true only if you are certain +/// +/// # Notes +/// If `skip_validations` is true, `require_alignment` is ignored. fn create_primitive_array( field_node: &FieldNode, data_type: &DataType, buffers: &[Buffer], require_alignment: bool, + skip_validations: bool, ) -> Result { let length = field_node.length() as usize; let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone()); @@ -311,7 +376,9 @@ fn create_primitive_array( t => unreachable!("Data type {:?} either unsupported or not primitive", t), }; - let array_data = if require_alignment { + let array_data = if skip_validations { + unsafe { builder.build_unchecked() } + } else if require_alignment { builder.build()? } else { builder.build_aligned()? @@ -322,12 +389,21 @@ fn create_primitive_array( /// Reads the correct number of buffers based on list type and null_count, and creates a /// list array ref +/// +/// Safety: +/// `skip_validations` allows the creation of an `ArrayData` without performing the +/// usual validations. This can lead to undefined behavior if the data is not +/// correctly formatted. Set `skip_validations` to true only if you are certain. +/// +/// Notes: +/// * If `skip_validations` is true, `require_alignment` is ignored. fn create_list_array( field_node: &FieldNode, data_type: &DataType, buffers: &[Buffer], child_array: ArrayRef, require_alignment: bool, + skip_validations: bool, ) -> Result { let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone()); let length = field_node.length() as usize; @@ -347,7 +423,9 @@ fn create_list_array( _ => unreachable!("Cannot create list or map array from {:?}", data_type), }; - let array_data = if require_alignment { + let array_data = if skip_validations { + unsafe { builder.build_unchecked() } + } else if require_alignment { builder.build()? } else { builder.build_aligned()? @@ -358,12 +436,21 @@ fn create_list_array( /// Reads the correct number of buffers based on list type and null_count, and creates a /// list array ref +/// +/// Safety: +/// `skip_validations` allows the creation of an `ArrayData` without performing the +/// usual validations. This can lead to undefined behavior if the data is not +/// correctly formatted. Set `skip_validations` to true only if you are certain. +/// +/// Notes: +/// * If `skip_validations` is true, `require_alignment` is ignored. fn create_dictionary_array( field_node: &FieldNode, data_type: &DataType, buffers: &[Buffer], value_array: ArrayRef, require_alignment: bool, + skip_validations: bool, ) -> Result { if let Dictionary(_, _) = *data_type { let null_buffer = (field_node.null_count() > 0).then_some(buffers[0].clone()); @@ -373,7 +460,9 @@ fn create_dictionary_array( .add_child_data(value_array.into_data()) .null_bit_buffer(null_buffer); - let array_data = if require_alignment { + let array_data = if skip_validations { + unsafe { builder.build_unchecked() } + } else if require_alignment { builder.build()? } else { builder.build_aligned()? @@ -521,6 +610,7 @@ pub fn read_record_batch( projection, metadata, false, + false, ) } @@ -533,7 +623,15 @@ pub fn read_dictionary( dictionaries_by_id: &mut HashMap, metadata: &MetadataVersion, ) -> Result<(), ArrowError> { - read_dictionary_impl(buf, batch, schema, dictionaries_by_id, metadata, false) + read_dictionary_impl( + buf, + batch, + schema, + dictionaries_by_id, + metadata, + false, + false, + ) } fn read_record_batch_impl( @@ -544,6 +642,7 @@ fn read_record_batch_impl( projection: Option<&[usize]>, metadata: &MetadataVersion, require_alignment: bool, + skip_validations: bool, ) -> Result { let buffers = batch.buffers().ok_or_else(|| { ArrowError::IpcError("Unable to get buffers from IPC RecordBatch".to_string()) @@ -577,8 +676,13 @@ fn read_record_batch_impl( for (idx, field) in schema.fields().iter().enumerate() { // Create array for projected field if let Some(proj_idx) = projection.iter().position(|p| p == &idx) { - let child = - create_array(&mut reader, field, &mut variadic_counts, require_alignment)?; + let child = create_array( + &mut reader, + field, + &mut variadic_counts, + require_alignment, + skip_validations, + )?; arrays.push((proj_idx, child)); } else { reader.skip_field(field, &mut variadic_counts)?; @@ -595,7 +699,13 @@ fn read_record_batch_impl( let mut children = vec![]; // keep track of index as lists require more than one node for field in schema.fields() { - let child = create_array(&mut reader, field, &mut variadic_counts, require_alignment)?; + let child = create_array( + &mut reader, + field, + &mut variadic_counts, + require_alignment, + skip_validations, + )?; children.push(child); } assert!(variadic_counts.is_empty()); @@ -610,6 +720,7 @@ fn read_dictionary_impl( dictionaries_by_id: &mut HashMap, metadata: &MetadataVersion, require_alignment: bool, + skip_validations: bool, ) -> Result<(), ArrowError> { if batch.isDelta() { return Err(ArrowError::InvalidArgumentError( @@ -641,6 +752,7 @@ fn read_dictionary_impl( None, metadata, require_alignment, + skip_validations, )?; Some(record_batch.column(0).clone()) } @@ -766,6 +878,7 @@ pub struct FileDecoder { version: MetadataVersion, projection: Option>, require_alignment: bool, + skip_validations: bool, } impl FileDecoder { @@ -777,6 +890,7 @@ impl FileDecoder { dictionaries: Default::default(), projection: None, require_alignment: false, + skip_validations: false, } } @@ -803,6 +917,19 @@ impl FileDecoder { self } + /// Specifies whether or not to skip validations when creating [`ArrayData`]. + /// This can lead to undefined behavior if the data is not correctly formatted. + /// Set `skip_validations` to true only if you are certain. + /// + /// Notes: + /// * If `skip_validations` is true, `require_alignment` is ignored. + /// * If `skip_validations` is true, it uses [`arrow_data::ArrayDataBuilder::build_unchecked`] to + /// construct [`arrow_data::ArrayData`] under the hood. + pub fn with_skip_validations(mut self, skip_validations: bool) -> Self { + self.skip_validations = skip_validations; + self + } + fn read_message<'a>(&self, buf: &'a [u8]) -> Result, ArrowError> { let message = parse_message(buf)?; @@ -828,6 +955,7 @@ impl FileDecoder { &mut self.dictionaries, &message.version(), self.require_alignment, + self.skip_validations, ) } t => Err(ArrowError::ParseError(format!( @@ -860,6 +988,7 @@ impl FileDecoder { self.projection.as_deref(), &message.version(), self.require_alignment, + self.skip_validations, ) .map(Some) } @@ -880,6 +1009,8 @@ pub struct FileReaderBuilder { max_footer_fb_tables: usize, /// Passed through to construct [`VerifierOptions`] max_footer_fb_depth: usize, + /// Skip validations when creating [`ArrayData`] + skip_validations: bool, } impl Default for FileReaderBuilder { @@ -889,6 +1020,7 @@ impl Default for FileReaderBuilder { max_footer_fb_tables: verifier_options.max_tables, max_footer_fb_depth: verifier_options.max_depth, projection: None, + skip_validations: false, } } } @@ -907,6 +1039,14 @@ impl FileReaderBuilder { self } + /// Skip validations when creating underlying [`ArrayData`]. + /// This can lead to undefined behavior if the data is not correctly formatted. + /// Set `skip_validations` to true only if you are certain. + pub fn with_skip_validations(mut self, skip_validations: bool) -> Self { + self.skip_validations = skip_validations; + self + } + /// Flatbuffers option for parsing the footer. Controls the max number of fields and /// metadata key-value pairs that can be parsed from the schema of the footer. /// @@ -989,7 +1129,8 @@ impl FileReaderBuilder { } } - let mut decoder = FileDecoder::new(Arc::new(schema), footer.version()); + let mut decoder = FileDecoder::new(Arc::new(schema), footer.version()) + .with_skip_validations(self.skip_validations); if let Some(projection) = self.projection { decoder = decoder.with_projection(projection) } @@ -1075,6 +1216,23 @@ impl FileReader { builder.build(reader) } + /// Try to create a new file reader without validations. + /// + /// This is useful when the file is known to be valid and the user wants to skip validations. + /// This might be useful when the content is trusted and the user wants to avoid the overhead of + /// validating the content. + pub fn try_new_unvalidated( + reader: R, + projection: Option>, + ) -> Result { + let builder = FileReaderBuilder { + projection, + skip_validations: true, + ..Default::default() + }; + builder.build(reader) + } + /// Return user defined customized metadata pub fn custom_metadata(&self) -> &HashMap { &self.custom_metadata @@ -1168,6 +1326,10 @@ pub struct StreamReader { /// Optional projection projection: Option<(Vec, Schema)>, + + /// Specifies whether or not skip validations when creating underlying [`ArrayData`]. + /// This can lead to undefined behavior if the data is not correctly formatted. + skip_validations: bool, } impl fmt::Debug for StreamReader { @@ -1247,6 +1409,7 @@ impl StreamReader { finished: false, dictionaries_by_id, projection, + skip_validations: false, }) } @@ -1269,6 +1432,16 @@ impl StreamReader { self.finished } + /// Specifies whether or not skip validations when creating underlying [`ArrayData`]. + /// This can lead to undefined behavior if the data is not correctly formatted. + /// + /// Notes: + /// * If `skip_validations` is true, `require_alignment` is ignored. + pub fn with_skip_validations(mut self, skip_validations: bool) -> Self { + self.skip_validations = skip_validations; + self + } + fn maybe_next(&mut self) -> Result, ArrowError> { if self.finished { return Ok(None); @@ -1334,6 +1507,7 @@ impl StreamReader { self.projection.as_ref().map(|x| x.0.as_ref()), &message.version(), false, + self.skip_validations, ) .map(Some) } @@ -1354,6 +1528,7 @@ impl StreamReader { &mut self.dictionaries_by_id, &message.version(), false, + self.skip_validations, )?; // read the next message until we encounter a RecordBatch @@ -2184,6 +2359,7 @@ mod tests { None, &message.version(), false, + false, ) .unwrap(); assert_eq!(batch, roundtrip); @@ -2222,6 +2398,7 @@ mod tests { None, &message.version(), true, + false, ); let error = result.unwrap_err(); diff --git a/arrow-ipc/src/reader/stream.rs b/arrow-ipc/src/reader/stream.rs index 9b0eea9b6198..127d346cf8c5 100644 --- a/arrow-ipc/src/reader/stream.rs +++ b/arrow-ipc/src/reader/stream.rs @@ -42,6 +42,8 @@ pub struct StreamDecoder { buf: MutableBuffer, /// Whether or not array data in input buffers are required to be aligned require_alignment: bool, + /// Whether or not to skip validation for underlying array creations + skip_validations: bool, } #[derive(Debug)] @@ -102,6 +104,19 @@ impl StreamDecoder { self } + /// Specifies whether or not to skip validations when creating [`ArrayData`]. + /// This can lead to undefined behavior if the data is not correctly formatted. + /// Set `skip_validations` to true only if you are certain. + /// + /// Notes: + /// * If `skip_validations` is true, `require_alignment` is ignored. + /// * If `skip_validations` is true, it uses [`arrow_data::ArrayDataBuilder::build_unchecked`] to + /// construct [`arrow_data::ArrayData`] under the hood. + pub fn with_skip_validations(mut self, skip_validations: bool) -> Self { + self.skip_validations = skip_validations; + self + } + /// Try to read the next [`RecordBatch`] from the provided [`Buffer`] /// /// [`Buffer::advance`] will be called on `buffer` for any consumed bytes. @@ -219,6 +234,7 @@ impl StreamDecoder { None, &version, self.require_alignment, + self.skip_validations, )?; self.state = DecoderState::default(); return Ok(Some(batch)); @@ -235,6 +251,7 @@ impl StreamDecoder { &mut self.dictionaries, &version, self.require_alignment, + self.skip_validations, )?; self.state = DecoderState::default(); } From d2a5087b8ae26e585fd24dd958210928789c45a8 Mon Sep 17 00:00:00 2001 From: Yibo Yan Date: Sun, 5 Jan 2025 03:38:21 +0000 Subject: [PATCH 2/3] mod: separate unsafe and safe APIs for unvalidated array ops --- arrow-data/src/data.rs | 15 + arrow-ipc/src/reader.rs | 744 ++++++++++++++++++++++----------- arrow-ipc/src/reader/stream.rs | 131 +++--- 3 files changed, 592 insertions(+), 298 deletions(-) diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs index a35b5e8629e9..4b0422cb8ff0 100644 --- a/arrow-data/src/data.rs +++ b/arrow-data/src/data.rs @@ -1890,6 +1890,21 @@ impl ArrayDataBuilder { data } + /// Creates an array data, without any validation, + /// but aligning buffers. + /// + /// # Safety + /// + /// The same caveats as [`ArrayData::new_unchecked`] apply. + pub unsafe fn build_aligned_unchecked(self) -> ArrayData { + let mut data = self.build_impl(); + data.align_buffers(); + // Provide a force_validate mode + #[cfg(feature = "force_validate")] + data.validate_data().unwrap(); + data + } + /// Same as [`Self::build_unchecked`] but ignoring `force_validate` feature flag unsafe fn build_impl(self) -> ArrayData { let nulls = self diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 99abff34f47e..689ed798ee9a 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -32,13 +32,40 @@ use std::sync::Arc; use arrow_array::*; use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, ScalarBuffer}; -use arrow_data::ArrayData; +use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::*; use crate::compression::CompressionCodec; use crate::{Block, FieldNode, Message, MetadataVersion, CONTINUATION_MARKER}; use DataType::*; +/// Build an array from the builder, optionally skipping validations. +/// +/// # Safety +/// If `skip_validations` is true, the function will build an `ArrayData` without performing the +/// usual validations. This can lead to undefined behavior if the data is not correctly formatted. +/// +/// Set `skip_validations` to true only if you are certain. +unsafe fn build_array_internal( + builder: ArrayDataBuilder, + require_alignment: bool, + skip_validations: bool, +) -> Result { + if skip_validations { + if require_alignment { + Ok(builder.build_unchecked()) + } else { + Ok(builder.build_aligned_unchecked()) + } + } else { + if require_alignment { + builder.build() + } else { + builder.build_aligned() + } + } +} + /// Read a buffer based on offset and length /// From /// Each constituent buffer is first compressed with the indicated @@ -62,19 +89,7 @@ fn read_buffer( } } -/// Coordinates reading arrays based on data types. -/// -/// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView) -/// When encounter such types, we pop from the front of the queue to get the number of buffers to read. -/// -/// Notes: -/// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls -/// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size. -/// We thus: -/// - check if the bit width of non-64-bit numbers is 64, and -/// - read the buffer as 64-bit (signed integer or float), and -/// - cast the 64-bit array to the appropriate data type -fn create_array( +unsafe fn create_array_internal( reader: &mut ArrayReader, field: &Field, variadic_counts: &mut VecDeque, @@ -82,8 +97,75 @@ fn create_array( skip_validations: bool, ) -> Result { let data_type = field.data_type(); + + // Helper functions to create arrays (with or without validations) + let create_primitive_array_helper = + |field_node: &FieldNode, data_type: &DataType, buffers: &[Buffer]| { + if skip_validations { + create_primitive_array_unchecked(field_node, data_type, buffers, require_alignment) + } else { + create_primitive_array(field_node, data_type, buffers, require_alignment) + } + }; + + let create_array_helper = + |reader: &mut ArrayReader, field: &Field, variadic_counts: &mut VecDeque| { + if skip_validations { + create_array_unchecked(reader, field, variadic_counts, require_alignment) + } else { + create_array(reader, field, variadic_counts, require_alignment) + } + }; + + let create_list_array_helper = |field_node: &FieldNode, + data_type: &DataType, + buffers: &[Buffer], + child_array: ArrayRef| { + if skip_validations { + create_list_array_unchecked( + field_node, + data_type, + buffers, + child_array, + require_alignment, + ) + } else { + create_list_array( + field_node, + data_type, + buffers, + child_array, + require_alignment, + ) + } + }; + + let create_dictionary_array_helper = + |field_node: &FieldNode, + data_type: &DataType, + buffers: &[Buffer], + value_array: ArrayRef| { + if skip_validations { + create_dictionary_array_unchecked( + field_node, + data_type, + buffers, + value_array, + require_alignment, + ) + } else { + create_dictionary_array( + field_node, + data_type, + buffers, + value_array, + require_alignment, + ) + } + }; + match data_type { - Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array( + Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array_helper( reader.next_node(field)?, data_type, &[ @@ -91,8 +173,6 @@ fn create_array( reader.next_buffer()?, reader.next_buffer()?, ], - require_alignment, - skip_validations, ), BinaryView | Utf8View => { let count = variadic_counts @@ -104,58 +184,24 @@ fn create_array( let buffers = (0..count) .map(|_| reader.next_buffer()) .collect::, _>>()?; - create_primitive_array( - reader.next_node(field)?, - data_type, - &buffers, - require_alignment, - skip_validations, - ) + create_primitive_array_helper(reader.next_node(field)?, data_type, &buffers) } - FixedSizeBinary(_) => create_primitive_array( + FixedSizeBinary(_) => create_primitive_array_helper( reader.next_node(field)?, data_type, &[reader.next_buffer()?, reader.next_buffer()?], - require_alignment, - skip_validations, ), List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => { let list_node = reader.next_node(field)?; let list_buffers = [reader.next_buffer()?, reader.next_buffer()?]; - let values = create_array( - reader, - list_field, - variadic_counts, - require_alignment, - skip_validations, - )?; - create_list_array( - list_node, - data_type, - &list_buffers, - values, - require_alignment, - skip_validations, - ) + let values = create_array_helper(reader, list_field, variadic_counts)?; + create_list_array_helper(list_node, data_type, &list_buffers, values) } FixedSizeList(ref list_field, _) => { let list_node = reader.next_node(field)?; let list_buffers = [reader.next_buffer()?]; - let values = create_array( - reader, - list_field, - variadic_counts, - require_alignment, - skip_validations, - )?; - create_list_array( - list_node, - data_type, - &list_buffers, - values, - require_alignment, - skip_validations, - ) + let values = create_array_helper(reader, list_field, variadic_counts)?; + create_list_array_helper(list_node, data_type, &list_buffers, values) } Struct(struct_fields) => { let struct_node = reader.next_node(field)?; @@ -166,13 +212,7 @@ fn create_array( // TODO investigate whether just knowing the number of buffers could // still work for struct_field in struct_fields { - let child = create_array( - reader, - struct_field, - variadic_counts, - require_alignment, - skip_validations, - )?; + let child = create_array_helper(reader, struct_field, variadic_counts)?; struct_arrays.push(child); } let null_count = struct_node.null_count() as usize; @@ -196,20 +236,8 @@ fn create_array( } RunEndEncoded(run_ends_field, values_field) => { let run_node = reader.next_node(field)?; - let run_ends = create_array( - reader, - run_ends_field, - variadic_counts, - require_alignment, - skip_validations, - )?; - let values = create_array( - reader, - values_field, - variadic_counts, - require_alignment, - skip_validations, - )?; + let run_ends = create_array_helper(reader, run_ends_field, variadic_counts)?; + let values = create_array_helper(reader, values_field, variadic_counts)?; let run_array_length = run_node.length() as usize; let builder = ArrayData::builder(data_type.clone()) @@ -244,13 +272,11 @@ fn create_array( )) })?; - create_dictionary_array( + create_dictionary_array_helper( index_node, data_type, &index_buffers, value_array.clone(), - require_alignment, - skip_validations, ) } Union(fields, mode) => { @@ -277,13 +303,7 @@ fn create_array( let mut children = Vec::with_capacity(fields.len()); for (_id, field) in fields.iter() { - let child = create_array( - reader, - field, - variadic_counts, - require_alignment, - skip_validations, - )?; + let child = create_array_helper(reader, field, variadic_counts)?; children.push(child); } @@ -316,36 +336,53 @@ fn create_array( // no buffer increases Ok(Arc::new(NullArray::from(array_data))) } - _ => create_primitive_array( + _ => create_primitive_array_helper( reader.next_node(field)?, data_type, &[reader.next_buffer()?, reader.next_buffer()?], - require_alignment, - skip_validations, ), } } -/// Reads the correct number of buffers based on data type and null_count, and creates a -/// primitive array ref -/// -/// # Arguments -/// -/// * `field_node` - A reference to the `FieldNode` which contains the length and null count of the array. -/// * `data_type` - The `DataType` of the array to be created. -/// * `buffers` - A slice of `Buffer` which contains the data for the array. -/// * `require_alignment` - A boolean indicating whether the buffers need to be aligned. -/// * `skip_validations` - A boolean indicating whether to skip validations. +/// Semantic is the same as `create_array`, but the function is unsafe +/// as it skips validations. /// /// # Safety +/// Skip validations to create an `ArrayData` without performing the usual validations. +/// This can lead to undefined behavior if the data is not correctly formatted. +unsafe fn create_array_unchecked( + reader: &mut ArrayReader, + field: &Field, + variadic_counts: &mut VecDeque, + require_alignment: bool, +) -> Result { + create_array_internal(reader, field, variadic_counts, require_alignment, true) +} + +/// Coordinates reading arrays based on data types. /// -/// `skip_validations` allows the creation of an `ArrayData` without performing the -/// usual validations. This can lead to undefined behavior if the data is not -/// correctly formatted. Set `skip_validations` to true only if you are certain +/// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView) +/// When encounter such types, we pop from the front of the queue to get the number of buffers to read. /// -/// # Notes -/// If `skip_validations` is true, `require_alignment` is ignored. -fn create_primitive_array( +/// Notes: +/// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls +/// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size. +/// We thus: +/// - check if the bit width of non-64-bit numbers is 64, and +/// - read the buffer as 64-bit (signed integer or float), and +/// - cast the 64-bit array to the appropriate data type +fn create_array( + reader: &mut ArrayReader, + field: &Field, + variadic_counts: &mut VecDeque, + require_alignment: bool, +) -> Result { + unsafe { create_array_internal(reader, field, variadic_counts, require_alignment, false) } +} + +/// `skip_validations` allows the creation of an `ArrayData` without performing the usual +/// validations. This can lead to undefined behavior if the data is not correctly formatted. +unsafe fn create_primitive_array_internal( field_node: &FieldNode, data_type: &DataType, buffers: &[Buffer], @@ -376,28 +413,47 @@ fn create_primitive_array( t => unreachable!("Data type {:?} either unsupported or not primitive", t), }; - let array_data = if skip_validations { - unsafe { builder.build_unchecked() } - } else if require_alignment { - builder.build()? - } else { - builder.build_aligned()? - }; + let array_data = build_array_internal(builder, require_alignment, skip_validations)?; Ok(make_array(array_data)) } +/// Reads the correct number of buffers based on data type and null_count, and creates a +/// primitive array ref +/// +/// # Safety +/// Skip validations to create an `ArrayData` without performing the usual validations. +/// This can lead to undefined behavior if the data is not correctly formatted. +unsafe fn create_primitive_array_unchecked( + field_node: &FieldNode, + data_type: &DataType, + buffers: &[Buffer], + require_alignment: bool, +) -> Result { + create_primitive_array_internal(field_node, data_type, buffers, require_alignment, true) +} + +/// Reads the correct number of buffers based on data type and null_count, and creates a +/// primitive array ref +fn create_primitive_array( + field_node: &FieldNode, + data_type: &DataType, + buffers: &[Buffer], + require_alignment: bool, +) -> Result { + unsafe { + create_primitive_array_internal(field_node, data_type, buffers, require_alignment, false) + } +} + /// Reads the correct number of buffers based on list type and null_count, and creates a -/// list array ref +/// list array ref for internal usage. /// -/// Safety: +/// # Safety /// `skip_validations` allows the creation of an `ArrayData` without performing the /// usual validations. This can lead to undefined behavior if the data is not /// correctly formatted. Set `skip_validations` to true only if you are certain. -/// -/// Notes: -/// * If `skip_validations` is true, `require_alignment` is ignored. -fn create_list_array( +unsafe fn create_list_array_internal( field_node: &FieldNode, data_type: &DataType, buffers: &[Buffer], @@ -423,28 +479,57 @@ fn create_list_array( _ => unreachable!("Cannot create list or map array from {:?}", data_type), }; - let array_data = if skip_validations { - unsafe { builder.build_unchecked() } - } else if require_alignment { - builder.build()? - } else { - builder.build_aligned()? - }; + let array_data = build_array_internal(builder, require_alignment, skip_validations)?; Ok(make_array(array_data)) } +/// Reads the correct number of buffers based on list type and null_count, and creates a +/// list array ref +fn create_list_array( + field_node: &FieldNode, + data_type: &DataType, + buffers: &[Buffer], + child_array: ArrayRef, + require_alignment: bool, +) -> Result { + unsafe { + create_list_array_internal( + field_node, + data_type, + buffers, + child_array, + require_alignment, + false, + ) + } +} + /// Reads the correct number of buffers based on list type and null_count, and creates a /// list array ref /// /// Safety: -/// `skip_validations` allows the creation of an `ArrayData` without performing the -/// usual validations. This can lead to undefined behavior if the data is not -/// correctly formatted. Set `skip_validations` to true only if you are certain. -/// -/// Notes: -/// * If `skip_validations` is true, `require_alignment` is ignored. -fn create_dictionary_array( +/// Internal data validation is skipped. This can lead to undefined behavior if the data +/// is not correctly formatted. +/// Use this function only if you are certain and trust the data source. +unsafe fn create_list_array_unchecked( + field_node: &FieldNode, + data_type: &DataType, + buffers: &[Buffer], + child_array: ArrayRef, + require_alignment: bool, +) -> Result { + create_list_array_internal( + field_node, + data_type, + buffers, + child_array, + require_alignment, + true, + ) +} + +unsafe fn create_dictionary_array_internal( field_node: &FieldNode, data_type: &DataType, buffers: &[Buffer], @@ -460,20 +545,59 @@ fn create_dictionary_array( .add_child_data(value_array.into_data()) .null_bit_buffer(null_buffer); - let array_data = if skip_validations { - unsafe { builder.build_unchecked() } - } else if require_alignment { - builder.build()? - } else { - builder.build_aligned()? - }; - + let array_data = build_array_internal(builder, require_alignment, skip_validations)?; Ok(make_array(array_data)) } else { unreachable!("Cannot create dictionary array from {:?}", data_type) } } +/// Reads the correct number of buffers based on list type and null_count, and creates a +/// list array ref +/// +/// Safety: +/// `skip_validations` allows the creation of an `ArrayData` without performing the +/// usual validations. This can lead to undefined behavior if the data is not +/// correctly formatted. Set `skip_validations` to true only if you are certain. +/// +/// Notes: +/// * If `skip_validations` is true, `require_alignment` is ignored. +fn create_dictionary_array( + field_node: &FieldNode, + data_type: &DataType, + buffers: &[Buffer], + value_array: ArrayRef, + require_alignment: bool, +) -> Result { + unsafe { + create_dictionary_array_internal( + field_node, + data_type, + buffers, + value_array, + require_alignment, + false, + ) + } +} + +unsafe fn create_dictionary_array_unchecked( + field_node: &FieldNode, + data_type: &DataType, + buffers: &[Buffer], + value_array: ArrayRef, + require_alignment: bool, +) -> Result { + create_dictionary_array_internal( + field_node, + data_type, + buffers, + value_array, + require_alignment, + true, + ) +} + /// State for decoding arrays from an encoded [`RecordBatch`] struct ArrayReader<'a> { /// Decoded dictionaries indexed by dictionary id @@ -601,6 +725,29 @@ pub fn read_record_batch( dictionaries_by_id: &HashMap, projection: Option<&[usize]>, metadata: &MetadataVersion, +) -> Result { + unsafe { + read_record_batch_impl( + buf, + batch, + schema, + dictionaries_by_id, + projection, + metadata, + false, + false, + ) + } +} + +/// Same as `read_record_batch`, but skips validations. +pub unsafe fn read_record_batch_unchecked( + buf: &Buffer, + batch: crate::RecordBatch, + schema: SchemaRef, + dictionaries_by_id: &HashMap, + projection: Option<&[usize]>, + metadata: &MetadataVersion, ) -> Result { read_record_batch_impl( buf, @@ -610,7 +757,7 @@ pub fn read_record_batch( projection, metadata, false, - false, + true, ) } @@ -622,6 +769,27 @@ pub fn read_dictionary( schema: &Schema, dictionaries_by_id: &mut HashMap, metadata: &MetadataVersion, +) -> Result<(), ArrowError> { + unsafe { + read_dictionary_impl( + buf, + batch, + schema, + dictionaries_by_id, + metadata, + false, + false, + ) + } +} + +/// Same as `read_dictionary`, but skips validations. +pub unsafe fn read_dictionary_unchecked( + buf: &Buffer, + batch: crate::DictionaryBatch, + schema: &Schema, + dictionaries_by_id: &mut HashMap, + metadata: &MetadataVersion, ) -> Result<(), ArrowError> { read_dictionary_impl( buf, @@ -630,11 +798,11 @@ pub fn read_dictionary( dictionaries_by_id, metadata, false, - false, + true, ) } -fn read_record_batch_impl( +unsafe fn read_record_batch_impl( buf: &Buffer, batch: crate::RecordBatch, schema: SchemaRef, @@ -676,13 +844,16 @@ fn read_record_batch_impl( for (idx, field) in schema.fields().iter().enumerate() { // Create array for projected field if let Some(proj_idx) = projection.iter().position(|p| p == &idx) { - let child = create_array( - &mut reader, - field, - &mut variadic_counts, - require_alignment, - skip_validations, - )?; + let child = if skip_validations { + create_array_unchecked( + &mut reader, + field, + &mut variadic_counts, + require_alignment, + )? + } else { + create_array(&mut reader, field, &mut variadic_counts, require_alignment)? + }; arrays.push((proj_idx, child)); } else { reader.skip_field(field, &mut variadic_counts)?; @@ -699,13 +870,11 @@ fn read_record_batch_impl( let mut children = vec![]; // keep track of index as lists require more than one node for field in schema.fields() { - let child = create_array( - &mut reader, - field, - &mut variadic_counts, - require_alignment, - skip_validations, - )?; + let child = if skip_validations { + create_array_unchecked(&mut reader, field, &mut variadic_counts, require_alignment)? + } else { + create_array(&mut reader, field, &mut variadic_counts, require_alignment)? + }; children.push(child); } assert!(variadic_counts.is_empty()); @@ -713,7 +882,7 @@ fn read_record_batch_impl( } } -fn read_dictionary_impl( +unsafe fn read_dictionary_impl( buf: &Buffer, batch: crate::DictionaryBatch, schema: &Schema, @@ -878,7 +1047,6 @@ pub struct FileDecoder { version: MetadataVersion, projection: Option>, require_alignment: bool, - skip_validations: bool, } impl FileDecoder { @@ -890,7 +1058,6 @@ impl FileDecoder { dictionaries: Default::default(), projection: None, require_alignment: false, - skip_validations: false, } } @@ -917,19 +1084,6 @@ impl FileDecoder { self } - /// Specifies whether or not to skip validations when creating [`ArrayData`]. - /// This can lead to undefined behavior if the data is not correctly formatted. - /// Set `skip_validations` to true only if you are certain. - /// - /// Notes: - /// * If `skip_validations` is true, `require_alignment` is ignored. - /// * If `skip_validations` is true, it uses [`arrow_data::ArrayDataBuilder::build_unchecked`] to - /// construct [`arrow_data::ArrayData`] under the hood. - pub fn with_skip_validations(mut self, skip_validations: bool) -> Self { - self.skip_validations = skip_validations; - self - } - fn read_message<'a>(&self, buf: &'a [u8]) -> Result, ArrowError> { let message = parse_message(buf)?; @@ -942,8 +1096,12 @@ impl FileDecoder { Ok(message) } - /// Read the dictionary with the given block and data buffer - pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> { + unsafe fn read_dictionary_internal( + &mut self, + block: &Block, + buf: &Buffer, + skip_validations: bool, + ) -> Result<(), ArrowError> { let message = self.read_message(buf)?; match message.header_type() { crate::MessageHeader::DictionaryBatch => { @@ -955,7 +1113,7 @@ impl FileDecoder { &mut self.dictionaries, &message.version(), self.require_alignment, - self.skip_validations, + skip_validations, ) } t => Err(ArrowError::ParseError(format!( @@ -964,11 +1122,30 @@ impl FileDecoder { } } - /// Read the RecordBatch with the given block and data buffer - pub fn read_record_batch( + /// Read the dictionary with the given block and data buffer + pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> Result<(), ArrowError> { + unsafe { self.read_dictionary_internal(block, buf, false) } + } + + /// Read the dictionary with the given block and data buffer + /// + /// # Safety: + /// Skip validations to create an `ArrayData` without performing the usual validations. + /// This can lead to undefined behavior if the data is not correctly formatted. + /// Use this function only if you are certain and trust the data source. + pub unsafe fn read_dictionary_unchecked( + &mut self, + block: &Block, + buf: &Buffer, + ) -> Result<(), ArrowError> { + self.read_dictionary_internal(block, buf, true) + } + + unsafe fn read_record_batch_internal( &self, block: &Block, buf: &Buffer, + skip_validations: bool, ) -> Result, ArrowError> { let message = self.read_message(buf)?; match message.header_type() { @@ -988,7 +1165,7 @@ impl FileDecoder { self.projection.as_deref(), &message.version(), self.require_alignment, - self.skip_validations, + skip_validations, ) .map(Some) } @@ -998,6 +1175,29 @@ impl FileDecoder { ))), } } + + /// Read the RecordBatch with the given block and data buffer + pub fn read_record_batch( + &self, + block: &Block, + buf: &Buffer, + ) -> Result, ArrowError> { + unsafe { self.read_record_batch_internal(block, buf, false) } + } + + /// Same as `read_record_batch`, but skips validations. + /// + /// # Safety: + /// Skip validations to create an `ArrayData` without performing the usual validations. + /// This can lead to undefined behavior if the data is not correctly formatted. + /// Use this function only if you are certain and trust the data source. + pub unsafe fn read_record_batch_unchecked( + &self, + block: &Block, + buf: &Buffer, + ) -> Result, ArrowError> { + self.read_record_batch_internal(block, buf, true) + } } /// Build an Arrow [`FileReader`] with custom options. @@ -1009,8 +1209,6 @@ pub struct FileReaderBuilder { max_footer_fb_tables: usize, /// Passed through to construct [`VerifierOptions`] max_footer_fb_depth: usize, - /// Skip validations when creating [`ArrayData`] - skip_validations: bool, } impl Default for FileReaderBuilder { @@ -1020,7 +1218,6 @@ impl Default for FileReaderBuilder { max_footer_fb_tables: verifier_options.max_tables, max_footer_fb_depth: verifier_options.max_depth, projection: None, - skip_validations: false, } } } @@ -1039,14 +1236,6 @@ impl FileReaderBuilder { self } - /// Skip validations when creating underlying [`ArrayData`]. - /// This can lead to undefined behavior if the data is not correctly formatted. - /// Set `skip_validations` to true only if you are certain. - pub fn with_skip_validations(mut self, skip_validations: bool) -> Self { - self.skip_validations = skip_validations; - self - } - /// Flatbuffers option for parsing the footer. Controls the max number of fields and /// metadata key-value pairs that can be parsed from the schema of the footer. /// @@ -1081,8 +1270,11 @@ impl FileReaderBuilder { self } - /// Build [`FileReader`] with given reader. - pub fn build(self, mut reader: R) -> Result, ArrowError> { + unsafe fn build_internal( + self, + mut reader: R, + skip_validations: bool, + ) -> Result, ArrowError> { // Space for ARROW_MAGIC (6 bytes) and length (4 bytes) let mut buffer = [0; 10]; reader.seek(SeekFrom::End(-10))?; @@ -1129,8 +1321,7 @@ impl FileReaderBuilder { } } - let mut decoder = FileDecoder::new(Arc::new(schema), footer.version()) - .with_skip_validations(self.skip_validations); + let mut decoder = FileDecoder::new(Arc::new(schema), footer.version()); if let Some(projection) = self.projection { decoder = decoder.with_projection(projection) } @@ -1139,7 +1330,11 @@ impl FileReaderBuilder { if let Some(dictionaries) = footer.dictionaries() { for block in dictionaries { let buf = read_block(&mut reader, block)?; - decoder.read_dictionary(block, &buf)?; + if skip_validations { + decoder.read_dictionary_unchecked(block, &buf)?; + } else { + decoder.read_dictionary(block, &buf)?; + } } } @@ -1152,6 +1347,20 @@ impl FileReaderBuilder { custom_metadata, }) } + + /// Build [`FileReader`] with given reader. + pub fn build(self, reader: R) -> Result, ArrowError> { + unsafe { self.build_internal(reader, false) } + } + + /// Build [`FileReader`] with given reader without validations at the build up stage. + /// Upon reading the data, validations will be performed if not specified otherwise. + pub unsafe fn build_unvalidated( + self, + reader: R, + ) -> Result, ArrowError> { + self.build_internal(reader, true) + } } /// Arrow File reader @@ -1227,7 +1436,6 @@ impl FileReader { ) -> Result { let builder = FileReaderBuilder { projection, - skip_validations: true, ..Default::default() }; builder.build(reader) @@ -1272,6 +1480,20 @@ impl FileReader { self.decoder.read_record_batch(block, &buffer) } + unsafe fn maybe_next_unvalidated(&mut self) -> Result, ArrowError> { + let block = &self.blocks[self.current_block]; + self.current_block += 1; + + // read length + let buffer = read_block(&mut self.reader, block)?; + self.decoder.read_record_batch_unchecked(block, &buffer) + } + + /// Returns an iterator that uses the unsafe version of maybe_next_unvalidated. + pub unsafe fn into_unvalidated_iterator(self) -> UnvalidatedFileReader { + UnvalidatedFileReader { reader: self } + } + /// Gets a reference to the underlying reader. /// /// It is inadvisable to directly read from the underlying reader. @@ -1306,6 +1528,30 @@ impl RecordBatchReader for FileReader { } } +/// An iterator over the record batches (without validation) in an Arrow file +pub struct UnvalidatedFileReader { + reader: FileReader, +} + +impl Iterator for UnvalidatedFileReader { + type Item = Result; + + fn next(&mut self) -> Option { + if self.reader.current_block < self.reader.total_blocks { + // Use the unsafe `maybe_next_unvalidated` function + unsafe { + match self.reader.maybe_next_unvalidated() { + Ok(Some(batch)) => Some(Ok(batch)), + Ok(None) => None, // End of the file + Err(e) => Some(Err(e)), + } + } + } else { + None + } + } +} + /// Arrow Stream reader pub struct StreamReader { /// Stream reader @@ -1326,10 +1572,6 @@ pub struct StreamReader { /// Optional projection projection: Option<(Vec, Schema)>, - - /// Specifies whether or not skip validations when creating underlying [`ArrayData`]. - /// This can lead to undefined behavior if the data is not correctly formatted. - skip_validations: bool, } impl fmt::Debug for StreamReader { @@ -1409,7 +1651,6 @@ impl StreamReader { finished: false, dictionaries_by_id, projection, - skip_validations: false, }) } @@ -1432,17 +1673,10 @@ impl StreamReader { self.finished } - /// Specifies whether or not skip validations when creating underlying [`ArrayData`]. - /// This can lead to undefined behavior if the data is not correctly formatted. - /// - /// Notes: - /// * If `skip_validations` is true, `require_alignment` is ignored. - pub fn with_skip_validations(mut self, skip_validations: bool) -> Self { - self.skip_validations = skip_validations; - self - } - - fn maybe_next(&mut self) -> Result, ArrowError> { + unsafe fn maybe_next_internal( + &mut self, + skip_validations: bool, + ) -> Result, ArrowError> { if self.finished { return Ok(None); } @@ -1507,7 +1741,7 @@ impl StreamReader { self.projection.as_ref().map(|x| x.0.as_ref()), &message.version(), false, - self.skip_validations, + skip_validations, ) .map(Some) } @@ -1528,7 +1762,7 @@ impl StreamReader { &mut self.dictionaries_by_id, &message.version(), false, - self.skip_validations, + skip_validations, )?; // read the next message until we encounter a RecordBatch @@ -1541,6 +1775,14 @@ impl StreamReader { } } + fn maybe_next(&mut self) -> Result, ArrowError> { + unsafe { self.maybe_next_internal(false) } + } + + unsafe fn maybe_next_unvalidated(&mut self) -> Result, ArrowError> { + self.maybe_next_internal(true) + } + /// Gets a reference to the underlying reader. /// /// It is inadvisable to directly read from the underlying reader. @@ -1554,6 +1796,11 @@ impl StreamReader { pub fn get_mut(&mut self) -> &mut R { &mut self.reader } + + /// Returns an iterator that uses the unsafe version of maybe_next_unvalidated. + pub unsafe fn into_unvalidated_iterator(self) -> UnvalidatedStreamReader { + UnvalidatedStreamReader { reader: self } + } } impl Iterator for StreamReader { @@ -1570,6 +1817,25 @@ impl RecordBatchReader for StreamReader { } } +/// An iterator over the record batches (without validation) in an Arrow stream +pub struct UnvalidatedStreamReader { + reader: StreamReader, +} + +impl Iterator for UnvalidatedStreamReader { + type Item = Result; + + fn next(&mut self) -> Option { + unsafe { + match self.reader.maybe_next_unvalidated() { + Ok(Some(batch)) => Some(Ok(batch)), + Ok(None) => None, // End of the file + Err(e) => Some(Err(e)), + } + } + } +} + #[cfg(test)] mod tests { use crate::writer::{unslice_run_array, DictionaryTracker, IpcDataGenerator, IpcWriteOptions}; @@ -2351,17 +2617,19 @@ mod tests { assert_ne!(b.as_ptr().align_offset(8), 0); let ipc_batch = message.header_as_record_batch().unwrap(); - let roundtrip = read_record_batch_impl( - &b, - ipc_batch, - batch.schema(), - &Default::default(), - None, - &message.version(), - false, - false, - ) - .unwrap(); + let roundtrip = unsafe { + read_record_batch_impl( + &b, + ipc_batch, + batch.schema(), + &Default::default(), + None, + &message.version(), + false, + false, + ) + .unwrap() + }; assert_eq!(batch, roundtrip); } @@ -2390,16 +2658,18 @@ mod tests { assert_ne!(b.as_ptr().align_offset(8), 0); let ipc_batch = message.header_as_record_batch().unwrap(); - let result = read_record_batch_impl( - &b, - ipc_batch, - batch.schema(), - &Default::default(), - None, - &message.version(), - true, - false, - ); + let result = unsafe { + read_record_batch_impl( + &b, + ipc_batch, + batch.schema(), + &Default::default(), + None, + &message.version(), + true, + false, + ) + }; let error = result.unwrap_err(); assert_eq!( diff --git a/arrow-ipc/src/reader/stream.rs b/arrow-ipc/src/reader/stream.rs index 127d346cf8c5..6ee86b720880 100644 --- a/arrow-ipc/src/reader/stream.rs +++ b/arrow-ipc/src/reader/stream.rs @@ -42,8 +42,6 @@ pub struct StreamDecoder { buf: MutableBuffer, /// Whether or not array data in input buffers are required to be aligned require_alignment: bool, - /// Whether or not to skip validation for underlying array creations - skip_validations: bool, } #[derive(Debug)] @@ -104,46 +102,11 @@ impl StreamDecoder { self } - /// Specifies whether or not to skip validations when creating [`ArrayData`]. - /// This can lead to undefined behavior if the data is not correctly formatted. - /// Set `skip_validations` to true only if you are certain. - /// - /// Notes: - /// * If `skip_validations` is true, `require_alignment` is ignored. - /// * If `skip_validations` is true, it uses [`arrow_data::ArrayDataBuilder::build_unchecked`] to - /// construct [`arrow_data::ArrayData`] under the hood. - pub fn with_skip_validations(mut self, skip_validations: bool) -> Self { - self.skip_validations = skip_validations; - self - } - - /// Try to read the next [`RecordBatch`] from the provided [`Buffer`] - /// - /// [`Buffer::advance`] will be called on `buffer` for any consumed bytes. - /// - /// The push-based interface facilitates integration with sources that yield arbitrarily - /// delimited bytes ranges, such as a chunked byte stream received from object storage - /// - /// ``` - /// # use arrow_array::RecordBatch; - /// # use arrow_buffer::Buffer; - /// # use arrow_ipc::reader::StreamDecoder; - /// # use arrow_schema::ArrowError; - /// # - /// fn print_stream(src: impl Iterator) -> Result<(), ArrowError> { - /// let mut decoder = StreamDecoder::new(); - /// for mut x in src { - /// while !x.is_empty() { - /// if let Some(x) = decoder.decode(&mut x)? { - /// println!("{x:?}"); - /// } - /// } - /// } - /// decoder.finish().unwrap(); - /// Ok(()) - /// } - /// ``` - pub fn decode(&mut self, buffer: &mut Buffer) -> Result, ArrowError> { + unsafe fn decode_internal( + &mut self, + buffer: &mut Buffer, + skip_validations: bool, + ) -> Result, ArrowError> { while !buffer.is_empty() { match &mut self.state { DecoderState::Header { @@ -226,16 +189,18 @@ impl StreamDecoder { let schema = self.schema.clone().ok_or_else(|| { ArrowError::IpcError("Missing schema".to_string()) })?; - let batch = read_record_batch_impl( - &body, - batch, - schema, - &self.dictionaries, - None, - &version, - self.require_alignment, - self.skip_validations, - )?; + let batch = unsafe { + read_record_batch_impl( + &body, + batch, + schema, + &self.dictionaries, + None, + &version, + self.require_alignment, + skip_validations, + )? + }; self.state = DecoderState::default(); return Ok(Some(batch)); } @@ -244,15 +209,17 @@ impl StreamDecoder { let schema = self.schema.as_deref().ok_or_else(|| { ArrowError::IpcError("Missing schema".to_string()) })?; - read_dictionary_impl( - &body, - dictionary, - schema, - &mut self.dictionaries, - &version, - self.require_alignment, - self.skip_validations, - )?; + unsafe { + read_dictionary_impl( + &body, + dictionary, + schema, + &mut self.dictionaries, + &version, + self.require_alignment, + skip_validations, + )?; + } self.state = DecoderState::default(); } MessageHeader::NONE => { @@ -273,6 +240,48 @@ impl StreamDecoder { Ok(None) } + /// Try to read the next [`RecordBatch`] from the provided [`Buffer`] + /// + /// [`Buffer::advance`] will be called on `buffer` for any consumed bytes. + /// + /// The push-based interface facilitates integration with sources that yield arbitrarily + /// delimited bytes ranges, such as a chunked byte stream received from object storage + /// + /// ``` + /// # use arrow_array::RecordBatch; + /// # use arrow_buffer::Buffer; + /// # use arrow_ipc::reader::StreamDecoder; + /// # use arrow_schema::ArrowError; + /// # + /// fn print_stream(src: impl Iterator) -> Result<(), ArrowError> { + /// let mut decoder = StreamDecoder::new(); + /// for mut x in src { + /// while !x.is_empty() { + /// if let Some(x) = decoder.decode(&mut x)? { + /// println!("{x:?}"); + /// } + /// } + /// } + /// decoder.finish().unwrap(); + /// Ok(()) + /// } + /// ``` + pub fn decode(&mut self, buffer: &mut Buffer) -> Result, ArrowError> { + unsafe { self.decode_internal(buffer, false) } + } + + /// Try to read the next [`RecordBatch`] from the provided [`Buffer`] without validating the data + /// This is useful when the data is known to be valid and the validation can be skipped + /// + /// # Safety: + /// This method is unsafe because it does not validate the data + pub unsafe fn decode_unvalidated( + &mut self, + buffer: &mut Buffer, + ) -> Result, ArrowError> { + unsafe { self.decode_internal(buffer, true) } + } + /// Signal the end of stream /// /// Returns an error if any partial data remains in the stream From fd775757ebe1c157ca9e9bea4a2f7fb262664f28 Mon Sep 17 00:00:00 2001 From: Yibo Yan Date: Fri, 10 Jan 2025 17:48:00 -0800 Subject: [PATCH 3/3] chore: docs fix Co-authored-by: Andrew Lamb --- arrow-ipc/src/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 689ed798ee9a..4705db7759ca 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -553,7 +553,7 @@ unsafe fn create_dictionary_array_internal( } /// Reads the correct number of buffers based on list type and null_count, and creates a -/// list array ref +/// dictionary array ref /// /// Safety: /// `skip_validations` allows the creation of an `ArrayData` without performing the