Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ArrayAccessor, Iterator, Extend and benchmarks for RunArray #3603

Merged
merged 6 commits into from
Feb 4, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 225 additions & 1 deletion arrow-array/src/array/run_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ use arrow_schema::{ArrowError, DataType, Field};
use crate::{
builder::StringRunBuilder,
make_array,
run_iterator::RunArrayIter,
types::{Int16Type, Int32Type, Int64Type, RunEndIndexType},
Array, ArrayRef, PrimitiveArray,
Array, ArrayAccessor, ArrayRef, PrimitiveArray,
};

///
Expand Down Expand Up @@ -121,6 +122,27 @@ impl<R: RunEndIndexType> RunArray<R> {
pub fn values(&self) -> &ArrayRef {
&self.values
}

/// Downcast this dictionary to a [`TypedRunArray`]
askoa marked this conversation as resolved.
Show resolved Hide resolved
///
/// ```
/// use arrow_array::{Array, ArrayAccessor, RunArray, StringArray, types::Int32Type};
///
/// let orig = [Some("a"), Some("b"), None];
/// let run_array = RunArray::<Int32Type>::from_iter(orig);
/// let typed = run_array.downcast_ref::<StringArray>().unwrap();
/// assert_eq!(typed.value(0), "a");
/// assert_eq!(typed.value(1), "b");
/// assert!(typed.values().is_null(2));
/// ```
///
pub fn downcast_ref<V: 'static>(&self) -> Option<TypedRunArray<'_, R, V>> {
askoa marked this conversation as resolved.
Show resolved Hide resolved
let values = self.values.as_any().downcast_ref()?;
Some(TypedRunArray {
run_array: self,
values,
})
}
}

impl<R: RunEndIndexType> From<ArrayData> for RunArray<R> {
Expand Down Expand Up @@ -274,15 +296,191 @@ pub type Int32RunArray = RunArray<Int32Type>;
/// ```
pub type Int64RunArray = RunArray<Int64Type>;

/// A strongly-typed wrapper around a [`RunArray`] that implements [`ArrayAccessor`]
/// and [`IntoIterator`] allowing fast access to its elements
///
/// ```
/// use arrow_array::{RunArray, StringArray, types::Int32Type};
///
/// let orig = ["a", "b", "a", "b"];
/// let ree_array = RunArray::<Int32Type>::from_iter(orig);
///
/// // `TypedRunArray` allows you to access the values directly
/// let typed = ree_array.downcast_ref::<StringArray>().unwrap();
///
/// for (maybe_val, orig) in typed.into_iter().zip(orig) {
/// assert_eq!(maybe_val.unwrap(), orig)
/// }
/// ```
pub struct TypedRunArray<'a, R: RunEndIndexType, V> {
/// The run array
run_array: &'a RunArray<R>,

/// The values of the run_array
values: &'a V,
}

// Manually implement `Clone` to avoid `V: Clone` type constraint
impl<'a, R: RunEndIndexType, V> Clone for TypedRunArray<'a, R, V> {
fn clone(&self) -> Self {
Self {
run_array: self.run_array,
values: self.values,
}
}
}

impl<'a, R: RunEndIndexType, V> Copy for TypedRunArray<'a, R, V> {}

impl<'a, R: RunEndIndexType, V> std::fmt::Debug for TypedRunArray<'a, R, V> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
writeln!(f, "TypedRunArray({:?})", self.run_array)
}
}

impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> {
/// Returns the run_ends of this [`TypedRunArray`]
pub fn run_ends(&self) -> &'a PrimitiveArray<R> {
self.run_array.run_ends()
}

/// Returns the values of this [`TypedRunArray`]
pub fn values(&self) -> &'a V {
self.values
}

/// Returns index to the physcial array for the given index to the logical array.
/// Performs a binary search on the run_ends array for the input index.
#[inline]
pub fn get_physical_index(&self, logical_index: usize) -> Option<usize> {
askoa marked this conversation as resolved.
Show resolved Hide resolved
if logical_index >= self.run_array.len() {
return None;
}
let mut st: usize = 0;
askoa marked this conversation as resolved.
Show resolved Hide resolved
let mut en: usize = self.run_ends().len();
while st + 1 < en {
let mid: usize = (st + en) / 2;
if logical_index
< unsafe {
// Safety:
// The value of mid will always be between 1 and len - 1,
// where len is length of run ends array.
// This is based on the fact that `st` starts with 0 and
// `en` starts with len. The condition `st + 1 < en` ensures
// `st` and `en` differs atleast by two. So the value of `mid`
// will never be either `st` or `en`
self.run_ends().value_unchecked(mid - 1).as_usize()
}
{
en = mid
} else {
st = mid
}
}
Some(st)
}
}

impl<'a, R: RunEndIndexType, V: Sync> Array for TypedRunArray<'a, R, V> {
fn as_any(&self) -> &dyn Any {
self.run_array
}

fn data(&self) -> &ArrayData {
&self.run_array.data
}

fn into_data(self) -> ArrayData {
self.run_array.into_data()
}
}

// Array accessor converts the index of logical array to the index of the physical array
// using binary search. The time complexity is O(log N) where N is number of runs.
impl<'a, R, V> ArrayAccessor for TypedRunArray<'a, R, V>
where
R: RunEndIndexType,
V: Sync + Send,
&'a V: ArrayAccessor,
<&'a V as ArrayAccessor>::Item: Default,
{
type Item = <&'a V as ArrayAccessor>::Item;

fn value(&self, logical_index: usize) -> Self::Item {
assert!(
logical_index < self.len(),
"Trying to access an element at index {} from a TypedRunArray of length {}",
logical_index,
self.len()
);
unsafe { self.value_unchecked(logical_index) }
}

unsafe fn value_unchecked(&self, logical_index: usize) -> Self::Item {
let physical_index = self.get_physical_index(logical_index).unwrap();
self.values().value_unchecked(physical_index)
}
}

impl<'a, R, V> IntoIterator for TypedRunArray<'a, R, V>
where
R: RunEndIndexType,
V: Sync + Send,
&'a V: ArrayAccessor,
<&'a V as ArrayAccessor>::Item: Default,
{
type Item = Option<<&'a V as ArrayAccessor>::Item>;
type IntoIter = RunArrayIter<'a, R, V>;

fn into_iter(self) -> Self::IntoIter {
RunArrayIter::new(self)
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::Rng;

use super::*;
use crate::builder::PrimitiveRunBuilder;
use crate::types::{Int16Type, Int32Type, Int8Type, UInt32Type};
use crate::{Array, Int16Array, Int32Array, StringArray};

fn build_input_array(approx_size: usize) -> Vec<Option<i32>> {
let mut seed: Vec<Option<i32>> = vec![
askoa marked this conversation as resolved.
Show resolved Hide resolved
None,
None,
Some(1),
Some(2),
Some(3),
Some(4),
Some(5),
Some(6),
];
let mut ix = 0;
let mut result: Vec<Option<i32>> = Vec::with_capacity(approx_size);
let mut rng = thread_rng();
while result.len() < approx_size {
if ix == 0 {
seed.shuffle(&mut rng);
}
let num = rand::thread_rng().gen_range(0..8);
for _ in 0..num {
result.push(seed[ix]);
}
ix += 1;
if ix == 8 {
ix = 0
}
}
println!("Size of input array: {}", result.len());
result
}

#[test]
fn test_run_array() {
// Construct a value array
Expand Down Expand Up @@ -504,4 +702,30 @@ mod tests {
let a = RunArray::<Int32Type>::from_iter(["32"]);
let _ = RunArray::<Int64Type>::from(a.into_data());
}

#[test]
fn test_ree_array_accessor() {
let input_array = build_input_array(256);

// Encode the input_array to ree_array
let mut builder =
PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
builder.extend(input_array.clone().into_iter());
askoa marked this conversation as resolved.
Show resolved Hide resolved
let run_array = builder.finish();
let typed = run_array
.downcast_ref::<PrimitiveArray<Int32Type>>()
.unwrap();

for (i, inp_val) in input_array.iter().enumerate() {
if let Some(val) = inp_val {
let actual = typed.value(i);
assert_eq!(*val, actual)
} else {
// TODO: Check if the value in logical index is null.
// It seems, currently, there is no way to check nullability of logical index.
// Should `array.is_null` be overwritten to return nullability
// of logical index?
};
}
}
}
53 changes: 42 additions & 11 deletions arrow-array/src/builder/generic_byte_run_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,22 @@ use arrow_buffer::ArrowNativeType;
///
/// let mut builder =
/// GenericByteRunBuilder::<Int16Type, BinaryType>::new();
/// builder.append_value(b"abc");
/// builder.append_value(b"abc");
/// builder.append_null();
/// builder.extend([Some(b"abc"), Some(b"abc"), None, Some(b"def")].into_iter());
/// builder.append_value(b"def");
/// builder.append_null();
/// let array = builder.finish();
///
/// assert_eq!(
/// array.run_ends(),
/// &Int16Array::from(vec![Some(2), Some(3), Some(4)])
/// &Int16Array::from(vec![Some(2), Some(3), Some(5), Some(6)])
/// );
///
/// let av = array.values();
///
/// assert!(!av.is_null(0));
/// assert!(av.is_null(1));
/// assert!(!av.is_null(2));
/// assert!(av.is_null(3));
///
/// // Values are polymorphic and so require a downcast.
/// let ava: &BinaryArray = as_generic_binary_array(av.as_ref());
Expand Down Expand Up @@ -299,6 +299,19 @@ where
}
}

impl<R, V, S> Extend<Option<S>> for GenericByteRunBuilder<R, V>
where
R: RunEndIndexType,
V: ByteArrayType,
S: AsRef<V::Native>,
{
fn extend<T: IntoIterator<Item = Option<S>>>(&mut self, iter: T) {
for elem in iter {
self.append_option(elem);
}
}
}

/// Array builder for [`RunArray`] that encodes strings ([`Utf8Type`]).
///
/// ```
Expand All @@ -315,9 +328,7 @@ where
/// // The builder builds the dictionary value by value
/// builder.append_value("abc");
/// builder.append_null();
/// builder.append_value("def");
/// builder.append_value("def");
/// builder.append_value("abc");
/// builder.extend([Some("def"), Some("def"), Some("abc")]);
/// let array = builder.finish();
///
/// assert_eq!(
Expand Down Expand Up @@ -356,9 +367,7 @@ pub type LargeStringRunBuilder<K> = GenericByteRunBuilder<K, LargeUtf8Type>;
/// // The builder builds the dictionary value by value
/// builder.append_value(b"abc");
/// builder.append_null();
/// builder.append_value(b"def");
/// builder.append_value(b"def");
/// builder.append_value(b"abc");
/// builder.extend([Some(b"def"), Some(b"def"), Some(b"abc")]);
/// let array = builder.finish();
///
/// assert_eq!(
Expand Down Expand Up @@ -387,7 +396,9 @@ mod tests {
use super::*;

use crate::array::Array;
use crate::types::Int16Type;
use crate::cast::as_primitive_array;
use crate::cast::as_string_array;
use crate::types::{Int16Type, Int32Type};
use crate::GenericByteArray;
use crate::Int16Array;
use crate::Int16RunArray;
Expand Down Expand Up @@ -516,4 +527,24 @@ mod tests {
fn test_binary_run_buider_finish_cloned() {
test_bytes_run_buider_finish_cloned::<BinaryType>(vec![b"abc", b"def", b"ghi"]);
}

#[test]
fn test_extend() {
let mut builder = StringRunBuilder::<Int32Type>::new();
builder.extend(["a", "a", "a", "", "", "b", "b"].into_iter().map(Some));
builder.extend(["b", "cupcakes", "cupcakes"].into_iter().map(Some));
let array = builder.finish();

assert_eq!(array.len(), 10);
assert_eq!(
as_primitive_array::<Int32Type>(array.run_ends()).values(),
&[3, 5, 8, 10]
);

let str_array = as_string_array(array.values().as_ref());
assert_eq!(str_array.value(0), "a");
assert_eq!(str_array.value(1), "");
assert_eq!(str_array.value(2), "b");
assert_eq!(str_array.value(3), "cupcakes");
}
}
Loading